Version 3.3.0
ex_async.c

Show how to configure and use asynchronous operations.

/*-
* Public Domain 2014-present MongoDB, Inc.
* Public Domain 2008-2014 WiredTiger, Inc.
*
* This is free and unencumbered software released into the public domain.
*
* Anyone is free to copy, modify, publish, use, compile, sell, or
* distribute this software, either in source code form or as a compiled
* binary, for any purpose, commercial or non-commercial, and by any
* means.
*
* In jurisdictions that recognize copyright laws, the author or authors
* of this software dedicate any and all copyright interest in the
* software to the public domain. We make this dedication for the benefit
* of the public at large and to the detriment of our heirs and
* successors. We intend this dedication to be an overt act of
* relinquishment in perpetuity of all present and future rights to this
* software under copyright law.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*
* ex_async.c
* demonstrates how to use the asynchronous API.
*/
#include <test_util.h>
static const char *home;
#if defined(_lint)
#define ATOMIC_ADD(v, val) ((v) += (val), (v))
#elif defined(_WIN32)
#define ATOMIC_ADD(v, val) (_InterlockedExchangeAdd(&(v), val) + val)
#else
#define ATOMIC_ADD(v, val) __atomic_add_fetch(&(v), val, __ATOMIC_SEQ_CST)
#endif
static int global_error = 0;
typedef struct {
uint32_t num_keys;
} ASYNC_KEYS;
static int
async_callback(WT_ASYNC_CALLBACK *cb, WT_ASYNC_OP *op, int wiredtiger_error, uint32_t flags)
{
ASYNC_KEYS *asynckey = (ASYNC_KEYS *)cb;
WT_ITEM k, v;
const char *key, *value;
uint64_t id;
(void)flags; /* Unused */
/* Retrieve the operation's WT_ASYNC_OPTYPE type. */
type = op->get_type(op);
/* Retrieve the operation's 64-bit identifier. */
id = op->get_id(op);
/* Check for a WiredTiger error. */
if (wiredtiger_error != 0) {
fprintf(stderr, "ID %" PRIu64 " error %d: %s\n", id, wiredtiger_error,
wiredtiger_strerror(wiredtiger_error));
global_error = wiredtiger_error;
return (1);
}
/* If doing a search, retrieve the key/value pair. */
if (type == WT_AOP_SEARCH) {
error_check(op->get_key(op, &k));
key = k.data;
error_check(op->get_value(op, &v));
value = v.data;
ATOMIC_ADD(asynckey->num_keys, 1);
printf("Id %" PRIu64 " got record: %s : %s\n", id, key, value);
}
return (0);
}
static ASYNC_KEYS ex_asynckeys = {{async_callback}, 0};
#define MAX_KEYS 15
int
main(int argc, char *argv[])
{
WT_SESSION *session;
int i, ret;
char k[MAX_KEYS][16], v[MAX_KEYS][16];
home = example_setup(argc, argv);
error_check(wiredtiger_open(
home, NULL, "create,cache_size=100MB,async=(enabled=true,ops_max=20,threads=2)", &conn));
error_check(conn->open_session(conn, NULL, NULL, &session));
error_check(session->create(session, "table:async", "key_format=S,value_format=S"));
/* Insert a set of keys asynchronously. */
for (i = 0; i < MAX_KEYS; i++) {
while (
(ret = conn->async_new_op(conn, "table:async", NULL, &ex_asynckeys.iface, &op)) != 0) {
/*
* If we used up all the handles, pause and retry to give the workers a chance to catch
* up.
*/
fprintf(stderr, "asynchronous operation handle not available\n");
if (ret == EBUSY)
sleep(1);
else
return (EXIT_FAILURE);
}
/*
* Set the operation's string key and value, and then do an asynchronous insert.
*/
(void)snprintf(k[i], sizeof(k), "key%d", i);
op->set_key(op, k[i]);
(void)snprintf(v[i], sizeof(v), "value%d", i);
op->set_value(op, v[i]);
error_check(op->insert(op));
}
/* Wait for all outstanding operations to complete. */
error_check(conn->async_flush(conn));
/*
* Compact a table asynchronously, limiting the run-time to 5 minutes.
*/
error_check(conn->async_new_op(conn, "table:async", "timeout=300", &ex_asynckeys.iface, &op));
error_check(op->compact(op));
/* Search for the keys we just inserted, asynchronously. */
for (i = 0; i < MAX_KEYS; i++) {
while (
(ret = conn->async_new_op(conn, "table:async", NULL, &ex_asynckeys.iface, &op)) != 0) {
/*
* If we used up all the handles, pause and retry to give the workers a chance to catch
* up.
*/
fprintf(stderr, "asynchronous operation handle not available\n");
if (ret == EBUSY)
sleep(1);
else
return (EXIT_FAILURE);
}
/*
* Set the operation's string key and value, and then do an asynchronous search.
*/
(void)snprintf(k[i], sizeof(k), "key%d", i);
op->set_key(op, k[i]);
error_check(op->search(op));
}
/*
* Connection close automatically does an async_flush so it will wait for all queued search
* operations to complete.
*/
error_check(conn->close(conn, NULL));
printf("Searched for %" PRIu32 " keys\n", ex_asynckeys.num_keys);
return (EXIT_SUCCESS);
}
WT_ASYNC_OP::get_value
int get_value(WT_ASYNC_OP *op,...)
Invoke the underlying WT_CURSOR::get_value method; see that method for configuration,...
WT_ASYNC_OP::get_key
int get_key(WT_ASYNC_OP *op,...)
Invoke the underlying WT_CURSOR::get_key method; see that method for configuration,...
WT_SESSION::create
int create(WT_SESSION *session, const char *name, const char *config)
Create a table, column group, index or file.
WT_CONNECTION::async_flush
int async_flush(WT_CONNECTION *connection)
Wait for all outstanding operations to complete.
WT_AOP_SEARCH
@ WT_AOP_SEARCH
WT_ASYNC_OP::search.
Definition: wiredtiger.in:736
WT_CONNECTION::async_new_op
int async_new_op(WT_CONNECTION *connection, const char *uri, const char *config, WT_ASYNC_CALLBACK *callback, WT_ASYNC_OP **asyncopp)
Return an async operation handle.
WT_ITEM::data
const void * data
The memory reference of the data item.
Definition: wiredtiger.in:111
wiredtiger_strerror
const char * wiredtiger_strerror(int error)
Return information about a WiredTiger error as a string (see WT_SESSION::strerror for a thread-safe A...
WT_CONNECTION::open_session
int open_session(WT_CONNECTION *connection, WT_EVENT_HANDLER *event_handler, const char *config, WT_SESSION **sessionp)
Open a session.
WT_ASYNC_OP
A WT_ASYNC_OP handle is the interface to an asynchronous operation.
Definition: wiredtiger.in:761
WT_ITEM
A raw item of data to be managed, including a pointer to the data and a length.
Definition: wiredtiger.in:103
WT_ASYNC_OP::insert
int insert(WT_ASYNC_OP *op)
Invoke the underlying WT_CURSOR::insert method; see that method for configuration,...
WT_ASYNC_OP::set_key
void set_key(WT_ASYNC_OP *op,...)
Invoke the underlying WT_CURSOR::set_key method; see that method for configuration,...
WT_CONNECTION
A connection to a WiredTiger database.
Definition: wiredtiger.in:2069
WT_ASYNC_OP::set_value
void set_value(WT_ASYNC_OP *op,...)
Invoke the underlying WT_CURSOR::set_value method; see that method for configuration,...
WT_CONNECTION::close
int close(WT_HANDLE_CLOSED(WT_CONNECTION) *connection, const char *config)
Close a connection.
WT_ASYNC_OP::get_type
WT_ASYNC_OPTYPE get_type(WT_ASYNC_OP *op)
Get the type for this operation.
WT_ASYNC_OP::compact
int compact(WT_ASYNC_OP *op)
Invoke the underlying WT_SESSION::compact method; see that method for configuration,...
WT_ASYNC_OP::search
int search(WT_ASYNC_OP *op)
Invoke the underlying WT_CURSOR::search method; see that method for configuration,...
wiredtiger_open
int wiredtiger_open(const char *home, WT_EVENT_HANDLER *event_handler, const char *config, WT_CONNECTION **connectionp)
Open a connection to a database.
WT_ASYNC_OPTYPE
WT_ASYNC_OPTYPE
Asynchronous operation types.
Definition: wiredtiger.in:731
WT_SESSION
All data operations are performed in the context of a WT_SESSION.
Definition: wiredtiger.in:930
WT_ASYNC_OP::get_id
uint64_t get_id(WT_ASYNC_OP *op)
Get the unique identifier for this operation.
WT_ASYNC_CALLBACK
The interface implemented by applications to accept notifications of the completion of asynchronous o...
Definition: wiredtiger.in:3140