/* Shows how to extend WiredTiger with a custom file-system implementation. */
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <queue.h>
#include <stdlib.h>
#include <string.h>
#ifndef _WIN32
#include <pthread.h>
#else
#include "windows_shim.h"
#endif
#include <wiredtiger.h>
#include <wiredtiger_ext.h>
/*
 * allocate_file_system_lock --
 *	Initialize the file-system read/write lock with default attributes.
 *	The call is made outside assert() so the lock is still initialized
 *	when NDEBUG compiles assertions away.
 */
static void
allocate_file_system_lock(pthread_rwlock_t *lockp)
{
	int ret;

	ret = pthread_rwlock_init(lockp, NULL);
	assert(ret == 0);
	(void)ret;	/* Unused when NDEBUG is defined. */
}
/*
 * destroy_file_system_lock --
 *	Destroy the file-system read/write lock. The call is made outside
 *	assert() so it still happens when NDEBUG is defined.
 */
static void
destroy_file_system_lock(pthread_rwlock_t *lockp)
{
	int ret;

	ret = pthread_rwlock_destroy(lockp);
	assert(ret == 0);
	(void)ret;	/* Unused when NDEBUG is defined. */
}
/*
 * lock_file_system --
 *	Acquire the file-system lock exclusively (a write lock is taken for
 *	every operation, including read-only ones). The call is made outside
 *	assert() so the lock is still acquired when NDEBUG is defined.
 */
static void
lock_file_system(pthread_rwlock_t *lockp)
{
	int ret;

	ret = pthread_rwlock_wrlock(lockp);
	assert(ret == 0);
	(void)ret;	/* Unused when NDEBUG is defined. */
}
/*
 * unlock_file_system --
 *	Release the file-system lock. The call is made outside assert() so
 *	the lock is still released when NDEBUG is defined.
 */
static void
unlock_file_system(pthread_rwlock_t *lockp)
{
	int ret;

	ret = pthread_rwlock_unlock(lockp);
	assert(ret == 0);
	(void)ret;	/* Unused when NDEBUG is defined. */
}
typedef struct {
pthread_rwlock_t lock;
int opened_file_count;
int opened_unique_file_count;
int closed_file_count;
int read_ops;
int write_ops;
TAILQ_HEAD(demo_file_handle_qh, demo_file_handle) fileq;
} DEMO_FILE_SYSTEM;
typedef struct demo_file_handle {
DEMO_FILE_SYSTEM *demo_fs;
TAILQ_ENTRY(demo_file_handle) q;
uint32_t ref;
char *buf;
size_t bufsize;
size_t size;
} DEMO_FILE_HANDLE;
/*
 * Forward declarations.
 *
 * NOTE(review): this declaration list appears damaged. The exported
 * prototype that __declspec(dllexport) should attach to (presumably
 * demo_file_system_create, the extension entry named in main's
 * configuration) is missing, as is the start of the prototype ending in
 * "const char *, const char *, char ***, uint32_t *);" (presumably
 * demo_fs_directory_list). The prototypes for demo_fs_directory_list_free,
 * demo_fs_remove, demo_fs_rename, demo_fs_size, demo_file_read and
 * demo_file_write stop after the opening parenthesis. Only the
 * demo_handle_remove and demo_handle_search prototypes are complete.
 * TODO restore from the original source.
 */
