Author: grothoff Date: 2006-12-29 23:13:48 -0800 (Fri, 29 Dec 2006) New Revision: 4122
Modified: GNUnet/src/applications/dht/tools/dht-query.c GNUnet/src/applications/dht/tools/dht_api.c GNUnet/src/applications/dstore/dstore.c Log: fixing dstore Modified: GNUnet/src/applications/dht/tools/dht-query.c =================================================================== --- GNUnet/src/applications/dht/tools/dht-query.c 2006-12-30 06:05:16 UTC (rev 4121) +++ GNUnet/src/applications/dht/tools/dht-query.c 2006-12-30 07:13:48 UTC (rev 4122) @@ -33,6 +33,8 @@ #include "gnunet_util_boot.h" #include "gnunet_util_network_client.h" +#define DEBUG_DHT_QUERY NO + /** * How long should a "GET" run (or how long should * content last on the network). @@ -81,11 +83,13 @@ hash(key, strlen(key), &hc); +#if DEBUG_DHT_QUERY GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Issuing '%s(%s)' command.\n", "get", key); +#endif if (timeout == 0) timeout = 30 * cronSECONDS; ret = DHT_LIB_get(cfg, @@ -115,12 +119,14 @@ memcpy(&dc[1], value, strlen(value)); +#if DEBUG_DHT_QUERY GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, _("Issuing '%s(%s,%s)' command.\n"), "put", key, value); +#endif if (timeout == 0) timeout = 30 * cronMINUTES; if (OK == DHT_LIB_put(cfg, Modified: GNUnet/src/applications/dht/tools/dht_api.c =================================================================== --- GNUnet/src/applications/dht/tools/dht_api.c 2006-12-30 06:05:16 UTC (rev 4121) +++ GNUnet/src/applications/dht/tools/dht_api.c 2006-12-30 07:13:48 UTC (rev 4122) @@ -30,6 +30,8 @@ #include "gnunet_dht_lib.h" #include "gnunet_util_network_client.h" +#define DEBUG_DHT_API NO + /** * Data exchanged between main thread and GET thread. */ @@ -216,11 +218,13 @@ GE_BREAK(ectx, 0); /* content already expired!? */ return SYSERR; } +#if DEBUG_DHT_API GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "DHT_LIB_put called with value '%.*s'\n", ntohl(value->size), &value[1]); +#endif sock = client_connection_create(ectx, cfg); if (sock == NULL) Modified: GNUnet/src/applications/dstore/dstore.c =================================================================== --- GNUnet/src/applications/dstore/dstore.c 2006-12-30 06:05:16 UTC (rev 4121) +++ GNUnet/src/applications/dstore/dstore.c 2006-12-30 07:13:48 UTC (rev 4122) @@ -70,6 +70,11 @@ static unsigned int stat_dstore_size; /** + * Estimate of the per-entry overhead (including indices). + */ +#define OVERHEAD ((4+4+8+8*2+sizeof(HashCode512)*2+32)) + +/** * @brief Prepare a SQL statement */ static int sq_prepare(sqlite3 * dbh, @@ -83,62 +88,196 @@ (const char**) &dummy); } -static void db_reset() { - int fd; +#define SQLITE3_EXEC(db, cmd) do { if (SQLITE_OK != sqlite3_exec(db, cmd, NULL, NULL, &emsg)) { GE_LOG(coreAPI->ectx, GE_ERROR | GE_ADMIN | GE_BULK, _("`%s' failed at %s:%d with error: %s\n"), "sqlite3_exec", __FILE__, __LINE__, emsg); sqlite3_free(emsg); } } while(0); - UNLINK(fn); - FREE(fn); - fn = STRDUP("/tmp/dstoreXXXXXX"); - fd = mkstemp(fn); - if (fd != -1) - CLOSE(fd); -} +static void db_init(sqlite3 * dbh) { + char * emsg; -static void db_init(sqlite3 * dbh) { - sqlite3_exec(dbh, - "PRAGMA temp_store=MEMORY", - NULL, - NULL, - NULL); - sqlite3_exec(dbh, - "PRAGMA synchronous=OFF", - NULL, - NULL, - NULL); - sqlite3_exec(dbh, - "PRAGMA count_changes=OFF", - NULL, - NULL, - NULL); - sqlite3_exec(dbh, - "PRAGMA page_size=4092", - NULL, - NULL, - NULL); - sqlite3_exec(dbh, + SQLITE3_EXEC(dbh, + "PRAGMA temp_store=MEMORY"); + SQLITE3_EXEC(dbh, + "PRAGMA synchronous=OFF"); + SQLITE3_EXEC(dbh, + "PRAGMA count_changes=OFF"); + SQLITE3_EXEC(dbh, + "PRAGMA page_size=4092"); + SQLITE3_EXEC(dbh, "CREATE TABLE ds071 (" " size INTEGER NOT NULL DEFAULT 0," " type INTEGER NOT NULL DEFAULT 0," " puttime INTEGER NOT NULL DEFAULT 0," " expire INTEGER NOT NULL DEFAULT 0," " key TEXT NOT NULL DEFAULT ''," - " value BLOB NOT NULL DEFAULT '')", - NULL, - NULL, - NULL); - sqlite3_exec(dbh, - "CREATE INDEX idx_key ON ds071 (key)", - NULL, - NULL, - NULL); - sqlite3_exec(dbh, - "CREATE INDEX idx_puttime ON ds071 (puttime)", - NULL, - NULL, - NULL); + " value BLOB NOT NULL DEFAULT '')"); + SQLITE3_EXEC(dbh, + "CREATE INDEX idx_key ON ds071 (key)"); + SQLITE3_EXEC(dbh, + "CREATE INDEX idx_puttime ON ds071 (puttime)"); } +static int db_reset() { + int fd; + sqlite3 * dbh; + + if (fn != NULL) { + UNLINK(fn); + FREE(fn); + } + fn = STRDUP("/tmp/dstoreXXXXXX"); + fd = mkstemp(fn); + if (fd == -1) { + GE_BREAK(NULL, 0); + FREE(fn); + fn = NULL; + return SYSERR; + } + CLOSE(fd); + if (SQLITE_OK != sqlite3_open(fn, + &dbh)) + return SYSERR; + db_init(dbh); + sqlite3_close(dbh); + return OK; +} + /** + * Check that we are within quota. + * @return OK if we are. + */ +static int checkQuota(sqlite3 * dbh) { + HashCode512 dkey; + unsigned int dsize; + unsigned int dtype; + cron_t dputtime; + cron_t dexpire; + char * dcontent; + sqlite3_stmt * stmt; + sqlite3_stmt * dstmt; + int err; + + if (payload * 10 <= quota * 9) + return OK; /* we seem to be about 10% off */ +#if DEBUG_DSTORE + GE_LOG(coreAPI->ectx, + GE_DEBUG | GE_REQUEST | GE_DEVELOPER, + "DStore above qutoa (have %llu, allowed %llu), will delete some data.\n", + payload, + quota); +#endif + stmt = NULL; + dstmt = NULL; + if ( (sq_prepare(dbh, + "SELECT size, type, puttime, expire, key, value FROM ds071 ORDER BY puttime ASC", + &stmt) != SQLITE_OK) || + (sq_prepare(dbh, + "DELETE FROM ds071 " + "WHERE size = ? AND type = ? AND puttime = ? AND expire = ? AND key = ? AND value = ?", + &dstmt) != SQLITE_OK) ) { + GE_LOG(coreAPI->ectx, + GE_ERROR | GE_ADMIN | GE_BULK, + _("`%s' failed at %s:%d with error: %s\n"), + "sq_prepare", + __FILE__, + __LINE__, + sqlite3_errmsg(dbh)); + GE_BREAK(NULL, 0); + if (dstmt != NULL) + sqlite3_finalize(dstmt); + if (stmt != NULL) + sqlite3_finalize(stmt); + return SYSERR; + } + dcontent = MALLOC(MAX_CONTENT_SIZE); + while ( (payload * 10 > quota * 9) && /* we seem to be about 10% off */ + ((err = sqlite3_step(stmt)) == SQLITE_ROW) ) { + dsize = sqlite3_column_int(stmt, 0); + dtype = sqlite3_column_int(stmt, 1); + dputtime = sqlite3_column_int64(stmt, 2); + dexpire = sqlite3_column_int64(stmt, 3); + GE_BREAK(NULL, + sqlite3_column_bytes(stmt, 4) == sizeof(HashCode512)); + GE_BREAK(NULL, + dsize == sqlite3_column_bytes(stmt, 5)); + memcpy(&dkey, + sqlite3_column_blob(stmt, 4), + sizeof(HashCode512)); + if (dsize >= MAX_CONTENT_SIZE) { + GE_BREAK(NULL, 0); + dsize = MAX_CONTENT_SIZE; + } + memcpy(dcontent, + sqlite3_column_blob(stmt, 5), + dsize); + sqlite3_reset(stmt); + sqlite3_bind_int(dstmt, + 1, + dsize); + sqlite3_bind_int(dstmt, + 2, + dtype); + sqlite3_bind_int64(dstmt, + 3, + dputtime); + sqlite3_bind_int64(dstmt, + 4, + dexpire); + sqlite3_bind_blob(dstmt, + 5, + &dkey, + sizeof(HashCode512), + SQLITE_TRANSIENT); + sqlite3_bind_blob(dstmt, + 6, + dcontent, + dsize, + SQLITE_TRANSIENT); + if ((err = sqlite3_step(dstmt)) != SQLITE_DONE) { + GE_LOG(coreAPI->ectx, + GE_ERROR | GE_ADMIN | GE_BULK, + _("`%s' failed at %s:%d with error: %s\n"), + "sqlite3_step", + __FILE__, + __LINE__, + sqlite3_errmsg(dbh)); + sqlite3_reset(dstmt); + GE_BREAK(NULL, 0); /* should delete but cannot!? */ + break; + } + payload -= (dsize + OVERHEAD); +#if DEBUG_DSTORE + GE_LOG(coreAPI->ectx, + GE_DEBUG | GE_REQUEST | GE_DEVELOPER, + "Deleting %u bytes decreases DStore payload to %llu out of %llu\n", + dsize, + payload, + quota); +#endif + sqlite3_reset(dstmt); + } + if (err != SQLITE_DONE) { + GE_LOG(coreAPI->ectx, + GE_ERROR | GE_ADMIN | GE_BULK, + _("`%s' failed at %s:%d with error: %s\n"), + "sqlite3_step", + __FILE__, + __LINE__, + sqlite3_errmsg(dbh)); + } + FREE(dcontent); + sqlite3_finalize(dstmt); + sqlite3_finalize(stmt); + if (payload * 10 > quota * 9) { + GE_LOG(coreAPI->ectx, + GE_ERROR | GE_BULK | GE_DEVELOPER, + "Failed to delete content to drop below quota (bug?).\n", + payload, + quota); + return SYSERR; /* we seem to be about 10% off */ + } + return OK; +} + +/** * Store an item in the datastore. * * @return OK on success, SYSERR on error @@ -150,13 +289,13 @@ const char * data) { sqlite3 * dbh; sqlite3_stmt * stmt; - sqlite3_stmt * dstmt; if (size > MAX_CONTENT_SIZE) return SYSERR; MUTEX_LOCK(lock); - if (SQLITE_OK != sqlite3_open(fn, - &dbh)) { + if ( (fn == NULL) || + (SQLITE_OK != sqlite3_open(fn, + &dbh)) ) { db_reset(dbh); MUTEX_UNLOCK(lock); return SYSERR; @@ -168,12 +307,23 @@ size, data); #endif - db_init(dbh); + if (OK != checkQuota(dbh)) { + sqlite3_close(dbh); + MUTEX_UNLOCK(lock); + return SYSERR; + } if (sq_prepare(dbh, "INSERT INTO ds071 " "(size, type, puttime, expire, key, value) " "VALUES (?, ?, ?, ?, ?, ?)", &stmt) != SQLITE_OK) { + GE_LOG(coreAPI->ectx, + GE_ERROR | GE_ADMIN | GE_BULK, + _("`%s' failed at %s:%d with error: %s\n"), + "sq_prepare", + __FILE__, + __LINE__, + sqlite3_errmsg(dbh)); sqlite3_close(dbh); MUTEX_UNLOCK(lock); return SYSERR; @@ -202,97 +352,16 @@ SQLITE_TRANSIENT); sqlite3_step(stmt); sqlite3_finalize(stmt); - stmt = NULL; - dstmt = NULL; - payload += size; + payload += size + OVERHEAD; +#if DEBUG_DSTORE GE_LOG(coreAPI->ectx, GE_DEBUG | GE_REQUEST | GE_DEVELOPER, "Storing %u bytes increases DStore payload to %llu out of %llu\n", size, payload, quota); - - if (payload > quota) { - GE_LOG(coreAPI->ectx, - GE_DEBUG | GE_REQUEST | GE_DEVELOPER, - "DStore above qutoa (have %llu, allowed %llu), will delete some data.\n", - payload, - quota); - if ( (sq_prepare(dbh, - "SELECT size, type, puttime, expire, key, value FROM ds071 ORDER BY puttime ASC", - &stmt) == SQLITE_OK) && - (sq_prepare(dbh, - "DELETE FROM ds071 " - "WHERE size = ? AND type = ? AND puttime = ? AND expire = ? AND key = ? AND value = ?", - &dstmt) == SQLITE_OK) ) { - HashCode512 dkey; - unsigned int dsize; - unsigned int dtype; - cron_t dputtime; - cron_t dexpire; - char * dcontent; - - dcontent = MALLOC(MAX_CONTENT_SIZE); - while ( (payload > quota) && - (sqlite3_step(stmt) == SQLITE_ROW) ) { - dsize = sqlite3_column_int(stmt, 0); - dtype = sqlite3_column_int(stmt, 1); - dputtime = sqlite3_column_int64(stmt, 2); - dexpire = sqlite3_column_int64(stmt, 3); - GE_BREAK(NULL, - sqlite3_column_bytes(stmt, 4) == sizeof(HashCode512)); - GE_BREAK(NULL, - dsize == sqlite3_column_bytes(stmt, 5)); - memcpy(&dkey, - sqlite3_column_blob(stmt, 4), - sizeof(HashCode512)); - if (dsize >= MAX_CONTENT_SIZE) { - GE_BREAK(NULL, 0); - dsize = MAX_CONTENT_SIZE; - } - memcpy(dcontent, - sqlite3_column_blob(stmt, 5), - dsize); - sqlite3_bind_int(dstmt, - 1, - dsize); - sqlite3_bind_int(dstmt, - 2, - dtype); - sqlite3_bind_int64(dstmt, - 3, - dputtime); - sqlite3_bind_int64(dstmt, - 4, - dexpire); - sqlite3_bind_blob(dstmt, - 5, - &dkey, - sizeof(HashCode512), - SQLITE_TRANSIENT); - sqlite3_bind_blob(dstmt, - 6, - dcontent, - dsize, - SQLITE_TRANSIENT); - if (sqlite3_step(dstmt) != SQLITE_ROW) { - sqlite3_reset(dstmt); - GE_BREAK(NULL, 0); /* should delete but cannot!? */ - break; - } - sqlite3_reset(dstmt); - } - FREE(dcontent); - sqlite3_finalize(dstmt); - sqlite3_finalize(stmt); - } else { - GE_BREAK(NULL, 0); - if (dstmt != NULL) - sqlite3_finalize(dstmt); - if (stmt != NULL) - sqlite3_finalize(stmt); - } - } +#endif + checkQuota(dbh); sqlite3_close(dbh); MUTEX_UNLOCK(lock); if (stats != NULL) @@ -322,8 +391,9 @@ unsigned int cnt; MUTEX_LOCK(lock); - if (SQLITE_OK != sqlite3_open(fn, - &dbh)) { + if ( (fn == NULL) || + (SQLITE_OK != sqlite3_open(fn, + &dbh)) ) { db_reset(dbh); MUTEX_UNLOCK(lock); return SYSERR; @@ -333,11 +403,17 @@ GE_DEBUG | GE_REQUEST | GE_DEVELOPER, "dstore processes get\n"); #endif - db_init(dbh); now = get_time(); if (sq_prepare(dbh, "SELECT size, value FROM ds071 WHERE key=? AND type=? AND expire >= ?", &stmt) != SQLITE_OK) { + GE_LOG(coreAPI->ectx, + GE_ERROR | GE_ADMIN | GE_BULK, + _("`%s' failed at %s:%d with error: %s\n"), + "sq_prepare", + __FILE__, + __LINE__, + sqlite3_errmsg(dbh)); sqlite3_close(dbh); MUTEX_UNLOCK(lock); return SYSERR; @@ -377,20 +453,14 @@ Dstore_ServiceAPI * provide_module_dstore(CoreAPIForApplication * capi) { static Dstore_ServiceAPI api; - int fd; #if DEBUG_SQLITE GE_LOG(capi->ectx, GE_DEBUG | GE_REQUEST | GE_USER, "SQLite Dstore: initializing database\n"); #endif - fn = STRDUP("/tmp/dstoreXXXXXX"); - fd = mkstemp(fn); - if (fd == -1) { - FREE(fn); + if (OK != db_reset()) return NULL; - } - CLOSE(fd); lock = MUTEX_CREATE(NO); coreAPI = capi; api.get = &d_get; _______________________________________________ GNUnet-SVN mailing list GNUnet-SVN@gnu.org http://lists.gnu.org/mailman/listinfo/gnunet-svn