Version 2.5.0
Asynchronous operations in Java

WiredTiger supports asynchronous operations; as an example of where this can be useful, a server application handling requests from a network as fast as possible may want its worker threads to initiate a unit of work and then immediately respond to the next request, rather than waiting for the results of the first request.

WiredTiger supports asynchronous operations through the AsyncOp handle. The work unit represented by the AsyncOp handle is queued by the application and performed by an internal WiredTiger worker thread. When the work unit completes, the WiredTiger thread makes a callback to notify the application the operation is finished, along with providing any results and error values.

The asynchronous operation handle operates in a manner similar to a Cursor handle. An asynchronous operation includes:

  • getter/setters for key and value fields
  • encoding of fields to store in the data source
  • methods to modify and retrieve specific data (for example, insert and update)
  • a method to compact a table

The AsyncOp handle does not survive after the callback function returns into the WiredTiger library. When the application callback returns the handle is returned to the system pool. The application callback must copy out any key, value or other information that it needs before the callback function returns.

Configuring asynchronous operations

To perform asynchronous operations, the application must first include the async configuration option when wiredtiger.open is called. Additional configuration parameters include the number of WiredTiger worker threads created to handle the incoming queue of operations and the maximum number of simultaneous asynchronous operations that are expected.

For example, the following configures an application for asynchronous operations, with a maximum of 10 asynchronous operations and 2 supporting threads:

conn = wiredtiger.open(home, "create,cache_size=100MB," +
"async=(enabled=true,ops_max=20,threads=2)");

If the number of requests exceeds the configured maximum number, a AsyncOp handle won't immediately be available and an error will be returned to the application when it attempts to allocate a handle. If the number of configured worker threads are unable to keep up with the requests, requests will be forced to wait for worker threads to become available.

Allocating an asynchronous operations handle

A AsyncOp handle is allocated using the Connection.async_new_op method. This method takes an existing object URI and a callback. For example:

op = tryAsyncNewOp(conn, "table:async", null, asynciface);

To aid the application in matching up an asynchronous operation with a subsequent call to the callback function, every handle contains a unique uint64_t identifier and AsyncOpType type. The identifier is assigned when the handle is allocated and the type is assigned when the asynchronous operation is queued.

To retrieve the id, use the AsyncOp.get_id method:

/* Retrieve the operation's 64-bit identifier. */
long id = op.getId();

To retrieve the AsyncOpType type, use the AsyncOp.get_type method:

/* Retrieve the operation's type. */
AsyncOpType optype = op.getType();

WiredTiger only allows a limited number of method calls back into the library using the AsyncOp handle, while in the callback function. The application is allowed to retrieve than handle's key, value, identifier and the operation type from the AsyncOp handle.

Here is a complete example callback function implementation, from the example program ex_async.java:

class AsyncKeys implements AsyncCallback {
public int numKeys = 0;
public AsyncKeys() {}
public void notifyError(String desc) {
System.err.println("ERROR: notify: " + desc);
}
public int notify(AsyncOp op, int opReturn, int flags) {
/*
* Note: we are careful not to throw any errors here. Any
* exceptions would be swallowed by a native worker thread.
*/
int ret = 0;
try {
/* Retrieve the operation's type. */
AsyncOpType optype = op.getType();
/* Retrieve the operation's 64-bit identifier. */
long id = op.getId();
if (optype == AsyncOpType.WT_AOP_SEARCH) {
String key = op.getKeyString();
String value = op.getValueString();
synchronized (this) {
numKeys += 1;
}
System.out.println("Id " + id + " got record: " + key +
" : " + value);
}
else {
notifyError("unexpected optype");
ret = 1;
}
}
catch (Exception e) {
System.err.println("ERROR: exception in notify: " + e.toString() +
", opreturn=" + opReturn);
ret = 1;
}
return (ret);
}
}
AsyncKeys asynciface = new AsyncKeys();

Executing asynchronous operations

The AsyncOp handle behaves similarly to the Cursor handle, that is, the key and value are initialized and then an operation is performed.

For example, the following code does an asynchronous insert into the table:

/*
* Set the operation's string key and value, and then do
* an asynchronous insert.
*/
k[i] = "key" + i;
op.putKeyString(k[i]);
v[i] = "value" + i;
op.putValueString(v[i]);
ret = op.insert();

For example, the following code does an asynchronous search of the table:

/*
* Set the operation's string key and value, and then do
* an asynchronous search.
*/
k[i] = "key" + i;
op.putKeyString(k[i]);
ret = op.search();

When a database contains multiple tables, it may be desired to compact several tables in parallel without having to manage separate threads to each call Session.compact. Alternatively, compacting several tables serially may take much longer. The AsyncOp.compact method allows the application to compact multiple objects asynchronously.

/*
* Compact a table asynchronously, limiting the run-time to 5 minutes.
*/
op = tryAsyncNewOp(conn, "table:async", "timeout=300", asynciface);
ret = op.compact();

Waiting for outstanding operations to complete

The Connection.async_flush method can be used to wait for all previous operations to complete. When that call returns, all previously queued operations are guaranteed to have been completed and their callback functions have returned.

/* Wait for all outstanding operations to complete. */
ret = conn.async_flush();

Because Connection.close implicitly does a Connection.async_flush, the call is not required in all applications.

Asynchronous operations and transactions

Each asynchronous worker thread operates in its own session, executing a single asynchronous operation with the context of the session's transaction. Therefore, there is no way to combine multiple, related updates into a single transaction when using asynchronous operations.

The transaction is committed if the operation was successful and the application callback returns success, otherwise the transaction is rolled back.