#ifdef _WIN32
__declspec(dllexport)
#endif
const char *, const char *, char ***, uint32_t *);
static int demo_fs_directory_list_free(
static int demo_fs_remove(
static int demo_fs_rename(
static int demo_fs_size(
static int demo_file_read(
static int demo_file_write(
static int demo_handle_remove(
WT_SESSION *, DEMO_FILE_HANDLE *);
static DEMO_FILE_HANDLE *demo_handle_search(
WT_FILE_SYSTEM *,
const char *);
/*
 * Growth step, in bytes, for a file's in-memory buffer. It is the initial
 * buffer size when a file is first opened, and the extra slack requested
 * beyond the end of each write.
 */
#define DEMO_FILE_SIZE_INCREMENT 32768
/*
 * byte_string_match --
 *	Return true if the len bytes at "bytes" spell exactly the
 *	NUL-terminated string "str". The bytes need not be NUL-terminated,
 *	as with the key/length pairs returned by the configuration parser.
 *
 *	Comparing lengths first and then using memcmp never reads past the
 *	end of either argument. The strncmp form could stop early on an
 *	embedded NUL in "bytes" and then index str[len] beyond the end of a
 *	shorter "str".
 */
static bool
byte_string_match(const char *str, const char *bytes, size_t len)
{
	return (strlen(str) == len && memcmp(str, bytes, len) == 0);
}
/*
 * demo_file_system_create --
 *	Extension entry point (named by "entry=demo_file_system_create" in
 *	main's open_config) that builds the demo file system. It allocates a
 *	DEMO_FILE_SYSTEM and records the extension API. It then parses and
 *	prints the extension's "config_string" and "config_value" settings,
 *	rejecting any other key with EINVAL. Finally it initializes the
 *	file-system lock and installs the fs_size callback. The
 *	"WT_CONNECTION.set_file_system" message suggests the file system is
 *	then registered with the connection. Returns 0 on success.
 *
 *	NOTE(review): this copy of the function appears damaged. Missing:
 *	- the line with the function name and parameters;
 *	- the declarations of config_parser, k, v, file_system and the
 *	  connection;
 *	- the wtext->err_printf(...) call heads whose format strings dangle
 *	  below;
 *	- the test after the parser loop (presumably ret != WT_NOTFOUND);
 *	- most file_system callback assignments;
 *	- the set_file_system call.
 *	TODO restore from the original source.
 */
int
{
DEMO_FILE_SYSTEM *demo_fs;
int ret = 0;
if ((demo_fs = calloc(1, sizeof(DEMO_FILE_SYSTEM))) == NULL) {
"demo_file_system_create: %s",
return (ENOMEM);
}
demo_fs->wtext = wtext;
/* Open a parser over this extension's configuration string. */
wtext, NULL, config, &config_parser)) != 0) {
"WT_EXTENSION_API.config_parser_open: config: %s",
goto err;
}
/* Echo each recognized key; any other key is an error. */
printf("Custom file system configuration\n");
while ((ret = config_parser->
next(config_parser, &k, &v)) == 0) {
if (byte_string_match(
"config_string", k.
str, k.
len)) {
printf("\t" "key %.*s=\"%.*s\"\n",
continue;
}
if (byte_string_match(
"config_value", k.
str, k.
len)) {
printf("\t" "key %.*s=%" PRId64 "\n",
continue;
}
ret = EINVAL;
"WT_CONFIG_PARSER.next: unexpected configuration "
"information: %.*s=%.*s: %s",
goto err;
}
/* NOTE(review): the test guarding this error path is missing. */
"WT_CONFIG_PARSER.next: config: %s",
goto err;
}
if ((ret = config_parser->
close(config_parser)) != 0) {
"WT_CONFIG_PARSER.close: config: %s",
goto err;
}
allocate_file_system_lock(&demo_fs->lock);
/* NOTE(review): only fs_size is installed in this copy. */
file_system->
fs_size = demo_fs_size;
"WT_CONNECTION.set_file_system: %s",
goto err;
}
return (0);
/*
 * NOTE(review): exit(1) terminates the host process from inside an
 * extension initializer, and a failure after allocate_file_system_lock
 * frees demo_fs without destroying the lock. Consider destroying the lock
 * when it was initialized and returning ret instead.
 */
err: free(demo_fs);
exit(1);
}
/*
 * demo_fs_open --
 *	Open the named in-memory file, creating it if it does not exist. All
 *	of the following happens under the file-system lock:
 *	- count the open;
 *	- if a handle with that name exists and is open, fail with EBUSY;
 *	- if it exists but is closed, mark it open again;
 *	- otherwise allocate a handle with a zeroed buffer of
 *	  DEMO_FILE_SIZE_INCREMENT bytes, copy the name, install the handle
 *	  callbacks, queue the handle at the head of demo_fs->fileq and count
 *	  a unique open.
 *	file_type and flags are ignored.
 *
 *	NOTE(review) defects in this block:
 *	- The EBUSY path jumps to err, which frees demo_fh->buf and demo_fh
 *	  even though that handle is still queued on demo_fs->fileq and in
 *	  use. That is a use-after-free. The path should unlock and return
 *	  EBUSY without freeing anything.
 *	- If the handle calloc fails, demo_fh is NULL and err dereferences it.
 *	- The reopen path returns 0 without setting *file_handlep, which was
 *	  set to NULL above. Presumably it should be &demo_fh->iface.
 *	- demo_file_truncate is defined in this file but is not installed
 *	  among the callbacks here. Confirm whether an fh_truncate assignment
 *	  is missing.
 *	NOTE(review): this copy is also missing the function-name/parameter
 *	line, the wtext and file_handle declarations, the assignment of
 *	file_handle from demo_fh, and the err_printf call head whose
 *	arguments dangle below. TODO restore.
 */
static int
{
DEMO_FILE_HANDLE *demo_fh;
DEMO_FILE_SYSTEM *demo_fs;
int ret = 0;
(void)file_type;
(void)flags;
*file_handlep = NULL;
demo_fs = (DEMO_FILE_SYSTEM *)file_system;
demo_fh = NULL;
wtext = demo_fs->wtext;
lock_file_system(&demo_fs->lock);
++demo_fs->opened_file_count;
/* Reuse an existing handle of the same name if it is not open. */
demo_fh = demo_handle_search(file_system, name);
if (demo_fh != NULL) {
if (demo_fh->ref != 0) {
"demo_fs_open: %s: file already open", name);
ret = EBUSY;
goto err;
}
demo_fh->ref = 1;
unlock_file_system(&demo_fs->lock);
return (0);
}
/* Create a new, empty file. */
if ((demo_fh = calloc(1, sizeof(DEMO_FILE_HANDLE))) == NULL) {
ret = ENOMEM;
goto err;
}
demo_fh->demo_fs = demo_fs;
demo_fh->ref = 1;
if ((demo_fh->buf = calloc(1, DEMO_FILE_SIZE_INCREMENT)) == NULL) {
ret = ENOMEM;
goto err;
}
demo_fh->bufsize = DEMO_FILE_SIZE_INCREMENT;
demo_fh->size = 0;
if ((file_handle->
name = strdup(name)) == NULL) {
ret = ENOMEM;
goto err;
}
file_handle->
close = demo_file_close;
file_handle->
fh_lock = demo_file_lock;
file_handle->
fh_read = demo_file_read;
file_handle->
fh_size = demo_file_size;
file_handle->
fh_sync = demo_file_sync;
file_handle->
fh_write = demo_file_write;
TAILQ_INSERT_HEAD(&demo_fs->fileq, demo_fh, q);
++demo_fs->opened_unique_file_count;
*file_handlep = file_handle;
/* Error cleanup, reached only via goto err. */
if (0) {
err: free(demo_fh->buf);
free(demo_fh);
}
unlock_file_system(&demo_fs->lock);
return (ret);
}
/*
 * demo_fs_directory_list --
 *	Return, through dirlistp and countp, a newly allocated array of
 *	copies of the names of every file whose name starts with "directory"
 *	and, when "prefix" is non-NULL, also starts with "prefix". The array
 *	and each name are heap allocated and are released by
 *	demo_fs_directory_list_free. On error nothing is returned:
 *	*dirlistp stays NULL, *countp stays 0, and ENOMEM is returned.
 *
 *	NOTE(review): the line carrying the function name and its leading
 *	parameters (file_system, session, directory) appears to be missing
 *	from this copy of the source; it is left as found.
 */
static int
const char *prefix, char ***dirlistp, uint32_t *countp)
{
	DEMO_FILE_HANDLE *demo_fh;
	DEMO_FILE_SYSTEM *demo_fs;
	size_t len, prefix_len;
	uint32_t allocated, count;
	int ret = 0;
	char *name, **entries, **tmp;

	(void)session;

	demo_fs = (DEMO_FILE_SYSTEM *)file_system;

	*dirlistp = NULL;
	*countp = 0;

	entries = NULL;
	allocated = count = 0;
	len = strlen(directory);
	prefix_len = prefix == NULL ? 0 : strlen(prefix);

	lock_file_system(&demo_fs->lock);
	TAILQ_FOREACH(demo_fh, &demo_fs->fileq, q) {
		name = demo_fh->iface.name;
		if (strncmp(name, directory, len) != 0 ||
		    (prefix != NULL && strncmp(name, prefix, prefix_len) != 0))
			continue;

		/* Grow the array ten slots at a time. */
		if (count >= allocated) {
			/*
			 * Use a temporary so the existing array, and the names
			 * already copied into it, can still be freed if
			 * realloc fails.
			 */
			tmp = realloc(
			    entries, (allocated + 10) * sizeof(char *));
			if (tmp == NULL) {
				ret = ENOMEM;
				goto err;
			}
			entries = tmp;
			/*
			 * Clear the new slots. Arithmetic on a char ** is
			 * already scaled by the element size, so offset by the
			 * element count, not by a byte count.
			 */
			memset(entries + allocated, 0, 10 * sizeof(char *));
			allocated += 10;
		}
		if ((entries[count] = strdup(name)) == NULL) {
			ret = ENOMEM;
			goto err;
		}
		++count;
	}

	*dirlistp = entries;
	*countp = count;

err:	unlock_file_system(&demo_fs->lock);
	if (ret == 0)
		return (0);

	/* On error, release every name copied so far and the array. */
	if (entries != NULL) {
		while (count > 0)
			free(entries[--count]);
		free(entries);
	}
	return (ret);
}
/*
 * demo_fs_directory_list_free --
 *	Release a name list returned by demo_fs_directory_list: each of the
 *	"count" names, last to first, then the array itself. A NULL list is
 *	ignored. Always returns 0.
 *
 *	NOTE(review): the line with the function name and the leading
 *	file_system parameter appears to be missing from this copy; left as
 *	found.
 */
static int
WT_SESSION *session,
char **dirlist, uint32_t count)
{
	uint32_t remaining;

	(void)file_system;
	(void)session;

	if (dirlist == NULL)
		return (0);

	for (remaining = count; remaining > 0; --remaining)
		free(dirlist[remaining - 1]);
	free(dirlist);
	return (0);
}
/*
 * demo_fs_exist (presumed name) --
 *	Report through existp whether a file with the given name is known to
 *	the file system, looking it up under the file-system lock. Always
 *	returns 0.
 *
 *	NOTE(review): the line with the function name and the leading
 *	file_system parameter appears to be missing from this copy; left as
 *	found.
 */
