Changeset: 1d07c616784e for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/1d07c616784e
Modified Files:
	gdk/gdk_logger.c
	gdk/gdk_logger_internals.h
	sql/include/sql_catalog.h
	sql/storage/bat/bat_storage.c
	sql/storage/store.c
Branch: Sep2022
Log Message:
merged with jan2022 diffs (224 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -666,8 +666,10 @@ la_bat_updates(logger *lg, logaction *la if (bid < 0) return GDK_FAIL; - if (bid == 0) - return GDK_SUCCEED; /* ignore bats no longer in the catalog */ + if (!bid) { + GDKerror("la_bat_updates failed to find bid for object %d\n", la->cid); + return GDK_FAIL; + } if (!lg->flushing) { b = BATdescriptor(bid); @@ -760,6 +762,10 @@ la_bat_destroy(logger *lg, logaction *la if (bid < 0) return GDK_FAIL; + if (!bid) { + GDKerror("la_bat_destroy failed to find bid for object %d\n", la->cid); + return GDK_FAIL; + } if (bid && log_del_bat(lg, bid) != GDK_SUCCEED) return GDK_FAIL; return GDK_SUCCEED; @@ -1041,6 +1047,7 @@ log_open_output(logger *lg) } lg->end = 0; + lg->drops = 0; if (!LOG_DISABLED(lg)) { char id[32]; char *filename; @@ -1669,8 +1676,6 @@ bm_subcommit(logger *lg) sizes[i] = BATcount(dcatalog); n[i++] = dcatalog->batCacheid; - if (cleanup < (lg->cnt/2)) - cleanup = 0; if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid, catalog_id, dcatalog, cleanup)) < 0) { GDKfree(n); GDKfree(r); @@ -1689,7 +1694,7 @@ bm_subcommit(logger *lg) sizes[i] = BATcount(lg->seqs_id); n[i++] = lg->seqs_val->batCacheid; } - if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id)/2)) { + if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id)/2) && BATcount(lg->dseqs) > 10 ) { BAT *tids, *ids, *vals; tids = bm_tids(lg->seqs_id, lg->dseqs); @@ -2151,6 +2156,8 @@ log_new(int debug, const char *fn, const .funcdata = funcdata, .id = 0, + .drops = 0, + .end = 0, .saved_id = getBBPlogno(), /* get saved log numer from bbp */ .saved_tid = (int)getBBPtransid(), /* get saved transaction id from bbp */ }; @@ -2304,21 +2311,18 @@ log_cleanup_range(logger *lg, ulng id) gdk_return log_activate(logger *lg) { + gdk_return res = GDK_SUCCEED; MT_lock_set(&lg->rotation_lock); log_lock(lg); - 
if (lg->end > 0 && lg->saved_id+1 == lg->id) { + if (lg->drops > 100000 && lg->end > 0 && lg->saved_id+1 == lg->id) { lg->id++; log_close_output(lg); /* start new file */ - if (log_open_output(lg) != GDK_SUCCEED) { - log_unlock(lg); - MT_lock_unset(&lg->rotation_lock); - return GDK_FAIL; - } + res = log_open_output(lg); } log_unlock(lg); MT_lock_unset(&lg->rotation_lock); - return GDK_SUCCEED; + return res; } gdk_return @@ -2659,6 +2663,11 @@ log_bat_transient(logger *lg, log_id id) log_unlock(lg); return GDK_FAIL; } + if (!bid) { + GDKerror("log_bat_transient failed to find bid for object %d\n", id); + log_unlock(lg); + return GDK_FAIL; + } l.flag = LOG_DESTROY; l.id = id; @@ -2676,7 +2685,9 @@ log_bat_transient(logger *lg, log_id id) bid); BAT *b = BBPquickdesc(bid); assert(b); - lg->end += BATcount(b); + BUN cnt = BATcount(b); + lg->end += cnt; + lg->drops += cnt; gdk_return r = log_del_bat(lg, bid); log_unlock(lg); if (r != GDK_SUCCEED) @@ -2828,7 +2839,7 @@ new_logfile(logger *lg, stream* output_l const lng p = (lng) getfilepos(getFile(lg->output_log)); if (p == -1) return GDK_FAIL; - if (( p > log_large || (lg->end*1024) > log_large )) { + if (lg->drops > 100000 || p > log_large || (lg->end*1024) > log_large) { log_lock(lg); if (ATOMIC_GET(&lg->refcount) == 1) { lg->id++; @@ -2927,6 +2938,10 @@ log_tdone(logger *lg, ulng commit_ts) if (lg->current) { lg->current->last_ts = commit_ts; } + stream* output_log = lg->output_log; + ulng id = lg->id; + if (!LOG_DISABLED(lg) && new_logfile(lg, output_log, id) != GDK_SUCCEED) + GDKfatal("Could not create new log file\n"); } gdk_return @@ -2962,8 +2977,7 @@ log_tflush(logger* lg, ulng log_file_id, id = lg->id; MT_lock_unset(&lg->rotation_lock); if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) || - (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(output_log)) || - new_logfile(lg, output_log, id) != GDK_SUCCEED) { + (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(output_log))) { /* flush failed */ MT_lock_set(&lg->rotation_lock); 
lg->flushing_output_log = false; @@ -3155,6 +3169,10 @@ log_find_bat(logger *lg, log_id id) log_lock(lg); log_bid bid = internal_find_bat(lg, id, -1); log_unlock(lg); + if (!bid) { + GDKerror("logger_find_bat failed to find bid for object %d\n", id); + return GDK_FAIL; + } return bid; } diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h --- a/gdk/gdk_logger_internals.h +++ b/gdk/gdk_logger_internals.h @@ -30,6 +30,7 @@ struct logger { int saved_tid; /* id of transaction which was flushed out (into BBP storage) */ bool flushing; bool flushnow; + ulng drops; bool request_rotation; logged_range *pending; logged_range *current; diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h --- a/sql/include/sql_catalog.h +++ b/sql/include/sql_catalog.h @@ -204,6 +204,7 @@ typedef struct sql_base { } sql_base; #define isNew(x) ((x)->base.new) +#define isDeleted(x) ((x)->base.deleted) extern void base_init(sql_allocator *sa, sql_base * b, sqlid id, bool isnew, const char *name); diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c --- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -286,6 +286,7 @@ segments2cs(sql_trans *tr, segments *seg MT_lock_unset(&b->theaplock); uint32_t *restrict dst; + /* why hashlock ?? 
*/ MT_rwlock_wrlock(&b->thashlock); for (; s ; s=s->next) { if (s->start >= nr) @@ -4150,7 +4151,7 @@ log_update_col( sql_trans *tr, sql_chang { sql_column *c = (sql_column*)change->obj; - if (!isTempTable(c->t) && !tr->parent) {/* don't write save point commits */ + if (!isDeleted(c->t) && !isTempTable(c->t) && !tr->parent) {/* don't write save point commits */ storage *s = ATOMIC_PTR_GET(&c->t->data); sql_delta *d = ATOMIC_PTR_GET(&c->data); return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id); @@ -4263,7 +4264,7 @@ log_update_idx( sql_trans *tr, sql_chang { sql_idx *i = (sql_idx*)change->obj; - if (!isTempTable(i->t) && !tr->parent) { /* don't write save point commits */ + if (!isDeleted(i->t) && !isTempTable(i->t) && !tr->parent) { /* don't write save point commits */ storage *s = ATOMIC_PTR_GET(&i->t->data); sql_delta *d = ATOMIC_PTR_GET(&i->data); return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id); @@ -4307,7 +4308,7 @@ log_update_del( sql_trans *tr, sql_chang { sql_table *t = (sql_table*)change->obj; - if (!isTempTable(t) && !tr->parent) /* don't write save point commits */ + if (!isDeleted(t) && !isTempTable(t) && !tr->parent) /* don't write save point commits */ return log_storage(tr, t, ATOMIC_PTR_GET(&t->data)); return LOG_OK; } diff --git a/sql/storage/store.c b/sql/storage/store.c --- a/sql/storage/store.c +++ b/sql/storage/store.c @@ -3687,7 +3687,7 @@ sql_trans_rollback(sql_trans *tr, bool c list_destroy(tr->changes); tr->changes = NULL; tr->logchanges = 0; - } else if (ATOMIC_GET(&store->nr_active) == 1) { /* just me cleanup */ + } else { if (!commit_lock) MT_lock_set(&store->commit); store_lock(store); _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org