23 September 2009

db_sql: the new utility for BerkeleyDB. My Notebook.

The new version of BerkeleyDB 4.8 has been released. This new version of the key/value storage engine comes with a new utility called db_sql.
From Oracle: Db_sql is a utility program that translates a schema description written in a SQL Data Definition Language dialect into C code that implements the schema using Berkeley DB. It is intended to provide a quick and easy means of getting started with Berkeley DB for users who are already conversant with SQL. It also introduces a convenient way to express a Berkeley DB schema in a format that is both external to the program that uses it and compatible with relational databases.

On my side, I still use Apache Velocity to generate this kind of code (see this older post )

Let's generate the C-code for a simple database storing some SNPs. The heterozygosity, the flanking sequences and the observed variation will be stored and the #rs-id will be indexed. My first attempt was:

CREATE DATABASE snpDatabase;

CREATE TABLE snp (
name varchar(50) NOT NULL PRIMARY KEY,
rs_id VARCHAR(20) NULL,
avHet float,
seq5 text,
observed text,
seq3 text,
class enum('unknown','single','in-del','het','microsatellite','named','mixed','mnp','insertion','deletion')
);

CREATE INDEX dbsnp_id ON snp(rs_id);


But it seemed that: enums, the TEXT type and the NULL/NOT NULL modifier are not supported. So, my second try was:
CREATE DATABASE snpDatabase;

CREATE TABLE snp (
name varchar(50) PRIMARY KEY,
rs_id varchar(20) NULL ,
avHet float,
seq5 varchar(300),
observed varchar(20),
seq3 varchar(300)
);

CREATE INDEX dbsnp_id ON snp(rs_id);


Invoking db_sql


The following command generates the C code for managing the snps in the database as well as a test. The code contains the structure 'struct _snp_data' describing a SNP, the functions to open/close the database, inserting/removing a _snp_data, serializing/de-serializing the structure:
/usr/local/BerkeleyDB.4.8/bin/db_sql -i schema.sql -o snp.c -h snp.h -t snp_test.c


snp.h

/*
* Header file for a Berkeley DB implementation
* generated from SQL DDL by db_sql
*/
#include <sys/types.h>
#include <sys/stat.h>
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include "db.h"

/*
* Array size constants.
*/
#define SNP_DATA_NAME_LENGTH 50
#define SNP_DATA_RS_ID_LENGTH 20
#define SNP_DATA_SEQ5_LENGTH 300
#define SNP_DATA_OBSERVED_LENGTH 20
#define SNP_DATA_SEQ3_LENGTH 300

/*
* Data structures representing the record layouts
*
* snp_data is the in-memory form of one record of the "snp" table.
* Every character field is a fixed-size array whose size is the
* matching SNP_DATA_*_LENGTH constant. The serialization code treats
* these arrays as NUL-terminated strings (it uses strlen() + 1), so the
* usable text length of a field is one less than its array size.
*/
typedef struct _snp_data {
/* primary key of the record */
char name[SNP_DATA_NAME_LENGTH];
/* value indexed by the dbsnp_id secondary index */
char rs_id[SNP_DATA_RS_ID_LENGTH];
/* heterozygosity; SQL "float" is mapped to a C double */
double avHet;
/* 5' flanking sequence */
char seq5[SNP_DATA_SEQ5_LENGTH];
/* observed variation */
char observed[SNP_DATA_OBSERVED_LENGTH];
/* 3' flanking sequence */
char seq3[SNP_DATA_SEQ3_LENGTH];
} snp_data;


/*
 * snp_data_specimen exists only so that sizeof can be applied to the
 * individual members in the macro below. It is declared "extern"
 * instead of being defined here: a definition in a header is emitted
 * in every translation unit that includes it and produces
 * "multiple definition" link errors on toolchains that do not merge
 * common symbols. Because sizeof never evaluates its operand, no
 * definition of the object is needed anywhere.
 */
extern snp_data snp_data_specimen;

/*
 * Macros for the maximum length of the
 * records after serialization. This is
 * the maximum size of the data that is stored
 */
#define SNP_DATA_SERIALIZED_LENGTH (sizeof(snp_data_specimen.name) + \
    sizeof(snp_data_specimen.rs_id) + \
    sizeof(snp_data_specimen.avHet) + \
    sizeof(snp_data_specimen.seq5) + \
    sizeof(snp_data_specimen.observed) + \
    sizeof(snp_data_specimen.seq3))