static int
WT_SESSION *session,
const char *name,
bool *existp)
{
	DEMO_FILE_SYSTEM *demo_fs = (DEMO_FILE_SYSTEM *)file_system;
	bool found;

	(void)session;

	lock_file_system(&demo_fs->lock);
	found = demo_handle_search(file_system, name) != NULL;
	unlock_file_system(&demo_fs->lock);

	*existp = found;
	return (0);
}
/*
 * demo_fs_remove --
 *	Discard the named file under the file-system lock. Returns ENOENT if
 *	no such file exists; otherwise it returns the result of
 *	demo_handle_remove, which fails with EBUSY if the file is open. flags
 *	are ignored.
 *
 *	NOTE(review): the line with the function name and the leading
 *	file_system parameter appears to be missing from this copy; left as
 *	found.
 */
static int
WT_SESSION *session,
const char *name, uint32_t flags)
{
	DEMO_FILE_HANDLE *demo_fh;
	DEMO_FILE_SYSTEM *demo_fs;
	int ret;

	(void)session;
	(void)flags;

	demo_fs = (DEMO_FILE_SYSTEM *)file_system;

	lock_file_system(&demo_fs->lock);
	demo_fh = demo_handle_search(file_system, name);
	ret = demo_fh == NULL ? ENOENT : demo_handle_remove(session, demo_fh);
	unlock_file_system(&demo_fs->lock);

	return (ret);
}
/*
 * demo_fs_rename --
 *	Rename the file "from" to "to" under the file-system lock. As with
 *	rename(2), an existing file named "to" is replaced, so names in the
 *	file system stay unique. The replacement fails with EBUSY if that
 *	file is open. Returns ENOENT if "from" does not exist and ENOMEM if
 *	the new name cannot be copied. flags are ignored.
 *
 *	NOTE(review): the line with the function name and the leading
 *	file_system parameter appears to be missing from this copy; left as
 *	found.
 */
