Changeset: d0bfeb7957ef for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d0bfeb7957ef
Modified Files:
gdk/gdk_logger.c
sql/storage/bat/bat_storage.c
Branch: unlock
Log Message:
Appends can now be done concurrently.
diffs (truncated from 323 to 300 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1720,12 +1720,11 @@ logger_load(int debug, const char *fn, c
fn, fn, lg->dir);
goto error;
}
- if (check_version(lg, fp, fn, logdir, filename) != GDK_SUCCEED)
{
+ if (check_version(lg, fp, fn, logdir, filename) != GDK_SUCCEED)
{ /* closes the file */
fp = NULL;
goto error;
}
readlogs = 1;
- fclose(fp);
fp = NULL;
assert(!lg->inmemory);
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -981,14 +981,14 @@ destroy_delta(sql_delta *b)
}
static sql_delta *
-bind_col_data(sql_trans *tr, sql_column *c)
+bind_col_data(sql_trans *tr, sql_column *c, bool update)
{
sql_delta *obat = ATOMIC_PTR_GET(&c->data);
if (isTempTable(c->t))
obat = temp_col_timestamp_delta(tr, c);
- if (obat->cs.ts == tr->tid)
+ if (obat->cs.ts == tr->tid || !update)
return obat;
if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) &&
obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(c->t))
/* abort */
@@ -1017,7 +1017,7 @@ update_col_prepare(sql_trans *tr, sql_al
{
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
- if ((delta = bind_col_data(tr, c)) == NULL)
+ if ((delta = bind_col_data(tr, c, true)) == NULL)
return NULL;
assert(delta && delta->cs.ts == tr->tid);
@@ -1055,14 +1055,14 @@ update_col(sql_trans *tr, sql_column *c,
}
static sql_delta *
-bind_idx_data(sql_trans *tr, sql_idx *i)
+bind_idx_data(sql_trans *tr, sql_idx *i, bool update)
{
sql_delta *obat = ATOMIC_PTR_GET(&i->data);
if (isTempTable(i->t))
obat = temp_idx_timestamp_delta(tr, i);
- if (obat->cs.ts == tr->tid)
+ if (obat->cs.ts == tr->tid || !update)
return obat;
if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) &&
obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(i->t))
/* abort */
@@ -1091,7 +1091,7 @@ update_idx_prepare(sql_trans *tr, sql_al
{
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
- if ((delta = bind_idx_data(tr, i)) == NULL)
+ if ((delta = bind_idx_data(tr, i, true)) == NULL)
return NULL;
assert(delta && delta->cs.ts == tr->tid);
@@ -1227,10 +1227,11 @@ append_col_prepare(sql_trans *tr, sql_al
{
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
- if ((delta = bind_col_data(tr, c)) == NULL)
+ if ((delta = bind_col_data(tr, c, false)) == NULL)
return NULL;
- assert(delta && delta->cs.ts == tr->tid);
+ assert(delta && (!isTempTable(c->t) || delta->cs.ts == tr->tid));
+ if (isTempTable(c->t))
if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t))
&& isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col,
isLocalTemp(c->t)?NULL:&log_update_col);
return make_cookie(sa, tr, delta, c->t, false);
@@ -1273,10 +1274,11 @@ append_idx_prepare(sql_trans *tr, sql_al
{
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
- if ((delta = bind_idx_data(tr, i)) == NULL)
+ if ((delta = bind_idx_data(tr, i, false)) == NULL)
return NULL;
- assert(delta && delta->cs.ts == tr->tid);
+ assert(delta && (!isTempTable(i->t) || delta->cs.ts == tr->tid));
+ if (isTempTable(i->t))
if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t))
&& isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx,
isLocalTemp(i->t)?NULL:&log_update_idx);
return make_cookie(sa, tr, delta, i->t, false);
@@ -1313,8 +1315,6 @@ storage_delete_val(sql_trans *tr, sql_ta
if ((!inTransaction(tr, t) && (!in_transaction || isTempTable(t)) &&
isGlobal(t)) || (!isNew(t) && isLocalTemp(t)))
trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del,
isLocalTemp(t)?NULL:&log_update_del);
return LOG_OK;
- //bat->ucnt++;
- //return cs_update_val(&bat->cs, rid, &T, is_new);
}
/* 0/1 or -1 */
@@ -2110,24 +2110,13 @@ log_create_del(sql_trans *tr, sql_change
int ok = LOG_OK;
sql_table *t = (sql_table*)change->obj;
- /*
- //sql_column *fc = ft->columns.set->h->data;
- if (log_batgroup(bat_logger, ft->bootstrap?0:LOG_TAB, ft->base.id,
ft->cleared,
- log_get_nr_inserted(fc, &offset_inserted),
offset_inserted,
- log_get_nr_deleted(ft, &offset_deleted),
offset_deleted) != GDK_SUCCEED)
- ok = LOG_ERR;
- */
- /* offset/end */
-
assert(!isTempTable(t));
ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
- // ok = tr_log_storage(tr, ft->data, s?s->segs->h:NULL,
ft->cleared, ft->base.id);
if (ok == LOG_OK) {
for(node *n = t->columns.set->h; n && ok == LOG_OK; n =
n->next) {
sql_column *c = n->data;
ok = log_create_col_(tr, c);
- //ok = tr_log_delta(tr, cc->data, s?s->segs->h:NULL,
ft->cleared, cc->base.id);
}
if (t->idxs.set) {
for(node *n = t->idxs.set->h; n && ok == LOG_OK; n =
n->next) {
@@ -2135,16 +2124,9 @@ log_create_del(sql_trans *tr, sql_change
if (ATOMIC_PTR_GET(&i->data))
ok = log_create_idx_(tr, i);
- //ok = tr_log_delta(tr, ci->data, s?s->segs->h:NULL,
ft->cleared, ci->base.id);
}
}
}
- /*
- if (s)
- for (segment *segs = s->segs->h; segs; segs=segs->next)
- if (segs->ts == tr->tid)
- segs->ts = 0;
- */
return ok;
}
@@ -2355,7 +2337,7 @@ clear_col(sql_trans *tr, sql_column *c)
{
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
- if ((delta = bind_col_data(tr, c)) == NULL)
+ if ((delta = bind_col_data(tr, c, false)) == NULL)
return BUN_NONE;
if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t))
&& isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col,
isLocalTemp(c->t)?NULL:&log_update_col);
@@ -2371,7 +2353,7 @@ clear_idx(sql_trans *tr, sql_idx *i)
if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns)
<= 1) || !idx_has_column(i->type))
return 0;
- if ((delta = bind_idx_data(tr, i)) == NULL)
+ if ((delta = bind_idx_data(tr, i, false)) == NULL)
return BUN_NONE;
if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t))
&& isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx,
isLocalTemp(i->t)?NULL:&log_update_idx);
@@ -2447,7 +2429,7 @@ clear_table(sql_trans *tr, sql_table *t)
}
static gdk_return
-tr_log_cs( sql_trans *tr, column_storage *cs, segment *segs, sqlid id)
+tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs,
sqlid id)
{
sqlstore *store = tr->store;
gdk_return ok = GDK_SUCCEED;
@@ -2473,6 +2455,7 @@ tr_log_cs( sql_trans *tr, column_storage
return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
}
+ if (isTempTable(t))
for (; segs; segs=segs->next) {
if (segs->ts == tr->tid) {
BAT *ins = temp_descriptor(cs->bid);
@@ -2497,19 +2480,59 @@ tr_log_cs( sql_trans *tr, column_storage
}
static int
-tr_log_delta( sql_trans *tr, sql_delta *cbat, segment *segs, sqlid id)
+tr_log_delta( sql_trans *tr, sql_table *t, sql_delta *cbat, segment *segs,
sqlid id)
{
- return tr_log_cs( tr, &cbat->cs, segs, id);
+ return tr_log_cs( tr, t, &cbat->cs, segs, id);
}
static int
-tr_log_storage(sql_trans *tr, storage *s, sqlid id)
+log_table_append(sql_trans *tr, sql_table *t, segments *segs)
+{
+ sqlstore *store = tr->store;
+ gdk_return ok = GDK_SUCCEED;
+
+ if (isTempTable(t))
+ return LOG_OK;
+ for (segment *cur = segs->h; cur && ok; cur = cur->next) {
+ if (cur->ts == tr->tid && !cur->deleted) {
+ for (node *n = t->columns.set->h; n && ok; n = n->next)
{
+ sql_column *c = n->data;
+ column_storage *cs = ATOMIC_PTR_GET(&c->data);
+
+ /* append col*/
+ BAT *ins = temp_descriptor(cs->bid);
+ assert(ins);
+ ok = log_bat(store->logger, ins, c->base.id,
cur->start, cur->end-cur->start);
+ bat_destroy(ins);
+ }
+ if (t->idxs.set) {
+ for (node *n = t->idxs.set->h; n && ok; n =
n->next) {
+ sql_idx *i = n->data;
+ column_storage *cs =
ATOMIC_PTR_GET(&i->data);
+
+ if (cs) {
+ /* append idx */
+ BAT *ins =
temp_descriptor(cs->bid);
+ assert(ins);
+ ok = log_bat(store->logger,
ins, i->base.id, cur->start, cur->end-cur->start);
+ bat_destroy(ins);
+ }
+ }
+ }
+ }
+ }
+ return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
+}
+
+static int
+log_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
{
int ok = segments2cs(tr, s->segs, &s->cs);
if (ok == LOG_OK)
+ ok = log_table_append(tr, t, s->segs);
+ if (ok == LOG_OK)
return log_segments(tr, s->segs, id);
return ok;
- //return tr_log_cs( tr, &s->cs, s->segs->h, id);
}
static int
@@ -2625,7 +2648,7 @@ log_update_col( sql_trans *tr, sql_chang
if (!isTempTable(c->t) && !tr->parent) {/* don't write save point
commits */
storage *s = ATOMIC_PTR_GET(&c->t->data);
- return tr_log_delta(tr, ATOMIC_PTR_GET(&c->data), s->segs->h,
c->base.id);
+ return tr_log_delta(tr, c->t, ATOMIC_PTR_GET(&c->data),
s->segs->h, c->base.id);
}
return LOG_OK;
}
@@ -2704,7 +2727,7 @@ log_update_idx( sql_trans *tr, sql_chang
if (!isTempTable(i->t) && !tr->parent) { /* don't write save point
commits */
storage *s = ATOMIC_PTR_GET(&i->t->data);
- return tr_log_delta(tr, ATOMIC_PTR_GET(&i->data), s->segs->h,
i->base.id);
+ return tr_log_delta(tr, i->t, ATOMIC_PTR_GET(&i->data),
s->segs->h, i->base.id);
}
return LOG_OK;
}
@@ -2801,7 +2824,7 @@ log_update_del( sql_trans *tr, sql_chang
sql_table *t = (sql_table*)change->obj;
if (!isTempTable(t) && !tr->parent) /* don't write save point commits */
- return tr_log_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
+ return log_storage(tr, t, ATOMIC_PTR_GET(&t->data), t->base.id);
return LOG_OK;
}
@@ -2843,27 +2866,9 @@ commit_update_del( sql_trans *tr, sql_ch
t->base.flags = 0;
return ok;
}
- /*
- if (!isTempTable(t))
- dbat->cs.ts = commit_ts;
- */
if (!commit_ts) { /* rollback */
segment *s = rollback_segments(dbat->segs, tr, oldest);
(void)s;
- /*
- storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
-
- if (o != d) {
- while(o && o->next != d)
- o = o->next;
- }
- if (o == ATOMIC_PTR_GET(&t->data))
- ATOMIC_PTR_SET(&t->data, d->next);
- else
- o->next = d->next;
- d->next = NULL;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list