/*
* These typedefs are prototypes for the user-written
* iteration callback functions, which are invoked during
* full iteration and secondary index queries
*/
typedef void (*snp_iteration_callback)(void *, snp_data *);

/*
* The environment creation/initialization function
*/
int create_snpDatabase_env(DB_ENV **envpp);

/*
* Database creation/initialization functions
*/
int create_snp_database(DB_ENV *envp, DB **dbpp);

/*
* Database removal functions
*/
int remove_snp_database(DB_ENV *envp);

/*
* Functions for inserting records by providing
* the full corresponding data structure
*/
int snp_insert_struct(DB *dbp, snp_data *snpp);

/*
* Functions for inserting records by providing
* each field value as a separate argument
*/
int snp_insert_fields(DB * dbp,
char *name,
char *rs_id,
double avHet,
char *seq5,
char *observed,
char *seq3);

/*
* Functions for retrieving records by key
*/
int get_snp_data(DB *dbp, char *snp_key, snp_data *data);

/*
* Functions for deleting records by key
*/
int delete_snp_key(DB *dbp, char *snp_key);

/*
* Functions for doing iterations over
* an entire primary database
*/
int snp_full_iteration(DB *dbp,
snp_iteration_callback user_func,
void *user_data);

/*
* Index creation and removal functions
*/
int create_dbsnp_id_secondary(DB_ENV *envp, DB *dbpp, DB **secondary_dbpp);

int remove_dbsnp_id_index(DB_ENV * envp);

int dbsnp_id_query_iteration(DB *secondary_dbp,
char *dbsnp_id_key,
snp_iteration_callback user_func,
void *user_data);

/*
* This convenience method invokes all of the
* environment and database creation methods necessary
* to initialize the complete BDB environment. It uses
* the global environment and database pointers declared
* below. You may bypass this function and use your own
* environment and database pointers, if you wish.
*/
int initialize_snpDatabase_environment();

extern DB_ENV * snpDatabase_envp;
extern DB *snp_dbp;
extern DB *dbsnp_id_dbp;


snp.c

#include "snp.h"


/*
 * Create a Berkeley DB environment handle and open it on the
 * "./snpDatabase" home directory with a memory pool (DB_INIT_MPOOL).
 * NOTE(review): the directory is presumably expected to exist before
 * this is called - confirm against the DB_ENV->open documentation.
 *
 * envpp: receives the environment handle.
 *
 * Returns 0 on success. On failure an error message is written to
 * stderr, any handle created here is discarded, *envpp is set to NULL
 * and 1 is returned.
 */
int create_snpDatabase_env(DB_ENV **envpp)
{
    int ret, flags;
    char *env_name = "./snpDatabase";

    if ((ret = db_env_create(envpp, 0)) != 0) {
        fprintf(stderr, "db_env_create: %s", db_strerror(ret));
        *envpp = NULL;
        return 1;
    }

    (*envpp)->set_errfile(*envpp, stderr);
    flags = DB_CREATE | DB_INIT_MPOOL;

    if ((ret = (*envpp)->open(*envpp, env_name, flags, 0)) != 0) {
        (*envpp)->err(*envpp, ret, "DB_ENV->open: %s", env_name);
        /*
         * A handle whose open failed must still be closed to be
         * released; otherwise it is leaked.
         */
        (void)(*envpp)->close(*envpp, 0);
        *envpp = NULL;
        return 1;
    }
    return 0;
}

/*
 * These are custom comparator functions for integer keys. They are
 * needed to make integers sort well on little-endian architectures,
 * such as x86. cf. discussion of btree comparators in 'Getting Started
 * with Data Storage' manual.
 *
 * compare_int: compares two keys that each hold one native int.
 * The key bytes are copied out with memcpy because DBT data is not
 * guaranteed to be suitably aligned for a direct int access.
 *
 * Returns a negative value, zero or a positive value when a is less
 * than, equal to or greater than b. The result is computed with
 * comparisons rather than a subtraction, which could overflow for
 * operands of opposite sign.
 */