static int
WT_SESSION *session,
const char *from,
const char *to, uint32_t flags)
{
	DEMO_FILE_HANDLE *demo_fh, *target;
	DEMO_FILE_SYSTEM *demo_fs;
	char *copy;
	int ret = 0;

	(void)flags;

	demo_fs = (DEMO_FILE_SYSTEM *)file_system;

	lock_file_system(&demo_fs->lock);
	if ((demo_fh = demo_handle_search(file_system, from)) == NULL)
		ret = ENOENT;
	else if ((copy = strdup(to)) == NULL)
		ret = ENOMEM;
	else {
		/*
		 * Discard any other file already called "to". Otherwise two
		 * handles would share the name and lookups would only ever
		 * find one of them.
		 */
		target = demo_handle_search(file_system, to);
		if (target != NULL && target != demo_fh &&
		    (ret = demo_handle_remove(session, target)) != 0)
			free(copy);
		else {
			free(demo_fh->iface.name);
			demo_fh->iface.name = copy;
		}
	}
	unlock_file_system(&demo_fs->lock);
	return (ret);
}
/*
 * demo_fs_size --
 *	Return, through sizep, the current size in bytes of the named file.
 *	Returns 0 on success, or ENOENT (leaving *sizep untouched) if no file
 *	of that name exists.
 *
 *	NOTE(review): the line with the function name and the leading
 *	file_system parameter appears to be missing from this copy; left as
 *	found.
 */
