Changeset: d0bfeb7957ef for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d0bfeb7957ef
Modified Files:
        gdk/gdk_logger.c
        sql/storage/bat/bat_storage.c
Branch: unlock
Log Message:

appends can be done concurrently now.


diffs (truncated from 323 to 300 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1720,12 +1720,11 @@ logger_load(int debug, const char *fn, c
                                 fn, fn, lg->dir);
                        goto error;
                }
-               if (check_version(lg, fp, fn, logdir, filename) != GDK_SUCCEED) {
+               if (check_version(lg, fp, fn, logdir, filename) != GDK_SUCCEED) { /* closes the file */
                        fp = NULL;
                        goto error;
                }
                readlogs = 1;
-               fclose(fp);
                fp = NULL;
 
                assert(!lg->inmemory);
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -981,14 +981,14 @@ destroy_delta(sql_delta *b)
 }
 
 static sql_delta *
-bind_col_data(sql_trans *tr, sql_column *c)
+bind_col_data(sql_trans *tr, sql_column *c, bool update)
 {
        sql_delta *obat = ATOMIC_PTR_GET(&c->data);
 
        if (isTempTable(c->t))
                obat = temp_col_timestamp_delta(tr, c);
 
-       if (obat->cs.ts == tr->tid)
+       if (obat->cs.ts == tr->tid || !update)
                return obat;
        if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(c->t))
                /* abort */
@@ -1017,7 +1017,7 @@ update_col_prepare(sql_trans *tr, sql_al
 {
        sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
 
-       if ((delta = bind_col_data(tr, c)) == NULL)
+       if ((delta = bind_col_data(tr, c, true)) == NULL)
                return NULL;
 
        assert(delta && delta->cs.ts == tr->tid);
@@ -1055,14 +1055,14 @@ update_col(sql_trans *tr, sql_column *c,
 }
 
 static sql_delta *
-bind_idx_data(sql_trans *tr, sql_idx *i)
+bind_idx_data(sql_trans *tr, sql_idx *i, bool update)
 {
        sql_delta *obat = ATOMIC_PTR_GET(&i->data);
 
        if (isTempTable(i->t))
                obat = temp_idx_timestamp_delta(tr, i);
 
-       if (obat->cs.ts == tr->tid)
+       if (obat->cs.ts == tr->tid || !update)
                return obat;
        if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(i->t))
                /* abort */
@@ -1091,7 +1091,7 @@ update_idx_prepare(sql_trans *tr, sql_al
 {
        sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
 
-       if ((delta = bind_idx_data(tr, i)) == NULL)
+       if ((delta = bind_idx_data(tr, i, true)) == NULL)
                return NULL;
 
        assert(delta && delta->cs.ts == tr->tid);
@@ -1227,10 +1227,11 @@ append_col_prepare(sql_trans *tr, sql_al
 {
        sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
 
-       if ((delta = bind_col_data(tr, c)) == NULL)
+       if ((delta = bind_col_data(tr, c, false)) == NULL)
                return NULL;
 
-       assert(delta && delta->cs.ts == tr->tid);
+       assert(delta && (!isTempTable(c->t) || delta->cs.ts == tr->tid));
+       if (isTempTable(c->t))
        if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t)) && isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
                trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, isLocalTemp(c->t)?NULL:&log_update_col);
        return make_cookie(sa, tr, delta, c->t, false);
@@ -1273,10 +1274,11 @@ append_idx_prepare(sql_trans *tr, sql_al
 {
        sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
 
-       if ((delta = bind_idx_data(tr, i)) == NULL)
+       if ((delta = bind_idx_data(tr, i, false)) == NULL)
                return NULL;
 
-       assert(delta && delta->cs.ts == tr->tid);
+       assert(delta && (!isTempTable(i->t) || delta->cs.ts == tr->tid));
+       if (isTempTable(i->t))
        if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t)) && isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
                trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, isLocalTemp(i->t)?NULL:&log_update_idx);
        return make_cookie(sa, tr, delta, i->t, false);
@@ -1313,8 +1315,6 @@ storage_delete_val(sql_trans *tr, sql_ta
        if ((!inTransaction(tr, t) && (!in_transaction || isTempTable(t)) && isGlobal(t)) || (!isNew(t) && isLocalTemp(t)))
                trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del, isLocalTemp(t)?NULL:&log_update_del);
        return LOG_OK;
-       //bat->ucnt++;
-       //return cs_update_val(&bat->cs, rid, &T, is_new);
 }
 
 /* 0/1 or -1 */
@@ -2110,24 +2110,13 @@ log_create_del(sql_trans *tr, sql_change
        int ok = LOG_OK;
        sql_table *t = (sql_table*)change->obj;
 
-       /*
-       //sql_column *fc = ft->columns.set->h->data;
-       if (log_batgroup(bat_logger, ft->bootstrap?0:LOG_TAB, ft->base.id, ft->cleared,
-                               log_get_nr_inserted(fc, &offset_inserted), offset_inserted,
-                               log_get_nr_deleted(ft, &offset_deleted), offset_deleted) != GDK_SUCCEED)
-               ok = LOG_ERR;
-               */
-       /* offset/end */
-
        assert(!isTempTable(t));
        ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
-       //      ok = tr_log_storage(tr, ft->data, s?s->segs->h:NULL, ft->cleared, ft->base.id);
        if (ok == LOG_OK) {
                for(node *n = t->columns.set->h; n && ok == LOG_OK; n = n->next) {
                        sql_column *c = n->data;
 
                        ok = log_create_col_(tr, c);
-                   //ok = tr_log_delta(tr, cc->data, s?s->segs->h:NULL, ft->cleared, cc->base.id);
                }
                if (t->idxs.set) {
                        for(node *n = t->idxs.set->h; n && ok == LOG_OK; n = n->next) {
@@ -2135,16 +2124,9 @@ log_create_del(sql_trans *tr, sql_change
 
                                if (ATOMIC_PTR_GET(&i->data))
                                        ok = log_create_idx_(tr, i);
-                       //ok = tr_log_delta(tr, ci->data, s?s->segs->h:NULL, ft->cleared, ci->base.id);
                        }
                }
        }
-       /*
-       if (s)
-               for (segment *segs = s->segs->h; segs; segs=segs->next)
-                       if (segs->ts == tr->tid)
-                               segs->ts = 0;
-                               */
        return ok;
 }
 
@@ -2355,7 +2337,7 @@ clear_col(sql_trans *tr, sql_column *c)
 {
        sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
 
-       if ((delta = bind_col_data(tr, c)) == NULL)
+       if ((delta = bind_col_data(tr, c, false)) == NULL)
                return BUN_NONE;
        if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t)) && isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
                trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, isLocalTemp(c->t)?NULL:&log_update_col);