static int
compare_int(DB *dbp, const DBT *a, const DBT *b)
{
    int ai, bi;

    (void)dbp; /* required by the comparator signature, not used */

    memcpy(&ai, a->data, sizeof(int));
    memcpy(&bi, b->data, sizeof(int));
    return (ai > bi) - (ai < bi);
}

/*
 * Btree comparator for keys that each hold one native long.
 *
 * Returns a negative value, zero or a positive value when a is less
 * than, equal to or greater than b. Comparisons are used instead of
 * "ai - bi": that difference is a long, and converting it to the int
 * return type can drop the high bits and report the wrong order (or
 * equality) for keys that are far apart.
 */
int
compare_long(DB *dbp, const DBT *a, const DBT *b)
{
    long ai, bi;

    (void)dbp; /* required by the comparator signature, not used */

    memcpy(&ai, a->data, sizeof(long));
    memcpy(&bi, b->data, sizeof(long));
    return (ai > bi) - (ai < bi);
}

/*
 * A generic function for creating and opening a database
 *
 * envp:       environment the database belongs to.
 * db_name:    file name of the database.
 * dbpp:       receives the database handle.
 * flags:      flags passed to DB->open.
 * type:       access method passed to DB->open.
 * moreflags:  if non-zero, passed to DB->set_flags before opening.
 * comparator: if non-NULL, installed with DB->set_bt_compare.
 *
 * Returns 0 on success. On failure the Berkeley DB error code is
 * returned, the error is reported through the handle's error channel,
 * the handle is closed and *dbpp is set to NULL, so the caller never
 * receives a half-initialized handle.
 */
int
create_database(DB_ENV *envp,
                char *db_name,
                DB **dbpp,
                int flags,
                DBTYPE type,
                int moreflags,
                int (*comparator)(DB *, const DBT *, const DBT *))
{
    int ret;
    FILE *errfilep;

    if ((ret = db_create(dbpp, envp, 0)) != 0) {
        envp->err(envp, ret, "db_create");
        return ret;
    }

    /* Send the handle's error output to the same file as the environment. */
    envp->get_errfile(envp, &errfilep);
    (*dbpp)->set_errfile(*dbpp, errfilep);

    if (moreflags != 0 &&
        (ret = (*dbpp)->set_flags(*dbpp, moreflags)) != 0) {
        (*dbpp)->err(*dbpp, ret, "DB->set_flags: %s", db_name);
        goto fail;
    }

    if (comparator != NULL &&
        (ret = (*dbpp)->set_bt_compare(*dbpp, comparator)) != 0) {
        (*dbpp)->err(*dbpp, ret, "DB->set_bt_compare: %s", db_name);
        goto fail;
    }

    if ((ret = (*dbpp)->open(*dbpp, NULL, db_name,
        NULL, type, flags, 0644)) != 0) {
        (*dbpp)->err(*dbpp, ret, "DB->open: %s", db_name);
        goto fail;
    }

    return 0;

fail:
    /* A handle that failed to configure or open must still be closed. */
    (void)(*dbpp)->close(*dbpp, 0);
    *dbpp = NULL;
    return ret;
}

/*
 * Open (creating it if necessary) the primary "snp.db" btree database
 * inside envp and store its handle in *dbpp. No extra flags and no
 * custom comparator are used. Returns whatever create_database returns.
 */
int create_snp_database(DB_ENV *envp, DB **dbpp)
{
    char *primary_file = "snp.db";
    int status;

    status = create_database(envp, primary_file, dbpp,
                             DB_CREATE, DB_BTREE, 0, NULL);
    return status;
}


/*
 * Remove the primary "snp.db" database file through the environment.
 * Returns the result of DB_ENV->dbremove.
 */
int remove_snp_database(DB_ENV *envp)
{
    const char *primary_file = "snp.db";
    int status;

    status = envp->dbremove(envp, NULL, primary_file, NULL, 0);
    return status;
}


/* Copy n bytes from src to dst and return the position just past them. */
static char *snp_append_bytes(char *dst, const void *src, size_t n)
{
    memcpy(dst, src, n);
    return dst + n;
}

