Changeset: 1d07c616784e for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/1d07c616784e
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger_internals.h
        sql/include/sql_catalog.h
        sql/storage/bat/bat_storage.c
        sql/storage/store.c
Branch: Sep2022
Log Message:

merged with jan2022


diffs (224 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -666,8 +666,10 @@ la_bat_updates(logger *lg, logaction *la
 
        if (bid < 0)
                return GDK_FAIL;
-       if (bid == 0)
-               return GDK_SUCCEED; /* ignore bats no longer in the catalog */
+       if (!bid) {
+               GDKerror("la_bat_updates failed to find bid for object %d\n", la->cid);
+               return GDK_FAIL;
+       }
 
        if (!lg->flushing) {
                b = BATdescriptor(bid);
@@ -760,6 +762,10 @@ la_bat_destroy(logger *lg, logaction *la
 
        if (bid < 0)
                return GDK_FAIL;
+       if (!bid) {
+               GDKerror("la_bat_destroy failed to find bid for object %d\n", la->cid);
+               return GDK_FAIL;
+       }
        if (bid && log_del_bat(lg, bid) != GDK_SUCCEED)
                return GDK_FAIL;
        return GDK_SUCCEED;
@@ -1041,6 +1047,7 @@ log_open_output(logger *lg)
        }
 
        lg->end = 0;
+       lg->drops = 0;
        if (!LOG_DISABLED(lg)) {
                char id[32];
                char *filename;
@@ -1669,8 +1676,6 @@ bm_subcommit(logger *lg)
        sizes[i] = BATcount(dcatalog);
        n[i++] = dcatalog->batCacheid;
 
-       if (cleanup < (lg->cnt/2))
-               cleanup = 0;
        if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid, catalog_id, dcatalog, cleanup)) < 0) {
                GDKfree(n);
                GDKfree(r);
@@ -1689,7 +1694,7 @@ bm_subcommit(logger *lg)
                sizes[i] = BATcount(lg->seqs_id);
                n[i++] = lg->seqs_val->batCacheid;
        }
-       if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id)/2)) {
+       if (!cleanup && lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id)/2) && BATcount(lg->dseqs) > 10 ) {
                BAT *tids, *ids, *vals;
 
                tids = bm_tids(lg->seqs_id, lg->dseqs);
@@ -2151,6 +2156,8 @@ log_new(int debug, const char *fn, const
                .funcdata = funcdata,
 
                .id = 0,
+               .drops = 0,
+               .end = 0,
                .saved_id = getBBPlogno(),              /* get saved log numer from bbp */
                .saved_tid = (int)getBBPtransid(),      /* get saved transaction id from bbp */
        };
@@ -2304,21 +2311,18 @@ log_cleanup_range(logger *lg, ulng id)
 gdk_return
 log_activate(logger *lg)
 {
+       gdk_return res = GDK_SUCCEED;
        MT_lock_set(&lg->rotation_lock);
        log_lock(lg);
-       if (lg->end > 0 && lg->saved_id+1 == lg->id) {
+       if (lg->drops > 100000 && lg->end > 0 && lg->saved_id+1 == lg->id) {
                lg->id++;
                log_close_output(lg);
                /* start new file */
-               if (log_open_output(lg) != GDK_SUCCEED) {
-                       log_unlock(lg);
-                       MT_lock_unset(&lg->rotation_lock);
-                       return GDK_FAIL;
-               }
+               res = log_open_output(lg);
        }
        log_unlock(lg);
        MT_lock_unset(&lg->rotation_lock);
-       return GDK_SUCCEED;
+       return res;
 }
 
 gdk_return
@@ -2659,6 +2663,11 @@ log_bat_transient(logger *lg, log_id id)
                log_unlock(lg);
                return GDK_FAIL;
        }
+       if (!bid) {
+               GDKerror("log_bat_transient failed to find bid for object %d\n", id);
+               log_unlock(lg);
+               return GDK_FAIL;
+       }
        l.flag = LOG_DESTROY;
        l.id = id;
 
@@ -2676,7 +2685,9 @@ log_bat_transient(logger *lg, log_id id)
                                bid);
        BAT *b = BBPquickdesc(bid);
        assert(b);
-       lg->end += BATcount(b);
+       BUN cnt = BATcount(b);
+       lg->end += cnt;
+       lg->drops += cnt;
        gdk_return r = log_del_bat(lg, bid);
        log_unlock(lg);
        if (r != GDK_SUCCEED)