static int
WT_SESSION *session,
const char *name, wt_off_t *sizep)
{
	DEMO_FILE_SYSTEM *demo_fs;
	DEMO_FILE_HANDLE *demo_fh;
	int ret;

	(void)session;

	demo_fs = (DEMO_FILE_SYSTEM *)file_system;

	ret = ENOENT;
	lock_file_system(&demo_fs->lock);
	if ((demo_fh = demo_handle_search(file_system, name)) != NULL) {
		/* The same conversion demo_file_size uses for an open handle. */
		*sizep = (wt_off_t)demo_fh->size;
		ret = 0;
	}
	/* Release the lock on every path, including a failed lookup. */
	unlock_file_system(&demo_fs->lock);
	return (ret);
}
/*
 * demo_fs_terminate (presumed name) --
 *	Tear down the file system: discard every file handle, print the
 *	open/close/read/write statistics, destroy the lock and free the
 *	DEMO_FILE_SYSTEM. Returns the first error from discarding a handle,
 *	otherwise 0.
 *
 *	NOTE(review): the line with the function name and its parameters
 *	(file_system, session) appears to be missing from this copy; left as
 *	found. Handles that are still open cannot be discarded, and they keep
 *	a pointer to the freed DEMO_FILE_SYSTEM.
 */
static int
{
	DEMO_FILE_HANDLE *demo_fh, *demo_fh_next;
	DEMO_FILE_SYSTEM *demo_fs;
	int ret = 0, tret;

	demo_fs = (DEMO_FILE_SYSTEM *)file_system;

	/*
	 * demo_handle_remove refuses (EBUSY) to discard an open handle and
	 * leaves it queued. Walk the queue with a saved successor rather than
	 * re-reading the head, which would spin forever on such a handle.
	 */
	for (demo_fh = TAILQ_FIRST(&demo_fs->fileq);
	    demo_fh != NULL; demo_fh = demo_fh_next) {
		demo_fh_next = TAILQ_NEXT(demo_fh, q);
		if ((tret =
		    demo_handle_remove(session, demo_fh)) != 0 && ret == 0)
			ret = tret;
	}

	printf("Custom file system\n");
	printf("\t%d unique file opens\n", demo_fs->opened_unique_file_count);
	printf("\t%d files opened\n", demo_fs->opened_file_count);
	printf("\t%d files closed\n", demo_fs->closed_file_count);
	printf("\t%d reads, %d writes\n",
	    demo_fs->read_ops, demo_fs->write_ops);

	destroy_file_system_lock(&demo_fs->lock);
	free(demo_fs);

	return (ret);
}
/*
 * demo_file_close --
 *	Close a file handle: drop its open reference under the file-system
 *	lock and count a close when the reference reaches zero. The handle
 *	and its contents stay on the file system so a later open can reuse
 *	them. Always returns 0.
 *
 *	NOTE(review): closing a handle whose ref is already 0 wraps ref to
 *	UINT32_MAX. The line with the parameters (file_handle, session)
 *	appears to be missing from this copy; left as found.
 */
static int
{
	DEMO_FILE_HANDLE *demo_fh = (DEMO_FILE_HANDLE *)file_handle;
	DEMO_FILE_SYSTEM *demo_fs = demo_fh->demo_fs;

	(void)session;

	lock_file_system(&demo_fs->lock);
	demo_fh->ref--;
	if (demo_fh->ref == 0)
		demo_fs->closed_file_count++;
	unlock_file_system(&demo_fs->lock);

	return (0);
}
/*
 * demo_file_lock --
 *	File locking is a no-op for this in-memory file system: all
 *	arguments are ignored and the call always succeeds.
 *
 *	NOTE(review): the line with the function name and its parameters
 *	(file_handle, session, lock) appears to be missing from this copy;
 *	left as found.
 */