/*
 * Pack a snp_data record into buffer.
 *
 * Layout, in order: name, rs_id (each as a NUL-terminated string),
 * the raw bytes of avHet, then seq5, observed and seq3 (again as
 * NUL-terminated strings). The buffer is first cleared over
 * SNP_DATA_SERIALIZED_LENGTH bytes, so it must be at least that large.
 *
 * Each string field is asserted to fit in its array, terminator
 * included. Returns the number of bytes written.
 */
int serialize_snp_data(snp_data *snpp, char *buffer)
{
    char *out = buffer;
    size_t len;

    memset(buffer, 0, SNP_DATA_SERIALIZED_LENGTH);

    len = strlen(snpp->name) + 1;
    assert(len <= SNP_DATA_NAME_LENGTH);
    out = snp_append_bytes(out, snpp->name, len);

    len = strlen(snpp->rs_id) + 1;
    assert(len <= SNP_DATA_RS_ID_LENGTH);
    out = snp_append_bytes(out, snpp->rs_id, len);

    out = snp_append_bytes(out, &snpp->avHet, sizeof(snpp->avHet));

    len = strlen(snpp->seq5) + 1;
    assert(len <= SNP_DATA_SEQ5_LENGTH);
    out = snp_append_bytes(out, snpp->seq5, len);

    len = strlen(snpp->observed) + 1;
    assert(len <= SNP_DATA_OBSERVED_LENGTH);
    out = snp_append_bytes(out, snpp->observed, len);

    len = strlen(snpp->seq3) + 1;
    assert(len <= SNP_DATA_SEQ3_LENGTH);
    out = snp_append_bytes(out, snpp->seq3, len);

    return out - buffer;
}

/* Copy n bytes from src to dst and return the source position just past them. */
static char *snp_take_bytes(void *dst, char *src, size_t n)
{
    memcpy(dst, src, n);
    return src + n;
}

/*
 * Unpack a serialized record from buffer into *snpp.
 *
 * The record is cleared first, then the fields are read back in the
 * order they were written: name, rs_id, the raw bytes of avHet, seq5,
 * observed, seq3. Each string is read up to and including its NUL
 * terminator and is asserted to fit in the destination array.
 */
void deserialize_snp_data(char *buffer, snp_data *snpp)
{
    char *in = buffer;
    size_t len;

    memset(snpp, 0, sizeof(*snpp));

    len = strlen(in) + 1;
    assert(len <= SNP_DATA_NAME_LENGTH);
    in = snp_take_bytes(snpp->name, in, len);

    len = strlen(in) + 1;
    assert(len <= SNP_DATA_RS_ID_LENGTH);
    in = snp_take_bytes(snpp->rs_id, in, len);

    in = snp_take_bytes(&snpp->avHet, in, sizeof(snpp->avHet));

    len = strlen(in) + 1;
    assert(len <= SNP_DATA_SEQ5_LENGTH);
    in = snp_take_bytes(snpp->seq5, in, len);

    len = strlen(in) + 1;
    assert(len <= SNP_DATA_OBSERVED_LENGTH);
    in = snp_take_bytes(snpp->observed, in, len);

    len = strlen(in) + 1;
    assert(len <= SNP_DATA_SEQ3_LENGTH);
    (void)snp_take_bytes(snpp->seq3, in, len);
}


/*
 * Store one record in the primary database.
 *
 * dbp:  primary database handle.
 * snpp: record to store; its name field (terminator included) is the
 *       key and the serialized record is the data.
 *
 * Returns 0 on success. On failure the error is reported through
 * dbp->err and -1 is returned.
 */
int snp_insert_struct( DB *dbp, snp_data *snpp)
{
    DBT key_dbt, data_dbt;
    char serialized_data[SNP_DATA_SERIALIZED_LENGTH];
    int ret, serialized_size;
    char *snp_key = snpp->name;

    memset(&key_dbt, 0, sizeof(key_dbt));
    memset(&data_dbt, 0, sizeof(data_dbt));

    key_dbt.data = snp_key;
    key_dbt.size = strlen(snp_key) + 1;

    serialized_size = serialize_snp_data(snpp, serialized_data);

    data_dbt.data = serialized_data;
    data_dbt.size = serialized_size;

    if ((ret = dbp->put(dbp, NULL, &key_dbt, &data_dbt, 0)) != 0) {
        /* The key is a string: format it with %s, not %d. */
        dbp->err(dbp, ret, "Inserting key %s", snp_key);
        return -1;
    }
    return 0;
}

