Changeset: 2b04f77c1610 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/2b04f77c1610 Modified Files: gdk/gdk_logger.c Branch: Jul2021 Log Message:
Improved robustness of bm_subcommit. We now only break off the cleanup (i.e. compaction of the catalog bats), but don't fail the subcommit. Cleaned bats are marked as cleaned. diffs (278 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -1437,6 +1437,116 @@ subcommit_list_add(int next, bat *n, BUN return next; } +static int +cleanup_and_swap(logger *lg, const log_bid *bids, lng *lids, lng *cnts, BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, int cleanup) +{ + BAT *nbids, *noids, *ncnts, *nlids, *ndels; + BUN p, q; + int err = 0; + + oid *poss = Tloc(dcatalog, 0); + BATloop(dcatalog, p, q) { + oid pos = poss[p]; + + if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid) + continue; + + if (lg->debug & 1) { + fprintf(stderr, "release %d\n", bids[pos]); + if (BBP_lrefs(bids[pos]) != 2) + fprintf(stderr, "release %d %d\n", bids[pos], BBP_lrefs(bids[pos])); + } + if (lids[pos] >= 0) { + BAT *lb; + + BBPrelease(bids[pos]); + if ((lb = BATdescriptor(bids[pos])) == NULL || + BATmode(lb, true/*transient*/) != GDK_SUCCEED) { + TRC_WARNING(GDK, "Failed to set bat(%d) transient\n", bids[pos]); + err++; + } else { + lids[pos] = -1; /* mark freed */ + } + //assert(BBP_lrefs(bid) == lb->batSharecnt + 1 && BBP_refs(bid) <= lb->batSharecnt); + logbat_destroy(lb); + } + } + if (err) + return 0; + /* only project out the deleted with last id > lg->saved_tid + * update dcatalog, ie only keep those deleted which + * were not released ie last id <= lg->saved_tid */ + + BUN ocnt = BATcount(catalog_bid); + nbids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT); + noids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT); + ncnts = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT); + nlids = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT); + ndels = logbat_new(TYPE_oid, BATcount(dcatalog)-cleanup, PERSISTENT); + + if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) { + logbat_destroy(nbids); + 
logbat_destroy(noids); + logbat_destroy(ncnts); + logbat_destroy(nlids); + logbat_destroy(ndels); + return 0; + } + + int *oids = (int*)Tloc(catalog_id, 0); + q = BUNlast(catalog_bid); + for(p = 0; p<q && !err; p++) { + bat col = bids[p]; + int nid = oids[p]; + lng lid = lids[p]; + lng cnt = cnts[p]; + oid pos = p; + + if (lid != lng_nil && lid <= lg->saved_tid) + continue; /* remove */ + + if (BUNappend(nbids, &col, false) != GDK_SUCCEED || + BUNappend(noids, &nid, false) != GDK_SUCCEED || + BUNappend(nlids, &lid, false) != GDK_SUCCEED || + BUNappend(ncnts, &cnt, false) != GDK_SUCCEED) + err=1; + pos = (oid)(BATcount(nbids)-1); + if (lid != lng_nil && BUNappend(ndels, &pos, false) != GDK_SUCCEED) + err=1; + } + + if (err) + return 0; + /* point of no return */ + if (logger_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED || + logger_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED || + logger_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) { + logbat_destroy(nbids); + logbat_destroy(noids); + logbat_destroy(ndels); + logbat_destroy(ncnts); + logbat_destroy(nlids); + return -1; + } + logbat_destroy(lg->catalog_bid); + logbat_destroy(lg->catalog_id); + logbat_destroy(lg->dcatalog); + + lg->catalog_bid = nbids; + lg->catalog_id = noids; + lg->dcatalog = ndels; + + BBPunfix(lg->catalog_cnt->batCacheid); + BBPunfix(lg->catalog_lid->batCacheid); + + lg->catalog_cnt = ncnts; + lg->catalog_lid = nlids; + lg->cnt = BATcount(lg->catalog_bid); + lg->deleted -= cleanup; + assert(lg->deleted == BATcount(lg->dcatalog)); + return 0; +} + static gdk_return bm_subcommit(logger *lg) { @@ -1451,7 +1561,7 @@ bm_subcommit(logger *lg) int i = 0; gdk_return res; const log_bid *bids; - const lng *cnts = NULL, *lids = NULL; + lng *cnts = NULL, *lids = NULL; int cleanup = 0; lng t0 = 0; @@ -1466,9 +1576,9 @@ bm_subcommit(logger *lg) n[i++] = 0; /* n[0] is not used */ bids = (const log_bid *) Tloc(catalog_bid, 0); if (lg->catalog_cnt) - 
cnts = (const lng *) Tloc(lg->catalog_cnt, 0); + cnts = (lng *) Tloc(lg->catalog_cnt, 0); if (lg->catalog_lid) - lids = (const lng *) Tloc(lg->catalog_lid, 0); + lids = (lng *) Tloc(lg->catalog_lid, 0); BATloop(catalog_bid, p, q) { bat col = bids[p]; @@ -1488,114 +1598,16 @@ bm_subcommit(logger *lg) sizes[i] = BATcount(dcatalog); n[i++] = dcatalog->batCacheid; - if (cleanup) { - BAT *nbids, *noids, *ncnts, *nlids, *ndels; - - oid *poss = Tloc(dcatalog, 0); - BATloop(dcatalog, p, q) { - oid pos = poss[p]; - BAT *lb; - - if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid) - continue; - - if (lg->debug & 1) { - fprintf(stderr, "release %d\n", bids[pos]); - if (BBP_lrefs(bids[pos]) != 2) - fprintf(stderr, "release %d %d\n", bids[pos], BBP_lrefs(bids[pos])); - } - BBPrelease(bids[pos]); - if ((lb = BATdescriptor(bids[pos])) == NULL || - BATmode(lb, true/*transient*/) != GDK_SUCCEED) { - logbat_destroy(lb); - GDKfree(n); - GDKfree(sizes); - logger_unlock(lg); - return GDK_FAIL; - } - //assert(BBP_lrefs(bid) == lb->batSharecnt + 1 && BBP_refs(bid) <= lb->batSharecnt); - logbat_destroy(lb); - } - /* only project out the deleted with last id > lg->saved_tid - * update dcatalog, ie only keep those deleted which - * were not released ie last id <= lg->saved_tid */ - - BUN ocnt = BATcount(catalog_bid); - nbids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT); - noids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT); - ncnts = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT); - nlids = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT); - ndels = logbat_new(TYPE_oid, BATcount(dcatalog)-cleanup, PERSISTENT); - - if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == NULL || ndels == NULL) { - logbat_destroy(nbids); - logbat_destroy(noids); - logbat_destroy(ncnts); - logbat_destroy(nlids); - logbat_destroy(ndels); - GDKfree(n); - GDKfree(sizes); - logger_unlock(lg); - return GDK_FAIL; - } - - int *oids = (int*)Tloc(catalog_id, 0); - q = BUNlast(catalog_bid); - int err = 0; - 
for(p = 0; p<q && !err; p++) { - bat col = bids[p]; - int nid = oids[p]; - lng lid = lids[p]; - lng cnt = cnts[p]; - oid pos = p; - - if (lid != lng_nil && lid <= lg->saved_tid) - continue; /* remove */ - - if (BUNappend(nbids, &col, false) != GDK_SUCCEED || - BUNappend(noids, &nid, false) != GDK_SUCCEED || - BUNappend(nlids, &lid, false) != GDK_SUCCEED || - BUNappend(ncnts, &cnt, false) != GDK_SUCCEED) - err=1; - pos = (oid)(BATcount(nbids)-1); - if (lid != lng_nil && BUNappend(ndels, &pos, false) != GDK_SUCCEED) - err=1; - } - - if (err || - logger_switch_bat(catalog_bid, nbids, lg->fn, "catalog_bid") != GDK_SUCCEED || - logger_switch_bat(catalog_id, noids, lg->fn, "catalog_id") != GDK_SUCCEED || - logger_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != GDK_SUCCEED) { - logbat_destroy(nbids); - logbat_destroy(noids); - logbat_destroy(ndels); - logbat_destroy(ncnts); - logbat_destroy(nlids); - GDKfree(n); - GDKfree(sizes); - logger_unlock(lg); - return GDK_FAIL; - } - i = subcommit_list_add(i, n, sizes, nbids->batCacheid, BATcount(nbids)); - i = subcommit_list_add(i, n, sizes, noids->batCacheid, BATcount(nbids)); - i = subcommit_list_add(i, n, sizes, ndels->batCacheid, BATcount(ndels)); - - logbat_destroy(lg->catalog_bid); - logbat_destroy(lg->catalog_id); - logbat_destroy(lg->dcatalog); - - lg->catalog_bid = catalog_bid = nbids; - lg->catalog_id = catalog_id = noids; - lg->dcatalog = dcatalog = ndels; - - BBPunfix(lg->catalog_cnt->batCacheid); - BBPunfix(lg->catalog_lid->batCacheid); - - lg->catalog_cnt = ncnts; - lg->catalog_lid = nlids; - lg->cnt = BATcount(lg->catalog_bid); - lg->deleted -= cleanup; - assert(lg->deleted == BATcount(lg->dcatalog)); + if (cleanup && cleanup_and_swap(lg, bids, lids, cnts, catalog_bid, catalog_id, dcatalog, cleanup)) { + GDKfree(n); + GDKfree(sizes); + logger_unlock(lg); + return GDK_FAIL; + } + if (dcatalog != lg->dcatalog) { + i = subcommit_list_add(i, n, sizes, lg->catalog_bid->batCacheid, BATcount(lg->catalog_bid)); + i = 
subcommit_list_add(i, n, sizes, lg->catalog_id->batCacheid, BATcount(lg->catalog_bid)); + i = subcommit_list_add(i, n, sizes, lg->dcatalog->batCacheid, BATcount(lg->dcatalog)); } if (lg->seqs_id) { sizes[i] = BATcount(lg->seqs_id); @@ -1663,11 +1675,6 @@ bm_subcommit(logger *lg) } assert((BUN) i <= nn); - /* - BATcommit(catalog_bid, BUN_NONE); - BATcommit(catalog_id, BUN_NONE); - BATcommit(dcatalog, BUN_NONE); - */ logger_unlock(lg); if (lg->debug & 1) t0 = GDKusec(); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list