static int
{
	(void)lock;
	(void)session;
	(void)file_handle;

	return (0);
}
/*
 * demo_file_read --
 *	Copy up to len bytes starting at offset from the file's buffer into
 *	buf, under the file-system lock, and count the call in read_ops. A
 *	read that starts at or beyond the end of the file fails with EIO. A
 *	read that runs past the end is shortened to the bytes available.
 *
 *	NOTE(review): a shortened read still returns 0 and does not report how
 *	many bytes were copied, so the tail of buf is left untouched. Confirm
 *	callers never read past end of file.
 *	NOTE(review): this copy is missing the function-name line, the wtext
 *	declaration and the err_printf call head whose arguments dangle
 *	below. TODO restore.
 */
static int
WT_SESSION *session, wt_off_t offset,
size_t len,
void *buf)
{
DEMO_FILE_HANDLE *demo_fh;
DEMO_FILE_SYSTEM *demo_fs;
size_t off;
int ret = 0;
demo_fh = (DEMO_FILE_HANDLE *)file_handle;
demo_fs = demo_fh->demo_fs;
wtext = demo_fs->wtext;
off = (size_t)offset;
lock_file_system(&demo_fs->lock);
++demo_fs->read_ops;
if (off < demo_fh->size) {
/* Clamp the read to the end of the file. */
if (len > demo_fh->size - off)
len = demo_fh->size - off;
memcpy(buf, (uint8_t *)demo_fh->buf + off, len);
} else
ret = EIO;
unlock_file_system(&demo_fs->lock);
if (ret == 0)
return (0);
"%s: handle-read: failed to read %zu bytes at offset %zu: %s",
demo_fh->iface.name, len, off, wtext->
strerror(wtext, NULL, ret));
return (ret);
}
/*
 * demo_file_size --
 *	Return, through sizep, the current size in bytes of an open file,
 *	read under the file-system lock. Always returns 0.
 *
 *	NOTE(review): the parameter line (file_handle, session, sizep)
 *	appears to be missing from this copy; left as found.
 */
static int
demo_file_size(
{
	DEMO_FILE_HANDLE *demo_fh = (DEMO_FILE_HANDLE *)file_handle;
	DEMO_FILE_SYSTEM *demo_fs = demo_fh->demo_fs;
	wt_off_t current;

	(void)session;

	lock_file_system(&demo_fs->lock);
	current = (wt_off_t)demo_fh->size;
	unlock_file_system(&demo_fs->lock);

	*sizep = current;
	return (0);
}
/*
 * demo_file_sync --
 *	Nothing to flush for an in-memory file; always succeeds.
 *
 *	NOTE(review): the line with the function name and its parameters
 *	(file_handle, session) appears to be missing from this copy; left as
 *	found.
 */
static int
{
	(void)session;
	(void)file_handle;

	return (0);
}
/*
 * demo_buffer_resize --
 *	Ensure the handle's buffer holds at least offset bytes, growing it
 *	with realloc and zero-filling the newly allocated bytes. The buffer
 *	never shrinks. If realloc fails, the old buffer is left in place and
 *	ENOMEM is returned. The callers in view (truncate and write) hold the
 *	file-system lock.
 *
 *	NOTE(review): the error format has a single %s but is passed both the
 *	file name and an error string; presumably ": %s" is missing. This copy
 *	is also missing the wtext declaration and the err_printf call head.
 *	TODO restore.
 */
static int
demo_buffer_resize(
WT_SESSION *session, DEMO_FILE_HANDLE *demo_fh, wt_off_t offset)
{
DEMO_FILE_SYSTEM *demo_fs;
size_t off;
void *p;
demo_fs = demo_fh->demo_fs;
wtext = demo_fs->wtext;
off = (size_t)offset;
/* Already large enough: nothing to do. */
if (demo_fh->bufsize >= off)
return (0);
if ((p = realloc(demo_fh->buf, off)) == NULL) {
"%s: failed to resize buffer",
demo_fh->iface.name, wtext->
strerror(wtext, NULL, ENOMEM));
return (ENOMEM);
}
/* Zero only the newly allocated tail. */
memset((uint8_t *)p + demo_fh->bufsize, 0, off - demo_fh->bufsize);
demo_fh->buf = p;
demo_fh->bufsize = off;
return (0);
}
/*
 * demo_file_truncate --
 *	Set the file's size to offset bytes under the file-system lock,
 *	growing the buffer if needed. When the file is extended, the new
 *	bytes read as zeros, as with ftruncate(2). Returns the result of the
 *	buffer resize (ENOMEM on allocation failure, in which case the size
 *	is unchanged).
 *
 *	NOTE(review): the parameter line (file_handle, session, offset)
 *	appears to be missing from this copy; left as found.
 */