/*
 * Copy the NUL-terminated string src into the capacity-byte array dst.
 * Returns 0 on success, -1 if src is NULL or does not fit together
 * with its terminator (dst is left untouched in that case).
 */
static int snp_copy_field(char *dst, size_t capacity, const char *src)
{
    size_t len;

    if (src == NULL)
        return -1;

    len = strlen(src);
    if (len >= capacity)
        return -1;

    memcpy(dst, src, len + 1);
    return 0;
}

/*
 * Build a record from individual field values and store it in the
 * primary database.
 *
 * Every string argument must be non-NULL and strictly shorter than the
 * matching SNP_DATA_*_LENGTH constant. This is checked at run time
 * (not only by assert, which disappears under NDEBUG), so an oversized
 * value can never leave a field without its NUL terminator.
 *
 * Returns 0 on success and -1 if an argument is invalid or the insert
 * fails.
 */
int snp_insert_fields(DB *dbp,
                      char *name,
                      char *rs_id,
                      double avHet,
                      char *seq5,
                      char *observed,
                      char *seq3)
{
    snp_data data;

    /* Start from a fully zeroed record so no byte is indeterminate. */
    memset(&data, 0, sizeof(data));

    if (snp_copy_field(data.name, sizeof(data.name), name) != 0 ||
        snp_copy_field(data.rs_id, sizeof(data.rs_id), rs_id) != 0 ||
        snp_copy_field(data.seq5, sizeof(data.seq5), seq5) != 0 ||
        snp_copy_field(data.observed, sizeof(data.observed), observed) != 0 ||
        snp_copy_field(data.seq3, sizeof(data.seq3), seq3) != 0)
        return -1;

    data.avHet = avHet;

    return snp_insert_struct(dbp, &data);
}


/*
 * Look up one record by its key and unpack it into *data.
 *
 * dbp:     primary database handle.
 * snp_key: NUL-terminated key; the terminator is part of the stored key.
 * data:    receives the deserialized record on success.
 *
 * Returns 0 on success. On failure the error is reported through
 * dbp->err, *data is left untouched and the Berkeley DB error code is
 * returned.
 */
int get_snp_data(DB *dbp,
                 char *snp_key,
                 snp_data *data)
{
    DBT key_dbt, data_dbt;
    int ret;
    char *canonical_key = snp_key;

    memset(&key_dbt, 0, sizeof(key_dbt));
    memset(&data_dbt, 0, sizeof(data_dbt));

    key_dbt.data = canonical_key;
    key_dbt.size = strlen(canonical_key) + 1;

    if ((ret = dbp->get(dbp, NULL, &key_dbt, &data_dbt, 0)) != 0) {
        /* The key is a string: format it with %s, not %d. */
        dbp->err(dbp, ret, "Retrieving key %s", snp_key);
        return ret;
    }

    assert(data_dbt.size <= SNP_DATA_SERIALIZED_LENGTH);

    deserialize_snp_data(data_dbt.data, data);
    return 0;
}


/*
 * Delete the record stored under snp_key from the primary database.
 *
 * dbp:     primary database handle.
 * snp_key: NUL-terminated key; the terminator is part of the stored key.
 *
 * Returns 0 on success. On failure the error is reported through
 * dbp->err and the Berkeley DB error code is returned.
 */
int delete_snp_key(DB *dbp, char *snp_key)
{
    DBT key_dbt;
    int ret;
    char *canonical_key = snp_key;

    memset(&key_dbt, 0, sizeof(key_dbt));

    key_dbt.data = canonical_key;
    key_dbt.size = strlen(canonical_key) + 1;

    if ((ret = dbp->del(dbp, NULL, &key_dbt, 0)) != 0) {
        /* The key is a string: format it with %s, not %d. */
        dbp->err(dbp, ret, "deleting key %s", snp_key);
        return ret;
    }

    return 0;
}


/*
 * Walk every record of the primary database with a cursor and hand
 * each one, deserialized, to user_func together with user_data.
 *
 * Returns 0 once the cursor reports DB_NOTFOUND (end of data). Any
 * other cursor error is reported through dbp->err and returned. The
 * cursor is closed in both cases. A failure to create the cursor is
 * reported and returned as well.
 */