@@ -2828,7 +2839,7 @@ new_logfile(logger *lg, stream* output_l
        const lng p = (lng) getfilepos(getFile(lg->output_log));
        if (p == -1)
                return GDK_FAIL;
-       if (( p > log_large || (lg->end*1024) > log_large )) {
+       if (lg->drops > 100000 || p > log_large || (lg->end*1024) > log_large) {
                log_lock(lg);
                if (ATOMIC_GET(&lg->refcount) == 1) {
                        lg->id++;
@@ -2927,6 +2938,10 @@ log_tdone(logger *lg, ulng commit_ts)
        if (lg->current) {
                lg->current->last_ts = commit_ts;
        }
+       stream* output_log = lg->output_log;
+       ulng id = lg->id;
+       if (!LOG_DISABLED(lg) && new_logfile(lg, output_log, id) != GDK_SUCCEED)
+               GDKfatal("Could not create new log file\n");
 }
 
 gdk_return
@@ -2962,8 +2977,7 @@ log_tflush(logger* lg, ulng log_file_id,
                        id = lg->id;
                        MT_lock_unset(&lg->rotation_lock);
                        if (mnstr_flush(output_log, MNSTR_FLUSH_DATA) ||
-                                       (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(output_log)) ||
-                                       new_logfile(lg, output_log, id) != GDK_SUCCEED) {
+                               (!(GDKdebug & NOSYNCMASK) && mnstr_fsync(output_log))) {
                                /* flush failed */
                                MT_lock_set(&lg->rotation_lock);
                                lg->flushing_output_log = false;
@@ -3155,6 +3169,10 @@ log_find_bat(logger *lg, log_id id)
        log_lock(lg);
        log_bid bid = internal_find_bat(lg, id, -1);
        log_unlock(lg);
+       if (!bid) {
+               GDKerror("logger_find_bat failed to find bid for object %d\n", id);
+               return GDK_FAIL;
+       }
        return bid;
 }
 
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -30,6 +30,7 @@ struct logger {
        int saved_tid;          /* id of transaction which was flushed out (into BBP storage)  */
        bool flushing;
        bool flushnow;
+       ulng drops;
        bool request_rotation;
        logged_range *pending;
        logged_range *current;
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -204,6 +204,7 @@ typedef struct sql_base {
 } sql_base;
 
 #define isNew(x)          ((x)->base.new)
+#define isDeleted(x)      ((x)->base.deleted)
 
 extern void base_init(sql_allocator *sa, sql_base * b, sqlid id, bool isnew, const char *name);
 
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -286,6 +286,7 @@ segments2cs(sql_trans *tr, segments *seg
        MT_lock_unset(&b->theaplock);
 
        uint32_t *restrict dst;
+       /* why hashlock ?? */
        MT_rwlock_wrlock(&b->thashlock);
        for (; s ; s=s->next) {
                if (s->start >= nr)
@@ -4150,7 +4151,7 @@ log_update_col( sql_trans *tr, sql_chang
 {
        sql_column *c = (sql_column*)change->obj;
 
-       if (!isTempTable(c->t) && !tr->parent) {/* don't write save point commits */
+       if (!isDeleted(c->t) && !isTempTable(c->t) && !tr->parent) {/* don't write save point commits */
                storage *s = ATOMIC_PTR_GET(&c->t->data);
                sql_delta *d = ATOMIC_PTR_GET(&c->data);
                return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
@@ -4263,7 +4264,7 @@ log_update_idx( sql_trans *tr, sql_chang
 {
        sql_idx *i = (sql_idx*)change->obj;
 
-       if (!isTempTable(i->t) && !tr->parent) { /* don't write save point commits */
+       if (!isDeleted(i->t) && !isTempTable(i->t) && !tr->parent) { /* don't write save point commits */
                storage *s = ATOMIC_PTR_GET(&i->t->data);
                sql_delta *d = ATOMIC_PTR_GET(&i->data);
                return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
@@ -4307,7 +4308,7 @@ log_update_del( sql_trans *tr, sql_chang
 {
        sql_table *t = (sql_table*)change->obj;
 
-       if (!isTempTable(t) && !tr->parent) /* don't write save point commits */
+       if (!isDeleted(t) && !isTempTable(t) && !tr->parent) /* don't write save point commits */
                return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
        return LOG_OK;
 }
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -3687,7 +3687,7 @@ sql_trans_rollback(sql_trans *tr, bool c
                list_destroy(tr->changes);
                tr->changes = NULL;
                tr->logchanges = 0;
-       } else if (ATOMIC_GET(&store->nr_active) == 1) { /* just me cleanup */
+       } else {
                if (!commit_lock)
                        MT_lock_set(&store->commit);
                store_lock(store);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to