static int
demo_file_truncate(
{
	DEMO_FILE_HANDLE *demo_fh;
	DEMO_FILE_SYSTEM *demo_fs;
	size_t off;
	int ret = 0;

	demo_fh = (DEMO_FILE_HANDLE *)file_handle;
	demo_fs = demo_fh->demo_fs;
	off = (size_t)offset;

	lock_file_system(&demo_fs->lock);
	if ((ret = demo_buffer_resize(session, demo_fh, offset)) == 0) {
		/*
		 * demo_buffer_resize only zero-fills memory it newly
		 * allocates. A buffer that shrank earlier can still hold old
		 * bytes past the current end, so clear the extended range
		 * explicitly. After the resize, bufsize >= off.
		 */
		if (off > demo_fh->size)
			memset((uint8_t *)demo_fh->buf + demo_fh->size, 0,
			    off - demo_fh->size);
		demo_fh->size = off;
	}
	unlock_file_system(&demo_fs->lock);
	return (ret);
}
/*
 * demo_file_write --
 *	Copy len bytes from buf into the file at offset, under the
 *	file-system lock, and count the call in write_ops. The buffer is
 *	first grown to cover offset + len plus DEMO_FILE_SIZE_INCREMENT bytes
 *	of slack. The file size is extended if the write ends past it.
 *
 *	NOTE(review): if a truncate shrank the file earlier, a write that
 *	starts beyond the current size leaves the gap between the old end of
 *	file and offset holding stale bytes rather than zeros.
 *	NOTE(review): this copy is missing the function-name line, the wtext
 *	declaration and the err_printf call head whose arguments dangle
 *	below. TODO restore.
 */
static int
wt_off_t offset, size_t len, const void *buf)
{
DEMO_FILE_HANDLE *demo_fh;
DEMO_FILE_SYSTEM *demo_fs;
size_t off;
int ret = 0;
demo_fh = (DEMO_FILE_HANDLE *)file_handle;
demo_fs = demo_fh->demo_fs;
wtext = demo_fs->wtext;
off = (size_t)offset;
lock_file_system(&demo_fs->lock);
++demo_fs->write_ops;
if ((ret = demo_buffer_resize(session, demo_fh,
offset + (wt_off_t)(len + DEMO_FILE_SIZE_INCREMENT))) == 0) {
memcpy((uint8_t *)demo_fh->buf + off, buf, len);
if (off + len > demo_fh->size)
demo_fh->size = off + len;
}
unlock_file_system(&demo_fs->lock);
if (ret == 0)
return (0);
"%s: handle-write: failed to write %zu bytes at offset %zu: %s",
demo_fh->iface.name, len, off, wtext->
strerror(wtext, NULL, ret));
return (ret);
}
/*
 * demo_handle_remove --
 *	Discard a file handle. Fails with EBUSY if the handle is still open
 *	(ref != 0). Otherwise it unlinks the handle from the file system's
 *	queue and frees its buffer, its name and the handle itself.
 *	demo_fs_remove calls this with the file-system lock held; the
 *	terminate path calls it without taking the lock.
 *
 *	NOTE(review): the error format has one %s but is passed both the name
 *	and an error string. The wtext declaration and the err_printf call
 *	head are missing from this copy. TODO restore.
 */
static int
demo_handle_remove(
WT_SESSION *session, DEMO_FILE_HANDLE *demo_fh)
{
DEMO_FILE_SYSTEM *demo_fs;
demo_fs = demo_fh->demo_fs;
wtext = demo_fs->wtext;
if (demo_fh->ref != 0) {
"demo_handle_remove: %s: file is currently open",
demo_fh->iface.name, wtext->
strerror(wtext, NULL, EBUSY));
return (EBUSY);
}
TAILQ_REMOVE(&demo_fs->fileq, demo_fh, q);
free(demo_fh->buf);
free(demo_fh->iface.name);
free(demo_fh);
return (0);
}
/*
 * demo_handle_search --
 *	Return the handle whose name exactly equals "name", or NULL if there
 *	is none. The queue is scanned from its head. Callers in view hold the
 *	file-system lock.
 *
 *	NOTE(review): the line with the function name and its parameters
 *	(file_system, name) appears to be missing from this copy; left as
 *	found.
 */
static DEMO_FILE_HANDLE *
{
	DEMO_FILE_SYSTEM *demo_fs = (DEMO_FILE_SYSTEM *)file_system;
	DEMO_FILE_HANDLE *demo_fh;

	TAILQ_FOREACH(demo_fh, &demo_fs->fileq, q) {
		if (strcmp(demo_fh->iface.name, name) == 0)
			return (demo_fh);
	}
	return (NULL);
}
/*
 * Database home directory. main sets it to "WT_HOME" (recreated empty via
 * system()) unless WIREDTIGER_HOME is set in the environment, in which case
 * it is NULL.
 */