int snp_full_iteration(DB *dbp,
                       snp_iteration_callback user_func,
                       void *user_data)
{
    DBT key, value;
    DBC *cur;
    snp_data record;
    int status;

    memset(&key, 0, sizeof(key));
    memset(&value, 0, sizeof(value));

    status = dbp->cursor(dbp, NULL, &cur, 0);
    if (status != 0) {
        dbp->err(dbp, status, "creating cursor");
        return status;
    }

    for (;;) {
        status = cur->get(cur, &key, &value, DB_NEXT);
        if (status != 0)
            break;
        deserialize_snp_data(value.data, &record);
        (*user_func)(user_data, &record);
    }

    if (status == DB_NOTFOUND) {
        /* Running off the end of the database is the normal exit. */
        status = 0;
    } else {
        dbp->err(dbp, status, "Full iteration");
    }

    cur->close(cur);
    return status;
}

/*
 * Secondary-key extractor for the dbsnp_id index: derives the
 * secondary key (the rs_id string, terminator included) from a primary
 * record.
 *
 * dbp, key_dbt:      not used.
 * data_dbt:          serialized primary record.
 * secondary_key_dbt: receives a malloc'ed copy of rs_id, flagged
 *                    DB_DBT_APPMALLOC so the caller frees it.
 *
 * Returns 0 on success and ENOMEM if the key copy cannot be allocated
 * (the DBT is then left with a NULL data pointer and no APPMALLOC
 * flag).
 */
int dbsnp_id_callback(DB *dbp,
                      const DBT *key_dbt,
                      const DBT *data_dbt,
                      DBT *secondary_key_dbt)
{
    snp_data deserialized_data;
    size_t key_size;
    void *key_copy;

    (void)dbp;
    (void)key_dbt;

    deserialize_snp_data(data_dbt->data, &deserialized_data);

    memset(secondary_key_dbt, 0, sizeof(*secondary_key_dbt));

    key_size = strlen(deserialized_data.rs_id) + 1;
    key_copy = malloc(key_size);
    if (key_copy == NULL)
        return ENOMEM;
    memcpy(key_copy, deserialized_data.rs_id, key_size);

    secondary_key_dbt->data = key_copy;
    secondary_key_dbt->size = key_size;

    /* tell the caller to free memory referenced by secondary_key_dbt */
    secondary_key_dbt->flags = DB_DBT_APPMALLOC;

    return 0;
}

/*
 * Open (creating it if necessary) the "dbsnp_id.db" secondary database
 * with sorted duplicates and associate it with the primary database,
 * using dbsnp_id_callback to derive the secondary keys.
 *
 * envp:           environment both databases live in.
 * primary_dbp:    open primary database handle.
 * secondary_dbpp: receives the secondary database handle.
 *
 * Returns 0 on success. On failure the Berkeley DB error code is
 * returned; if the association itself fails, the secondary handle
 * opened here is closed and *secondary_dbpp is set to NULL so it is
 * not leaked.
 */
int create_dbsnp_id_secondary(DB_ENV *envp,
                              DB *primary_dbp,
                              DB **secondary_dbpp)
{
    int ret;
    char * secondary_name = "dbsnp_id.db";

    if ((ret = create_database(envp, secondary_name, secondary_dbpp,
        DB_CREATE, DB_BTREE, DB_DUPSORT, NULL)) != 0)
        return ret;

    if ((ret = primary_dbp->associate(primary_dbp, NULL, *secondary_dbpp,
        &dbsnp_id_callback, DB_CREATE)) != 0) {
        /* secondary_name already ends in ".db"; do not append it again. */
        (*secondary_dbpp)->err(*secondary_dbpp, ret,
            "DB->associate: %s", secondary_name);
        (void)(*secondary_dbpp)->close(*secondary_dbpp, 0);
        *secondary_dbpp = NULL;
        return ret;
    }
    return 0;
}

/*
 * Remove the "dbsnp_id.db" secondary index file through the
 * environment. Returns the result of DB_ENV->dbremove.
 */
int remove_dbsnp_id_index(DB_ENV *envp)
{
    const char *index_file = "dbsnp_id.db";
    int status;

    status = envp->dbremove(envp, NULL, index_file, NULL, 0);
    return status;
}

