Version 2.4.1
Asynchronous operations

WiredTiger supports asynchronous operations. For example, a server application that must handle network requests as quickly as possible may want its worker threads to initiate a unit of work and then immediately turn to the next request, rather than wait for the results of the first.

WiredTiger supports asynchronous operations through the WT_ASYNC_OP handle. The work unit represented by the WT_ASYNC_OP handle is queued by the application and performed by an internal WiredTiger worker thread. When the work unit completes, the WiredTiger thread makes a callback to notify the application that the operation has finished, and to provide any results and error values.

The asynchronous operation handle operates in a manner similar to a WT_CURSOR handle. An asynchronous operation includes:

  • getters and setters for key and value fields
  • encoding of fields to store in the data source
  • methods to modify and retrieve specific data (for example, insert and update)
  • a method to compact a table

The WT_ASYNC_OP handle is not valid after the callback function returns into the WiredTiger library: when the application callback returns, the handle is returned to the system pool. The application callback must therefore copy out any key, value or other information it needs before returning.
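
The copy-out requirement can be illustrated with a small sketch that does not depend on the library. The item_view and owned_buf types and the copy_out function are hypothetical stand-ins, not part of the WiredTiger API: item_view models a key or value that is only borrowed for the duration of the callback, and owned_buf models the application's own copy.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for bytes owned by the library (a borrowed view). */
struct item_view {
    const void *data;
    size_t size;
};

/* Hypothetical application-owned copy that outlives the callback. */
struct owned_buf {
    void *data;
    size_t size;
};

/*
 * Duplicate the borrowed bytes into memory the application owns.
 * Returns 0 on success or -1 if the allocation fails. The caller
 * releases dst->data with free().
 */
static int
copy_out(const struct item_view *src, struct owned_buf *dst)
{
    assert(src != NULL && dst != NULL);

    /* Allocate at least one byte so a zero-length item still succeeds. */
    dst->data = malloc(src->size == 0 ? 1 : src->size);
    if (dst->data == NULL)
        return (-1);
    if (src->size != 0)
        memcpy(dst->data, src->data, src->size);
    dst->size = src->size;
    return (0);
}
```

A callback would call something like copy_out on each key or value it wants to keep, and hand the resulting owned_buf to the rest of the application, before returning.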

Configuring asynchronous operations

To perform asynchronous operations, the application must first include the async configuration option when wiredtiger_open is called. Additional configuration parameters include the number of WiredTiger worker threads created to handle the incoming queue of operations and the maximum number of simultaneous asynchronous operations that are expected.

For example, the following configures an application for asynchronous operations, with a maximum of 20 asynchronous operations and 2 supporting threads:

ret = wiredtiger_open(home, NULL,
    "create,cache_size=100MB,"
    "async=(enabled=true,ops_max=20,threads=2)", &conn);

If the number of outstanding requests exceeds the configured maximum, a WT_ASYNC_OP handle won't be immediately available, and an error will be returned to the application when it attempts to allocate a handle. If the configured worker threads are unable to keep up with the requests, requests will be forced to wait for worker threads to become available.
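
The effect of the configured maximum can be pictured with a toy model of a fixed-size handle pool. This is only a sketch of the behavior described above, not WiredTiger's implementation; the op_pool type and the pool_alloc and pool_release functions are hypothetical, and EBUSY is used because that is the value the allocation example in this document retries on.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

#define POOL_OPS_MAX 20 /* Same value as ops_max in the configuration string. */

/* Toy model of a fixed-size handle pool; not WiredTiger's implementation. */
struct op_pool {
    bool in_use[POOL_OPS_MAX];
};

/* Claim a free slot: returns 0 and sets *slotp, or EBUSY if none is free. */
static int
pool_alloc(struct op_pool *pool, size_t *slotp)
{
    size_t i;

    assert(pool != NULL && slotp != NULL);
    for (i = 0; i < POOL_OPS_MAX; i++)
        if (!pool->in_use[i]) {
            pool->in_use[i] = true;
            *slotp = i;
            return (0);
        }
    return (EBUSY);
}

/* Return a slot to the pool, as happens once a callback has returned. */
static void
pool_release(struct op_pool *pool, size_t slot)
{
    assert(pool != NULL && slot < POOL_OPS_MAX && pool->in_use[slot]);
    pool->in_use[slot] = false;
}
```

In this model, the twenty-first allocation fails with EBUSY until some earlier operation completes and its slot is released.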

Allocating an asynchronous operations handle

A WT_ASYNC_OP handle is allocated using the WT_CONNECTION::async_new_op method. This method takes an existing object URI, an optional configuration string and a callback. For example:

while ((ret = conn->async_new_op(conn,
    "table:async", NULL, &ex_asynckeys.iface, &op)) != 0) {
    /*
     * If we used up all the handles, pause and retry to
     * give the workers a chance to catch up.
     */
    fprintf(stderr,
        "asynchronous operation handle not available\n");
    if (ret == EBUSY)
        sleep(1);
    else
        return (ret);
}
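
The loop above retries for as long as the allocation reports EBUSY. An application that prefers to give up after a fixed number of attempts could wrap the call in a helper along the following lines. This is a minimal sketch: alloc_fn, pause_fn, alloc_with_retry and busy_countdown are hypothetical names, and the hooks stand in for the real allocation call and for a back-off such as sleep(1).

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical allocation hook: returns 0, EBUSY, or another error code. */
typedef int (*alloc_fn)(void *arg);

/* Hypothetical back-off hook, for example a wrapper around sleep(1). */
typedef void (*pause_fn)(void *arg);

/*
 * Call try_alloc until it succeeds, fails with an error other than EBUSY,
 * or max_attempts calls have been made. Returns the last result.
 */
static int
alloc_with_retry(alloc_fn try_alloc, pause_fn pause, void *arg,
    unsigned max_attempts)
{
    unsigned attempt;
    int ret;

    assert(try_alloc != NULL && max_attempts > 0);
    ret = EBUSY;
    for (attempt = 0; attempt < max_attempts; attempt++) {
        if ((ret = try_alloc(arg)) != EBUSY)
            break;
        /* Pause only if another attempt will follow. */
        if (pause != NULL && attempt + 1 < max_attempts)
            pause(arg);
    }
    return (ret);
}

/* Demonstration hook: reports EBUSY until the counter reaches zero. */
static int
busy_countdown(void *arg)
{
    int *remaining = arg;

    if (*remaining > 0) {
        (*remaining)--;
        return (EBUSY);
    }
    return (0);
}
```

With busy_countdown and a counter of 2, the helper succeeds on the third call; with a counter larger than max_attempts, it returns EBUSY so the caller can decide what to do next.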

To aid the application in matching up an asynchronous operation with a subsequent call to the callback function, every handle contains a unique uint64_t identifier and WT_ASYNC_OPTYPE type. The identifier is assigned when the handle is allocated and the type is assigned when the asynchronous operation is queued.

To retrieve the id, use the WT_ASYNC_OP::get_id method:

/* Retrieve the operation's 64-bit identifier. */
id = op->get_id(op);

To retrieve the WT_ASYNC_OPTYPE type, use the WT_ASYNC_OP::get_type method:

/* Retrieve the operation's WT_ASYNC_OPTYPE type. */
type = op->get_type(op);
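
One way an application might use the identifier is to keep its own table of in-flight operations, recording the identifier when the operation is queued and looking it up again in the callback. The following is a sketch under that assumption; pending_table, pending_init, pending_add and pending_take are hypothetical application code, not WiredTiger functions. Because callbacks run in WiredTiger worker threads, a real table would also need locking, which is omitted here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_PENDING 20 /* Assumed to match the configured ops_max. */

/* Hypothetical application-side record of one in-flight operation. */
struct pending_op {
    uint64_t id;   /* Identifier obtained from get_id. */
    void *request; /* Application context, for example a client request. */
    int used;
};

struct pending_table {
    struct pending_op slots[MAX_PENDING];
};

/* Mark every slot as free. */
static void
pending_init(struct pending_table *t)
{
    assert(t != NULL);
    memset(t, 0, sizeof(*t));
}

/* Remember the request for an identifier; returns -1 if the table is full. */
static int
pending_add(struct pending_table *t, uint64_t id, void *request)
{
    size_t i;

    assert(t != NULL);
    for (i = 0; i < MAX_PENDING; i++)
        if (!t->slots[i].used) {
            t->slots[i].id = id;
            t->slots[i].request = request;
            t->slots[i].used = 1;
            return (0);
        }
    return (-1);
}

/* Look up and remove the request for an identifier; NULL if unknown. */
static void *
pending_take(struct pending_table *t, uint64_t id)
{
    size_t i;

    assert(t != NULL);
    for (i = 0; i < MAX_PENDING; i++)
        if (t->slots[i].used && t->slots[i].id == id) {
            t->slots[i].used = 0;
            return (t->slots[i].request);
        }
    return (NULL);
}
```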

While in the callback function, WiredTiger only allows a limited number of method calls back into the library using the WT_ASYNC_OP handle. The application is allowed to retrieve the handle's key, value, identifier and operation type from the WT_ASYNC_OP handle.

Here is a complete example callback function implementation, from the example program ex_async.c:

typedef struct {
    WT_ASYNC_CALLBACK iface;
    uint32_t num_keys;
} ASYNC_KEYS;

static int
async_callback(WT_ASYNC_CALLBACK *cb,
    WT_ASYNC_OP *op, int wiredtiger_error, uint32_t flags)
{
    ASYNC_KEYS *asynckey = (ASYNC_KEYS *)cb;
    WT_ASYNC_OPTYPE type;
    WT_ITEM k, v;
    const char *key, *value;
    uint64_t id;
    int ret;

    (void)flags; /* Unused */
    ret = 0;

    /* Retrieve the operation's WT_ASYNC_OPTYPE type. */
    type = op->get_type(op);
    /* Retrieve the operation's 64-bit identifier. */
    id = op->get_id(op);

    /* Check for a WiredTiger error. */
    if (wiredtiger_error != 0) {
        fprintf(stderr,
            "ID %" PRIu64 " error %d: %s\n",
            id, wiredtiger_error,
            wiredtiger_strerror(wiredtiger_error));
        global_error = wiredtiger_error;
        return (1);
    }

    /* If doing a search, retrieve the key/value pair. */
    if (type == WT_AOP_SEARCH) {
        ret = op->get_key(op, &k);
        key = k.data;
        ret = op->get_value(op, &v);
        value = v.data;
        ATOMIC_ADD(asynckey->num_keys, 1);
        printf("Id %" PRIu64 " got record: %s : %s\n", id, key, value);
    }
    return (ret);
}
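
The callback above casts its cb argument to ASYNC_KEYS *, and the handle is allocated with &ex_asynckeys.iface. Both rely on the callback interface being the first member of the application's structure, so that a pointer to the interface is also a pointer to the enclosing structure. The following library-independent sketch shows the same pattern. The callback_iface and key_counter types and the count_notify function are hypothetical stand-ins, and C11 atomics are assumed in place of the example program's ATOMIC_ADD macro, whose definition is not shown here.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for a callback interface such as WT_ASYNC_CALLBACK. */
struct callback_iface {
    int (*notify)(struct callback_iface *cb, int error);
};

/* Application state; the interface must be the first member. */
struct key_counter {
    struct callback_iface iface;
    atomic_uint num_keys;
};

/* Recover the enclosing structure from the interface pointer and count. */
static int
count_notify(struct callback_iface *cb, int error)
{
    struct key_counter *counter = (struct key_counter *)cb;

    assert(cb != NULL);
    if (error != 0)
        return (1);
    /* Several worker threads may run callbacks at once: count atomically. */
    atomic_fetch_add(&counter->num_keys, 1);
    return (0);
}
```

The caller only ever passes &counter.iface, yet the callback can reach num_keys because the two pointers share the same address.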

Executing asynchronous operations

The WT_ASYNC_OP handle behaves similarly to the WT_CURSOR handle: the key and value are set, and then an operation is performed.

For example, the following code does an asynchronous insert into the table:

/*
 * Set the operation's string key and value, and then do
 * an asynchronous insert. Each buffer size is that of a
 * single array element, not of the whole array.
 */
snprintf(k[i], sizeof(k[i]), "key%d", i);
op->set_key(op, k[i]);
snprintf(v[i], sizeof(v[i]), "value%d", i);
op->set_value(op, v[i]);
ret = op->insert(op);
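
When keys are kept in a two-dimensional array such as k, with one row per queued operation, the size passed to snprintf should be that of a single row. The sketch below isolates that detail; MAX_KEYS, KEY_LEN and format_keys are hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <stdio.h>

#define MAX_KEYS 15 /* Hypothetical number of keys. */
#define KEY_LEN 16  /* Hypothetical size of each key buffer. */

/*
 * Format "key0", "key1", ... into separate rows. The size passed to
 * snprintf is that of one row (KEY_LEN bytes), not of the whole array.
 */
static void
format_keys(char k[MAX_KEYS][KEY_LEN])
{
    int i;

    assert(k != NULL);
    for (i = 0; i < MAX_KEYS; i++)
        (void)snprintf(k[i], sizeof(k[i]), "key%d", i);
}
```

Here sizeof(k[i]) evaluates to KEY_LEN, so snprintf can never write past the end of the row it was given.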

Similarly, the following code does an asynchronous search of the table:

/*
 * Set the operation's string key, and then do an
 * asynchronous search.
 */
snprintf(k[i], sizeof(k[i]), "key%d", i);
op->set_key(op, k[i]);
ret = op->search(op);

When a database contains multiple tables, an application may want to compact several of them in parallel without managing a separate thread for each call to WT_SESSION::compact; compacting the tables serially instead may take much longer. The WT_ASYNC_OP::compact method allows the application to compact multiple objects asynchronously.

/*
 * Compact a table asynchronously, limiting the run-time to 5 minutes.
 */
ret = conn->async_new_op(
    conn, "table:async", "timeout=300", &ex_asynckeys.iface, &op);
/* Only queue the compaction if a handle was allocated. */
if (ret == 0)
    ret = op->compact(op);

Waiting for outstanding operations to complete

The WT_CONNECTION::async_flush method can be used to wait for all previous operations to complete. When that call returns, all previously queued operations are guaranteed to have completed and their callback functions to have returned.

/* Wait for all outstanding operations to complete. */
ret = conn->async_flush(conn);

Because WT_CONNECTION::close implicitly does a WT_CONNECTION::async_flush, the call is not required in all applications.

Asynchronous operations and transactions

Each asynchronous worker thread operates in its own session, executing a single asynchronous operation within the context of that session's transaction. Therefore, there is no way to combine multiple, related updates into a single transaction when using asynchronous operations.

The transaction is committed if the operation was successful and the application callback returns success; otherwise, the transaction is rolled back.
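
The commit rule can be modeled in a few lines. This is a sketch of the rule as stated above, not of WiredTiger's internals; txn_outcome, run_unit and the example hooks are hypothetical names. It shows that a callback can cause a rollback by returning non-zero even when the operation itself succeeded.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

enum txn_outcome { TXN_COMMIT, TXN_ROLLBACK };

/* Hypothetical hooks standing in for the operation and the callback. */
typedef int (*op_fn)(void);
typedef int (*callback_fn)(int op_error);

/*
 * Model of one unit of work: run the operation, pass its error value to
 * the callback, then commit only if both report success.
 */
static enum txn_outcome
run_unit(op_fn op, callback_fn callback)
{
    int op_error, callback_ret;

    assert(op != NULL && callback != NULL);
    op_error = op();
    callback_ret = callback(op_error);
    if (op_error == 0 && callback_ret == 0)
        return (TXN_COMMIT);
    return (TXN_ROLLBACK);
}

/* Example hooks used to exercise the three interesting cases. */
static int op_ok(void) { return (0); }
static int op_fail(void) { return (EINVAL); }
static int cb_report(int op_error) { return (op_error != 0 ? 1 : 0); }
static int cb_veto(int op_error) { (void)op_error; return (1); }
```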