static const char *home;
/*
 * main --
 *	Open a connection whose configuration loads demo_file_system_create
 *	as an early-load extension, then open a session and create
 *	"table:fs" with string keys and values. Next, build 1000 keys of the
 *	form "%010d KEY -----" for insertion, and read them back in order,
 *	comparing each key. The final message suggests the cursor is then
 *	expected to return WT_NOTFOUND. Finally, close the connection.
 *	Returns EXIT_SUCCESS, or EXIT_FAILURE after printing an error.
 *
 *	NOTE(review): this copy appears damaged. Missing pieces:
 *	- the declarations of the connection, session and cursor;
 *	- the wiredtiger_open, session->create and session->open_cursor
 *	  call heads;
 *	- the cursor set_key/set_value calls;
 *	- the trailing arguments of most fprintf calls;
 *	- the test before the final "expected WT_NOTFOUND" message.
 *	TODO restore.
 */
int
main(void)
{
const char *key, *open_config, *uri;
int i;
int ret = 0;
char kbuf[64];
if (getenv("WIREDTIGER_HOME") == NULL) {
home = "WT_HOME";
/* NOTE(review): the result of system() is stored in ret but never checked. */
ret = system("rm -rf WT_HOME && mkdir WT_HOME");
} else
home = NULL;
/* Load the custom file system before anything else touches the disk. */
open_config = "create,log=(enabled=true),extensions=(local={"
"entry=demo_file_system_create,early_load=true,"
"config={config_string=\"demo-file-system\",config_value=37}"
"})";
fprintf(stderr, "Error connecting to %s: %s\n",
return (EXIT_FAILURE);
}
if ((ret = conn->
open_session(conn, NULL, NULL, &session)) != 0) {
fprintf(stderr, "WT_CONNECTION.open_session: %s\n",
return (EXIT_FAILURE);
}
uri = "table:fs";
session, uri, "key_format=S,value_format=S")) != 0) {
fprintf(stderr, "WT_SESSION.create: %s: %s\n",
return (EXIT_FAILURE);
}
session, uri, NULL, NULL, &cursor)) != 0) {
fprintf(stderr, "WT_SESSION.open_cursor: %s: %s\n",
return (EXIT_FAILURE);
}
/* Insert phase. */
for (i = 0; i < 1000; ++i) {
(void)snprintf(kbuf, sizeof(kbuf), "%010d KEY -----", i);
if ((ret = cursor->
insert(cursor)) != 0) {
fprintf(stderr, "WT_CURSOR.insert: %s: %s\n",
return (EXIT_FAILURE);
}
}
if ((ret = cursor->
close(cursor)) != 0) {
fprintf(stderr, "WT_CURSOR.close: %s\n",
return (EXIT_FAILURE);
}
session, uri, NULL, NULL, &cursor)) != 0) {
fprintf(stderr, "WT_SESSION.open_cursor: %s: %s\n",
return (EXIT_FAILURE);
}
/* Verify phase: keys must come back in insertion order. */
for (i = 0; i < 1000; ++i) {
if ((ret = cursor->
next(cursor)) != 0) {
/* NOTE(review): this message names WT_CURSOR.insert but the failing call is next. */
fprintf(stderr, "WT_CURSOR.insert: %s: %s\n",
return (EXIT_FAILURE);
}
(void)snprintf(kbuf, sizeof(kbuf), "%010d KEY -----", i);
if ((ret = cursor->
get_key(cursor, &key)) != 0) {
fprintf(stderr, "WT_CURSOR.get_key: %s\n",
return (EXIT_FAILURE);
}
if (strcmp(kbuf, key) != 0) {
fprintf(stderr, "Key mismatch: %s, %s\n", kbuf, key);
return (EXIT_FAILURE);
}
}
/* NOTE(review): presumably follows a final cursor->next; the message again names insert. */
fprintf(stderr,
"WT_CURSOR.insert: expected WT_NOTFOUND, got %s\n",
return (EXIT_FAILURE);
}
if ((ret = conn->
close(conn, NULL)) != 0) {
fprintf(stderr, "Error closing connection to %s: %s\n",
return (EXIT_FAILURE);
}
return (EXIT_SUCCESS);
}