/*
 * Query the dbsnp_id secondary index for one key and hand every
 * matching record, deserialized, to user_func together with user_data.
 *
 * secondary_dbp: open secondary database handle.
 * dbsnp_id_key:  NUL-terminated rs_id to look up; the terminator is
 *                part of the stored key.
 *
 * The cursor is positioned on the key with DB_SET and then advanced
 * over its duplicates with DB_NEXT_DUP.
 *
 * Returns 0 when the matches are exhausted (DB_NOTFOUND, which also
 * covers a key with no match at all). Any other error is reported
 * through secondary_dbp->err and returned. The cursor is closed on
 * every path once it has been created.
 */
int dbsnp_id_query_iteration(DB *secondary_dbp,
                             char *dbsnp_id_key,
                             snp_iteration_callback user_func,
                             void *user_data)
{
    DBT key_dbt, data_dbt;
    DBC *cursorp;
    snp_data deserialized_data;
    int ret;

    memset(&key_dbt, 0, sizeof(key_dbt));
    memset(&data_dbt, 0, sizeof(data_dbt));

    if ((ret = secondary_dbp->cursor(secondary_dbp, NULL, &cursorp, 0)) != 0) {
        secondary_dbp->err(secondary_dbp, ret, "creating cursor");
        return ret;
    }

    key_dbt.data = dbsnp_id_key;
    key_dbt.size = strlen(dbsnp_id_key) + 1;

    for (ret = cursorp->get(cursorp, &key_dbt, &data_dbt, DB_SET);
        ret == 0;
        ret = cursorp->get(cursorp, &key_dbt, &data_dbt, DB_NEXT_DUP)) {
        deserialize_snp_data(data_dbt.data, &deserialized_data);
        (*user_func)(user_data, &deserialized_data);
    }

    if (ret != DB_NOTFOUND) {
        secondary_dbp->err(secondary_dbp, ret, "Querying secondary");
        /* Close the cursor on the error path too, so it is not leaked. */
        cursorp->close(cursorp);
        return ret;
    }

    cursorp->close(cursorp);

    return 0;
}

DB_ENV * snpDatabase_envp = NULL;
DB *snp_dbp = NULL;
DB *dbsnp_id_dbp = NULL;

int initialize_snpDatabase_environment()
{
if (create_snpDatabase_env(&snpDatabase_envp) != 0)
goto exit_error;

if (create_snp_database(snpDatabase_envp, &snp_dbp) != 0)
goto exit_error;

if (create_dbsnp_id_secondary(snpDatabase_envp, snp_dbp, &dbsnp_id_dbp) != 0)
goto exit_error;

return 0;

exit_error:

fprintf(stderr, "Stopping initialization because of error\n");
return -1;
}

snp_test.c


/*
* Simple test for a Berkeley DB implementation
* generated from SQL DDL by db_sql
*/

#include "snp.h"

/*
* These are the iteration callback functions. One is defined per
* database(table). They are used for both full iterations and for
* secondary index queries. When a retrieval returns multiple records,
* as in full iteration over an entire database, one of these functions
* is called for each record found
*/

/*
 * Iteration callback used by the test: prints the caller-supplied
 * message (msg is treated as a C string) followed by every field of
 * the record it was given.
 */
void snp_iteration_callback_test(void *msg, snp_data *snp_record)
{
    const char *banner = (char *)msg;
    const snp_data *rec = snp_record;

    printf("In iteration callback, message is: %s\n", banner);

    printf("snp->name: %s\n", rec->name);
    printf("snp->rs_id: %s\n", rec->rs_id);
    printf("snp->avHet: %lf\n", rec->avHet);
    printf("snp->seq5: %s\n", rec->seq5);
    printf("snp->observed: %s\n", rec->observed);
    printf("snp->seq3: %s\n", rec->seq3);
}


/*
 * Test driver: initializes the environment, inserts one record,
 * retrieves it by key, iterates over the whole database, queries the
 * secondary index, deletes the record, and finally closes and removes
 * everything. Returns 0 on success or the code of the first failing
 * step.
 */
