Changeset: 2f44594a914e for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/2f44594a914e Modified Files: sql/server/sql_scan.c Branch: Jul2021 Log Message:
Merge heads diffs (truncated from 5186 to 300 lines): diff --git a/clients/Tests/MAL-signatures.stable.out b/clients/Tests/MAL-signatures.stable.out --- a/clients/Tests/MAL-signatures.stable.out +++ b/clients/Tests/MAL-signatures.stable.out @@ -9170,6 +9170,7 @@ stdout of test 'MAL-signatures` in direc [ "rapi", "eval_aggr", "pattern rapi.eval_aggr(X_0:ptr, X_1:str, X_2:any...):any... ", "RAPIevalAggr;", "" ] [ "rapi", "prelude", "command rapi.prelude():void ", "RAPIprelude;", "" ] [ "rapi", "subeval_aggr", "pattern rapi.subeval_aggr(X_0:ptr, X_1:str, X_2:any...):any... ", "RAPIevalAggr;", "" ] +[ "remote", "assert", "pattern remote.assert(X_0:bit, X_1:str):void ", "RMTassert;", "" ] [ "remote", "batbincopy", "pattern remote.batbincopy():bat[:any] ", "RMTbincopyfrom;", "" ] [ "remote", "batbincopy", "pattern remote.batbincopy(X_0:bat[:any]):void ", "RMTbincopyto;", "" ] [ "remote", "batload", "pattern remote.batload(X_0:any_1, X_1:int):bat[:any_1] ", "RMTbatload;", "" ] @@ -9271,6 +9272,7 @@ stdout of test 'MAL-signatures` in direc [ "sql", "deltas", "pattern sql.deltas(X_0:str, X_1:str) (X_2:bat[:int], X_3:bat[:lng], X_4:bat[:lng], X_5:bat[:lng], X_6:bat[:lng], X_7:bat[:lng], X_8:bat[:int]) ", "mvc_delta_values;", "" ] [ "sql", "deltas", "pattern sql.deltas(X_0:str, X_1:str, X_2:str) (X_3:bat[:int], X_4:bat[:lng], X_5:bat[:lng], X_6:bat[:lng], X_7:bat[:lng], X_8:bat[:lng], X_9:bat[:int]) ", "mvc_delta_values;", "" ] [ "sql", "dense_rank", "pattern sql.dense_rank(X_0:any_1, X_1:bit, X_2:bit):int ", "SQLdense_rank;", "" ] +[ "sql", "deregister", "pattern sql.deregister():int ", "RAstatementEnd;", "" ] [ "sql", "diff", "pattern sql.diff(X_0:any_1):bit ", "SQLdiff;", "" ] [ "sql", "diff", "pattern sql.diff(X_0:bit, X_1:any_1):bit ", "SQLdiff;", "" ] [ "sql", "drop_hash", "unsafe pattern sql.drop_hash(X_0:str, X_1:str):void ", "SQLdrop_hash;", "" ] diff --git a/clients/Tests/MAL-signatures.stable.out.int128 b/clients/Tests/MAL-signatures.stable.out.int128 --- a/clients/Tests/MAL-signatures.stable.out.int128 +++ b/clients/Tests/MAL-signatures.stable.out.int128 @@ -12471,6 +12471,7 @@ stdout of test 'MAL-signatures` in direc [ "rapi", "eval_aggr", "pattern rapi.eval_aggr(X_0:ptr, X_1:str, X_2:any...):any... ", "RAPIevalAggr;", "" ] [ "rapi", "prelude", "command rapi.prelude():void ", "RAPIprelude;", "" ] [ "rapi", "subeval_aggr", "pattern rapi.subeval_aggr(X_0:ptr, X_1:str, X_2:any...):any... ", "RAPIevalAggr;", "" ] +[ "remote", "assert", "pattern remote.assert(X_0:bit, X_1:str):void ", "RMTassert;", "" ] [ "remote", "batbincopy", "pattern remote.batbincopy():bat[:any] ", "RMTbincopyfrom;", "" ] [ "remote", "batbincopy", "pattern remote.batbincopy(X_0:bat[:any]):void ", "RMTbincopyto;", "" ] [ "remote", "batload", "pattern remote.batload(X_0:any_1, X_1:int):bat[:any_1] ", "RMTbatload;", "" ] @@ -12577,6 +12578,7 @@ stdout of test 'MAL-signatures` in direc [ "sql", "deltas", "pattern sql.deltas(X_0:str, X_1:str) (X_2:bat[:int], X_3:bat[:lng], X_4:bat[:lng], X_5:bat[:lng], X_6:bat[:lng], X_7:bat[:lng], X_8:bat[:int]) ", "mvc_delta_values;", "" ] [ "sql", "deltas", "pattern sql.deltas(X_0:str, X_1:str, X_2:str) (X_3:bat[:int], X_4:bat[:lng], X_5:bat[:lng], X_6:bat[:lng], X_7:bat[:lng], X_8:bat[:lng], X_9:bat[:int]) ", "mvc_delta_values;", "" ] [ "sql", "dense_rank", "pattern sql.dense_rank(X_0:any_1, X_1:bit, X_2:bit):int ", "SQLdense_rank;", "" ] +[ "sql", "deregister", "pattern sql.deregister():int ", "RAstatementEnd;", "" ] [ "sql", "diff", "pattern sql.diff(X_0:any_1):bit ", "SQLdiff;", "" ] [ "sql", "diff", "pattern sql.diff(X_0:bit, X_1:any_1):bit ", "SQLdiff;", "" ] [ "sql", "drop_hash", "unsafe pattern sql.drop_hash(X_0:str, X_1:str):void ", "SQLdrop_hash;", "" ] diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -937,6 +937,7 @@ const char *deleteRef; void deleteSymbol(Module scope, Symbol prg); const char *deltaRef; const char *dense_rankRef; +const char *deregisterRef; malType destinationType(MalBlkPtr mb, InstrPtr p); const char *diffRef; const char *diffcandRef; diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c --- a/gdk/gdk_batop.c +++ b/gdk/gdk_batop.c @@ -1300,7 +1300,10 @@ BATappend_or_update(BAT *b, BAT *p, cons #endif } if (ATOMreplaceVAR(b, &d, new) != GDK_SUCCEED) { + Hash *h = b->thash; + b->thash = NULL; MT_rwlock_wrunlock(&b->thashlock); + doHASHdestroy(b, h); bat_iterator_end(&ni); return GDK_FAIL; } @@ -1308,7 +1311,10 @@ BATappend_or_update(BAT *b, BAT *p, cons (b->twidth <= 2 ? d - GDK_VAROFFSET : d) >= ((size_t) 1 << (8 << b->tshift))) { /* doesn't fit in current heap, upgrade it */ if (GDKupgradevarheap(b, d, 0, false) != GDK_SUCCEED) { + Hash *h = b->thash; + b->thash = NULL; MT_rwlock_wrunlock(&b->thashlock); + doHASHdestroy(b, h); bat_iterator_end(&ni); return GDK_FAIL; } diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c --- a/gdk/gdk_bbp.c +++ b/gdk/gdk_bbp.c @@ -1012,6 +1012,58 @@ movestrbats(void) } #endif +static void +BBPmanager(void *dummy) +{ + (void) dummy; + + for (;;) { + int n = 0; + for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) { + MT_lock_set(&GDKswapLock(bid)); + if (BBP_refs(bid) == 0 && BBP_lrefs(bid) != 0) { + BBP_status_off(bid, BBPHOT); + n++; + } + MT_lock_unset(&GDKswapLock(bid)); + } + TRC_DEBUG(BAT_, "cleared HOT bit from %d bats\n", n); + for (int i = 0; i < 100; i++) { + MT_sleep_ms(100); + if (GDKexiting()) + return; + } + n = 0; + for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) { + MT_lock_set(&GDKswapLock(bid)); + BAT *b = NULL; + bool swap = false; + if (BBP_refs(bid) == 0 && + BBP_lrefs(bid) != 0 && + (b = BBP_cache(bid)) != NULL && + b->batSharecnt == 0 && + !BATdirty(b) && + !(BBP_status(bid) & (BBPHOT | BBPUNLOADING)) && + (BBP_status(bid) & BBPPERSISTENT)) { + BBP_status_on(bid, BBPUNLOADING); + swap = true; + } + MT_lock_unset(&GDKswapLock(bid)); + if (swap) { + TRC_DEBUG(BAT_, "unload and free bat %d\n", bid); + if (BBPfree(b) != GDK_SUCCEED) + GDKerror("unload failed for bat %d", bid); + n++; + } + } + TRC_DEBUG(BAT_, "unloaded %d bats\n", n); + if (GDKexiting()) + return; + } +} + +static MT_Id manager; + gdk_return BBPinit(void) { @@ -1199,6 +1251,8 @@ BBPinit(void) return GDK_FAIL; } #endif + + manager = THRcreate(BBPmanager, NULL, MT_THR_DETACHED, "BBPmanager"); return GDK_SUCCEED; bailout: @@ -2371,7 +2425,7 @@ decref(bat i, bool logical, bool release * while locked so no other thread thinks it's * available anymore */ assert((BBP_status(i) & BBPUNLOADING) == 0); - TRC_DEBUG(BAT_, "%s set to unloading BAT %d (status %u)\n", func, i, BBP_status(i)); + TRC_DEBUG(BAT_, "%s set to unloading BAT %d (status %u, lrefs %d)\n", func, i, BBP_status(i), BBP_lrefs(i)); BBP_status_on(i, BBPUNLOADING); swap = true; } diff --git a/gdk/gdk_hash.c b/gdk/gdk_hash.c --- a/gdk/gdk_hash.c +++ b/gdk/gdk_hash.c @@ -105,7 +105,7 @@ HASHclear(Hash *h) #define HASH_VERSION_NOUUID 3 #define HASH_HEADER_SIZE 7 /* nr of size_t fields in header */ -static void +void doHASHdestroy(BAT *b, Hash *hs) { if (hs == (Hash *) 1) { @@ -291,6 +291,57 @@ HASHupgradehashheap(BAT *b) return GDK_SUCCEED; } +/* write/remove the bit into/from the hash file that indicates the hash + * is good to go; the bit is the last part to be written and the first + * to be removed */ +static inline gdk_return +HASHfix(Hash *h, bool save, bool dosync) +{ + if (!h->heapbckt.dirty && !h->heaplink.dirty) { + const size_t mask = (size_t) 1 << 24; + if (((size_t *) h->heapbckt.base)[0] & mask) { + if (save) + return GDK_SUCCEED; + ((size_t *) h->heapbckt.base)[0] &= ~mask; + } else { + if (!save) + return GDK_SUCCEED; + ((size_t *) h->heapbckt.base)[0] |= mask; + } + if (h->heapbckt.storage == STORE_MEM) { + gdk_return rc = GDK_FAIL; + int fd = GDKfdlocate(h->heapbckt.farmid, h->heapbckt.filename, "rb+", NULL); + if (fd >= 0) { + if (write(fd, h->heapbckt.base, SIZEOF_SIZE_T) == SIZEOF_SIZE_T) { + if (dosync && + !(GDKdebug & NOSYNCMASK)) { +#if defined(NATIVE_WIN32) + _commit(fd); +#elif defined(HAVE_FDATASYNC) + fdatasync(fd); +#elif defined(HAVE_FSYNC) + fsync(fd); +#endif + } + rc = GDK_SUCCEED; + } + close(fd); + } + if (rc != GDK_SUCCEED) + ((size_t *) h->heapbckt.base)[0] &= ~mask; + return rc; + } else { + if (dosync && + !(GDKdebug & NOSYNCMASK) && + MT_msync(h->heapbckt.base, SIZEOF_SIZE_T) < 0) { + ((size_t *) h->heapbckt.base)[0] &= ~mask; + return GDK_FAIL; + } + } + } + return GDK_SUCCEED; +} + gdk_return HASHgrowbucket(BAT *b) { @@ -301,17 +352,10 @@ HASHgrowbucket(BAT *b) TRC_DEBUG_IF(ACCELERATOR) t0 = GDKusec(); - if (!h->heapbckt.dirty && !h->heaplink.dirty && - ((size_t *) h->heapbckt.base)[0] & ((size_t) 1 << 24)) { - ((size_t *) h->heapbckt.base)[0] &= ~((size_t) 1 << 24); - if (h->heapbckt.storage != STORE_MEM) { - if (!(GDKdebug & NOSYNCMASK) && - MT_msync(h->heapbckt.base, SIZEOF_SIZE_T) < 0) { - doHASHdestroy(b, h); - b->thash = NULL; - return GDK_FAIL; - } - } + if (HASHfix(h, false, true) != GDK_SUCCEED) { + doHASHdestroy(b, h); + b->thash = NULL; + return GDK_FAIL; } /* only needed to fix hash tables built before this fix was @@ -548,7 +592,6 @@ BATcheckhash(BAT *b) static gdk_return BAThashsave_intern(BAT *b, bool dosync) { - int fd; gdk_return rc = GDK_SUCCEED; Hash *h; lng t0 = 0; @@ -571,38 +614,8 @@ BAThashsave_intern(BAT *b, bool dosync) HEAPsave(&h->heaplink, h->heaplink.filename, NULL, dosync) == GDK_SUCCEED && HEAPsave(hp, hp->filename, NULL, dosync) == GDK_SUCCEED) { h->heaplink.dirty = false; - if (hp->storage == STORE_MEM) { - if ((fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) { - ((size_t *) hp->base)[0] |= (size_t) 1 << 24; - if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) { - rc = GDK_SUCCEED; - if (dosync && - !(GDKdebug & NOSYNCMASK)) { -#if defined(NATIVE_WIN32) - _commit(fd); -#elif defined(HAVE_FDATASYNC) - fdatasync(fd); -#elif defined(HAVE_FSYNC) - fsync(fd); -#endif - } - hp->dirty = false; - } else { - perror("write hash"); - ((size_t *) hp->base)[0] &= ~((size_t) 1 << 24); - } - close(fd); - } - } else { - ((size_t *) hp->base)[0] |= (size_t) 1 << 24; - if (dosync && !(GDKdebug & NOSYNCMASK) && - MT_msync(hp->base, SIZEOF_SIZE_T) < 0) { - ((size_t *) hp->base)[0] &= ~((size_t) 1 << 24); - } else { - hp->dirty = false; - rc = GDK_SUCCEED; - } - } + hp->dirty = false; + rc = HASHfix(h, true, dosync); TRC_DEBUG(ACCELERATOR, ALGOBATFMT ": persisting hash %s%s (" LLFMT " usec)%s\n", ALGOBATPAR(b), hp->filename, dosync ? "" : " no sync", GDKusec() - t0, rc == GDK_SUCCEED ? "" : " failed"); } @@ -1088,17 +1101,10 @@ HASHappend_locked(BAT *b, BUN i, const v return; } assert(i * h->width == h->heaplink.free); - if (!h->heapbckt.dirty && !h->heaplink.dirty && _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list