@@ -2371,7 +2353,7 @@ clear_idx(sql_trans *tr, sql_idx *i)
 
        if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
                return 0;
-       if ((delta = bind_idx_data(tr, i)) == NULL)
+       if ((delta = bind_idx_data(tr, i, false)) == NULL)
                return BUN_NONE;
        if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t)) && isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
                trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, isLocalTemp(i->t)?NULL:&log_update_idx);
@@ -2447,7 +2429,7 @@ clear_table(sql_trans *tr, sql_table *t)
 }
 
 static gdk_return
-tr_log_cs( sql_trans *tr, column_storage *cs, segment *segs, sqlid id)
+tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
 {
        sqlstore *store = tr->store;
        gdk_return ok = GDK_SUCCEED;
@@ -2473,6 +2455,7 @@ tr_log_cs( sql_trans *tr, column_storage
                return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
        }
 
+       if (isTempTable(t))
        for (; segs; segs=segs->next) {
                if (segs->ts == tr->tid) {
                        BAT *ins = temp_descriptor(cs->bid);
@@ -2497,19 +2480,59 @@ tr_log_cs( sql_trans *tr, column_storage
 }
 
 static int
-tr_log_delta( sql_trans *tr, sql_delta *cbat, segment *segs, sqlid id)
+tr_log_delta( sql_trans *tr, sql_table *t, sql_delta *cbat, segment *segs, sqlid id)
 {
-       return tr_log_cs( tr, &cbat->cs, segs, id);
+       return tr_log_cs( tr, t, &cbat->cs, segs, id);
 }
 
 static int
-tr_log_storage(sql_trans *tr, storage *s, sqlid id)
+log_table_append(sql_trans *tr, sql_table *t, segments *segs)
+{
+       sqlstore *store = tr->store;
+       gdk_return ok = GDK_SUCCEED;
+
+       if (isTempTable(t))
+               return LOG_OK;
+       for (segment *cur = segs->h; cur && ok; cur = cur->next) {
+               if (cur->ts == tr->tid && !cur->deleted) {
+                       for (node *n = t->columns.set->h; n && ok; n = n->next) {
+                               sql_column *c = n->data;
+                               column_storage *cs = ATOMIC_PTR_GET(&c->data);
+
+                               /* append col*/
+                               BAT *ins = temp_descriptor(cs->bid);
+                               assert(ins);
+                               ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start);
+                               bat_destroy(ins);
+                       }
+                       if (t->idxs.set) {
+                               for (node *n = t->idxs.set->h; n && ok; n = n->next) {
+                                       sql_idx *i = n->data;
+                                       column_storage *cs = ATOMIC_PTR_GET(&i->data);
+
+                                       if (cs) {
+                                               /* append idx */
+                                               BAT *ins = temp_descriptor(cs->bid);
+                                               assert(ins);
+                                               ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start);
+                                               bat_destroy(ins);
+                                       }
+                               }
+                       }
+               }
+       }
+       return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
+}
+
+static int
+log_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
 {
        int ok = segments2cs(tr, s->segs, &s->cs);
        if (ok == LOG_OK)
+               ok = log_table_append(tr, t, s->segs);
+       if (ok == LOG_OK)
                return log_segments(tr, s->segs, id);
        return ok;
-       //return tr_log_cs( tr, &s->cs, s->segs->h, id);
 }
 
 static int
@@ -2625,7 +2648,7 @@ log_update_col( sql_trans *tr, sql_chang
 
        if (!isTempTable(c->t) && !tr->parent) {/* don't write save point commits */
                storage *s = ATOMIC_PTR_GET(&c->t->data);
-               return tr_log_delta(tr, ATOMIC_PTR_GET(&c->data), s->segs->h, c->base.id);
+               return tr_log_delta(tr, c->t, ATOMIC_PTR_GET(&c->data), s->segs->h, c->base.id);
        }
        return LOG_OK;
 }
@@ -2704,7 +2727,7 @@ log_update_idx( sql_trans *tr, sql_chang
 
        if (!isTempTable(i->t) && !tr->parent) { /* don't write save point commits */
                storage *s = ATOMIC_PTR_GET(&i->t->data);
-               return tr_log_delta(tr, ATOMIC_PTR_GET(&i->data), s->segs->h, i->base.id);
+               return tr_log_delta(tr, i->t, ATOMIC_PTR_GET(&i->data), s->segs->h, i->base.id);
        }
        return LOG_OK;
 }
@@ -2801,7 +2824,7 @@ log_update_del( sql_trans *tr, sql_chang
        sql_table *t = (sql_table*)change->obj;
 
        if (!isTempTable(t) && !tr->parent) /* don't write save point commits */
-               return tr_log_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
+               return log_storage(tr, t, ATOMIC_PTR_GET(&t->data), t->base.id);
        return LOG_OK;
 }
 
@@ -2843,27 +2866,9 @@ commit_update_del( sql_trans *tr, sql_ch
                t->base.flags = 0;
                return ok;
        }
-       /*
-       if (!isTempTable(t))
-               dbat->cs.ts = commit_ts;
-       */
        if (!commit_ts) { /* rollback */
                segment *s = rollback_segments(dbat->segs, tr, oldest);
                (void)s;
-               /*
-               storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
-
-               if (o != d) {
-                       while(o && o->next != d)
-                               o = o->next;
-               }
-               if (o == ATOMIC_PTR_GET(&t->data))
-                       ATOMIC_PTR_SET(&t->data, d->next);
-               else
-                       o->next = d->next;
-               d->next = NULL;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to