int main(int argc, char **argv)
{
    int ret;
    snp_data snp_record;

    (void)argc;
    (void)argv;

    snp_dbp = NULL;
    dbsnp_id_dbp = NULL;

    /*
     * Use the convenience method to initialize the environment.
     * The initializations for each entity and environment can be
     * done discretely if you prefer, but this is the easy way.
     */
    ret = initialize_snpDatabase_environment();
    if (ret != 0){
        printf("Initialize error");
        return ret;
    }

    /*
     * Now that everything is initialized, insert a single
     * record into each database, using the ...insert_fields
     * functions. These functions take each field of the
     * record as a separate argument
     */
    ret = snp_insert_fields( snp_dbp, "ninety-nine", "ninety-nine", 99.5, "ninety-nine", "ninety-nine", "ninety-nine");
    if (ret != 0){
        printf("Insert error\n");
        goto exit_error;
    }

    /*
     * Next, retrieve the records just inserted, looking them up
     * by their key values
     */
    printf("Retrieval of snp record by key\n");
    ret = get_snp_data( snp_dbp, "ninety-nine", &snp_record);

    /*
     * Check the result before touching snp_record: it is only
     * filled in when the retrieval succeeded.
     */
    if (ret != 0)
    {
        printf("Retrieve error\n");
        goto exit_error;
    }

    printf("snp.name: %s\n", snp_record.name);
    printf("snp.rs_id: %s\n", snp_record.rs_id);
    printf("snp.avHet: %lf\n", snp_record.avHet);
    printf("snp.seq5: %s\n", snp_record.seq5);
    printf("snp.observed: %s\n", snp_record.observed);
    printf("snp.seq3: %s\n", snp_record.seq3);

    /*
     * Now try iterating over every record, using the ...full_iteration
     * functions for each database. For each record found, the
     * appropriate ...iteration_callback_test function will be invoked
     * (these are defined above).
     */
    ret = snp_full_iteration(snp_dbp, &snp_iteration_callback_test,
        "retrieval of snp record through full iteration");
    if (ret != 0){
        printf("Full Iteration Error\n");
        goto exit_error;
    }

    /*
     * For the secondary indexes, query for the known keys. This also
     * results in the ...iteration_callback_test function's being called
     * for each record found.
     */
    ret = dbsnp_id_query_iteration(dbsnp_id_dbp, "ninety-nine",
        &snp_iteration_callback_test,
        "retrieval of snp record through dbsnp_id query");
    if (ret != 0){
        printf("Secondary Query Error\n");
        goto exit_error;
    }

    /*
     * Now delete a record from each database using its primary key.
     */
    ret = delete_snp_key( snp_dbp, "ninety-nine");
    if (ret != 0) {
        printf("Delete error\n");
        goto exit_error;
    }

exit_error:
    /*
     * Close the secondary index databases
     */
    if (dbsnp_id_dbp != NULL)
        dbsnp_id_dbp->close(dbsnp_id_dbp, 0);

    /*
     * Close the primary databases
     */
    if (snp_dbp != NULL)
        snp_dbp->close(snp_dbp, 0);

    /*
     * Delete the secondary index databases
     */
    remove_dbsnp_id_index(snpDatabase_envp);

    /*
     * Delete the primary databases
     */
    remove_snp_database(snpDatabase_envp);

    /*
     * Finally, close the environment
     */
    snpDatabase_envp->close(snpDatabase_envp, 0);
    return ret;
}

Compiling and Running the test


export LD_LIBRARY_PATH=/usr/local/BerkeleyDB.4.8/lib
gcc snp_test.c snp.c -ldb
mkdir snpDatabase
./a.out
Retrieval of snp record by key
snp.name: ninety-nine
snp.rs_id: ninety-nine
snp.avHet: 99.500000
snp.seq5: ninety-nine
snp.observed: ninety-nine
snp.seq3: ninety-nine
In iteration callback, message is: retrieval of snp record through full iteration
snp->name: ninety-nine
snp->rs_id: ninety-nine
snp->avHet: 99.500000
snp->seq5: ninety-nine
snp->observed: ninety-nine
snp->seq3: ninety-nine
In iteration callback, message is: retrieval of snp record through dbsnp_id query
snp->name: ninety-nine
snp->rs_id: ninety-nine
snp->avHet: 99.500000
snp->seq5: ninety-nine
snp->observed: ninety-nine
snp->seq3: ninety-nine



That's it
Pierre

No comments: