Changeset: 2f44594a914e for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/2f44594a914e
Modified Files:
        sql/server/sql_scan.c
Branch: Jul2021
Log Message:

Merge heads


diffs (truncated from 5186 to 300 lines):

diff --git a/clients/Tests/MAL-signatures.stable.out b/clients/Tests/MAL-signatures.stable.out
--- a/clients/Tests/MAL-signatures.stable.out
+++ b/clients/Tests/MAL-signatures.stable.out
@@ -9170,6 +9170,7 @@ stdout of test 'MAL-signatures` in direc
 [ "rapi",      "eval_aggr",    "pattern rapi.eval_aggr(X_0:ptr, X_1:str, 
X_2:any...):any... ", "RAPIevalAggr;",        ""      ]
 [ "rapi",      "prelude",      "command rapi.prelude():void ", "RAPIprelude;", 
""      ]
 [ "rapi",      "subeval_aggr", "pattern rapi.subeval_aggr(X_0:ptr, X_1:str, 
X_2:any...):any... ",      "RAPIevalAggr;",        ""      ]
+[ "remote",    "assert",       "pattern remote.assert(X_0:bit, X_1:str):void 
",        "RMTassert;",   ""      ]
 [ "remote",    "batbincopy",   "pattern remote.batbincopy():bat[:any] ",       
"RMTbincopyfrom;",      ""      ]
 [ "remote",    "batbincopy",   "pattern remote.batbincopy(X_0:bat[:any]):void 
",       "RMTbincopyto;",        ""      ]
 [ "remote",    "batload",      "pattern remote.batload(X_0:any_1, 
X_1:int):bat[:any_1] ",      "RMTbatload;",  ""      ]
@@ -9271,6 +9272,7 @@ stdout of test 'MAL-signatures` in direc
 [ "sql",       "deltas",       "pattern sql.deltas(X_0:str, X_1:str) 
(X_2:bat[:int], X_3:bat[:lng], X_4:bat[:lng], X_5:bat[:lng], X_6:bat[:lng], 
X_7:bat[:lng], X_8:bat[:int]) ",      "mvc_delta_values;",    ""      ]
 [ "sql",       "deltas",       "pattern sql.deltas(X_0:str, X_1:str, X_2:str) 
(X_3:bat[:int], X_4:bat[:lng], X_5:bat[:lng], X_6:bat[:lng], X_7:bat[:lng], 
X_8:bat[:lng], X_9:bat[:int]) ",     "mvc_delta_values;",    ""      ]
 [ "sql",       "dense_rank",   "pattern sql.dense_rank(X_0:any_1, X_1:bit, 
X_2:bit):int ",     "SQLdense_rank;",       ""      ]
+[ "sql",       "deregister",   "pattern sql.deregister():int ",        
"RAstatementEnd;",      ""      ]
 [ "sql",       "diff", "pattern sql.diff(X_0:any_1):bit ",     "SQLdiff;",     
""      ]
 [ "sql",       "diff", "pattern sql.diff(X_0:bit, X_1:any_1):bit ",    
"SQLdiff;",     ""      ]
 [ "sql",       "drop_hash",    "unsafe pattern sql.drop_hash(X_0:str, 
X_1:str):void ", "SQLdrop_hash;",        ""      ]
diff --git a/clients/Tests/MAL-signatures.stable.out.int128 b/clients/Tests/MAL-signatures.stable.out.int128
--- a/clients/Tests/MAL-signatures.stable.out.int128
+++ b/clients/Tests/MAL-signatures.stable.out.int128
@@ -12471,6 +12471,7 @@ stdout of test 'MAL-signatures` in direc
 [ "rapi",      "eval_aggr",    "pattern rapi.eval_aggr(X_0:ptr, X_1:str, 
X_2:any...):any... ", "RAPIevalAggr;",        ""      ]
 [ "rapi",      "prelude",      "command rapi.prelude():void ", "RAPIprelude;", 
""      ]
 [ "rapi",      "subeval_aggr", "pattern rapi.subeval_aggr(X_0:ptr, X_1:str, 
X_2:any...):any... ",      "RAPIevalAggr;",        ""      ]
+[ "remote",    "assert",       "pattern remote.assert(X_0:bit, X_1:str):void 
",        "RMTassert;",   ""      ]
 [ "remote",    "batbincopy",   "pattern remote.batbincopy():bat[:any] ",       
"RMTbincopyfrom;",      ""      ]
 [ "remote",    "batbincopy",   "pattern remote.batbincopy(X_0:bat[:any]):void 
",       "RMTbincopyto;",        ""      ]
 [ "remote",    "batload",      "pattern remote.batload(X_0:any_1, 
X_1:int):bat[:any_1] ",      "RMTbatload;",  ""      ]
@@ -12577,6 +12578,7 @@ stdout of test 'MAL-signatures` in direc
 [ "sql",       "deltas",       "pattern sql.deltas(X_0:str, X_1:str) 
(X_2:bat[:int], X_3:bat[:lng], X_4:bat[:lng], X_5:bat[:lng], X_6:bat[:lng], 
X_7:bat[:lng], X_8:bat[:int]) ",      "mvc_delta_values;",    ""      ]
 [ "sql",       "deltas",       "pattern sql.deltas(X_0:str, X_1:str, X_2:str) 
(X_3:bat[:int], X_4:bat[:lng], X_5:bat[:lng], X_6:bat[:lng], X_7:bat[:lng], 
X_8:bat[:lng], X_9:bat[:int]) ",     "mvc_delta_values;",    ""      ]
 [ "sql",       "dense_rank",   "pattern sql.dense_rank(X_0:any_1, X_1:bit, 
X_2:bit):int ",     "SQLdense_rank;",       ""      ]
+[ "sql",       "deregister",   "pattern sql.deregister():int ",        
"RAstatementEnd;",      ""      ]
 [ "sql",       "diff", "pattern sql.diff(X_0:any_1):bit ",     "SQLdiff;",     
""      ]
 [ "sql",       "diff", "pattern sql.diff(X_0:bit, X_1:any_1):bit ",    
"SQLdiff;",     ""      ]
 [ "sql",       "drop_hash",    "unsafe pattern sql.drop_hash(X_0:str, 
X_1:str):void ", "SQLdrop_hash;",        ""      ]
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -937,6 +937,7 @@ const char *deleteRef;
 void deleteSymbol(Module scope, Symbol prg);
 const char *deltaRef;
 const char *dense_rankRef;
+const char *deregisterRef;
 malType destinationType(MalBlkPtr mb, InstrPtr p);
 const char *diffRef;
 const char *diffcandRef;
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -1300,7 +1300,10 @@ BATappend_or_update(BAT *b, BAT *p, cons
 #endif
                        }
                        if (ATOMreplaceVAR(b, &d, new) != GDK_SUCCEED) {
+                               Hash *h = b->thash;
+                               b->thash = NULL;
                                MT_rwlock_wrunlock(&b->thashlock);
+                               doHASHdestroy(b, h);
                                bat_iterator_end(&ni);
                                return GDK_FAIL;
                        }
@@ -1308,7 +1311,10 @@ BATappend_or_update(BAT *b, BAT *p, cons
                            (b->twidth <= 2 ? d - GDK_VAROFFSET : d) >= ((size_t) 1 << (8 << b->tshift))) {
                                /* doesn't fit in current heap, upgrade it */
                                if (GDKupgradevarheap(b, d, 0, false) != GDK_SUCCEED) {
+                                       Hash *h = b->thash;
+                                       b->thash = NULL;
                                        MT_rwlock_wrunlock(&b->thashlock);
+                                       doHASHdestroy(b, h);
                                        bat_iterator_end(&ni);
                                        return GDK_FAIL;
                                }
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -1012,6 +1012,58 @@ movestrbats(void)
 }
 #endif
 
+static void
+BBPmanager(void *dummy)
+{
+       (void) dummy;
+
+       for (;;) {
+               int n = 0;
+               for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
+                       MT_lock_set(&GDKswapLock(bid));
+                       if (BBP_refs(bid) == 0 && BBP_lrefs(bid) != 0) {
+                               BBP_status_off(bid, BBPHOT);
+                               n++;
+                       }
+                       MT_lock_unset(&GDKswapLock(bid));
+               }
+               TRC_DEBUG(BAT_, "cleared HOT bit from %d bats\n", n);
+               for (int i = 0; i < 100; i++) {
+                       MT_sleep_ms(100);
+                       if (GDKexiting())
+                               return;
+               }
+               n = 0;
+               for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
+                       MT_lock_set(&GDKswapLock(bid));
+                       BAT *b = NULL;
+                       bool swap = false;
+                       if (BBP_refs(bid) == 0 &&
+                           BBP_lrefs(bid) != 0 &&
+                           (b = BBP_cache(bid)) != NULL &&
+                           b->batSharecnt == 0 &&
+                           !BATdirty(b) &&
+                           !(BBP_status(bid) & (BBPHOT | BBPUNLOADING)) &&
+                           (BBP_status(bid) & BBPPERSISTENT)) {
+                               BBP_status_on(bid, BBPUNLOADING);
+                               swap = true;
+                       }
+                       MT_lock_unset(&GDKswapLock(bid));
+                       if (swap) {
+                               TRC_DEBUG(BAT_, "unload and free bat %d\n", bid);
+                               if (BBPfree(b) != GDK_SUCCEED)
+                                       GDKerror("unload failed for bat %d", bid);
+                               n++;
+                       }
+               }
+               TRC_DEBUG(BAT_, "unloaded %d bats\n", n);
+               if (GDKexiting())
+                       return;
+       }
+}
+
+static MT_Id manager;
+
 gdk_return
 BBPinit(void)
 {
@@ -1199,6 +1251,8 @@ BBPinit(void)
                        return GDK_FAIL;
        }
 #endif
+
+       manager = THRcreate(BBPmanager, NULL, MT_THR_DETACHED, "BBPmanager");
        return GDK_SUCCEED;
 
   bailout:
@@ -2371,7 +2425,7 @@ decref(bat i, bool logical, bool release
                 * while locked so no other thread thinks it's
                 * available anymore */
                assert((BBP_status(i) & BBPUNLOADING) == 0);
-               TRC_DEBUG(BAT_, "%s set to unloading BAT %d (status %u)\n", func, i, BBP_status(i));
+               TRC_DEBUG(BAT_, "%s set to unloading BAT %d (status %u, lrefs %d)\n", func, i, BBP_status(i), BBP_lrefs(i));
                BBP_status_on(i, BBPUNLOADING);
                swap = true;
        }
diff --git a/gdk/gdk_hash.c b/gdk/gdk_hash.c
--- a/gdk/gdk_hash.c
+++ b/gdk/gdk_hash.c
@@ -105,7 +105,7 @@ HASHclear(Hash *h)
 #define HASH_VERSION_NOUUID    3
 #define HASH_HEADER_SIZE       7       /* nr of size_t fields in header */
 
-static void
+void
 doHASHdestroy(BAT *b, Hash *hs)
 {
        if (hs == (Hash *) 1) {
@@ -291,6 +291,57 @@ HASHupgradehashheap(BAT *b)
        return GDK_SUCCEED;
 }
 
+/* write/remove the bit into/from the hash file that indicates the hash
+ * is good to go; the bit is the last part to be written and the first
+ * to be removed */
+static inline gdk_return
+HASHfix(Hash *h, bool save, bool dosync)
+{
+       if (!h->heapbckt.dirty && !h->heaplink.dirty) {
+               const size_t mask = (size_t) 1 << 24;
+               if (((size_t *) h->heapbckt.base)[0] & mask) {
+                       if (save)
+                               return GDK_SUCCEED;
+                       ((size_t *) h->heapbckt.base)[0] &= ~mask;
+               } else {
+                       if (!save)
+                               return GDK_SUCCEED;
+                       ((size_t *) h->heapbckt.base)[0] |= mask;
+               }
+               if (h->heapbckt.storage == STORE_MEM) {
+                       gdk_return rc = GDK_FAIL;
+                       int fd = GDKfdlocate(h->heapbckt.farmid, h->heapbckt.filename, "rb+", NULL);
+                       if (fd >= 0) {
+                               if (write(fd, h->heapbckt.base, SIZEOF_SIZE_T) == SIZEOF_SIZE_T) {
+                                       if (dosync &&
+                                           !(GDKdebug & NOSYNCMASK)) {
+#if defined(NATIVE_WIN32)
+                                               _commit(fd);
+#elif defined(HAVE_FDATASYNC)
+                                               fdatasync(fd);
+#elif defined(HAVE_FSYNC)
+                                               fsync(fd);
+#endif
+                                       }
+                                       rc = GDK_SUCCEED;
+                               }
+                               close(fd);
+                       }
+                       if (rc != GDK_SUCCEED)
+                               ((size_t *) h->heapbckt.base)[0] &= ~mask;
+                       return rc;
+               } else {
+                       if (dosync &&
+                           !(GDKdebug & NOSYNCMASK) &&
+                           MT_msync(h->heapbckt.base, SIZEOF_SIZE_T) < 0) {
+                               ((size_t *) h->heapbckt.base)[0] &= ~mask;
+                               return GDK_FAIL;
+                       }
+               }
+       }
+       return GDK_SUCCEED;
+}
+
 gdk_return
 HASHgrowbucket(BAT *b)
 {
@@ -301,17 +352,10 @@ HASHgrowbucket(BAT *b)
 
        TRC_DEBUG_IF(ACCELERATOR) t0 = GDKusec();
 
-       if (!h->heapbckt.dirty && !h->heaplink.dirty &&
-           ((size_t *) h->heapbckt.base)[0] & ((size_t) 1 << 24)) {
-               ((size_t *) h->heapbckt.base)[0] &= ~((size_t) 1 << 24);
-               if (h->heapbckt.storage != STORE_MEM) {
-                       if (!(GDKdebug & NOSYNCMASK) &&
-                           MT_msync(h->heapbckt.base, SIZEOF_SIZE_T) < 0) {
-                               doHASHdestroy(b, h);
-                               b->thash = NULL;
-                               return GDK_FAIL;
-                       }
-               }
+       if (HASHfix(h, false, true) != GDK_SUCCEED) {
+               doHASHdestroy(b, h);
+               b->thash = NULL;
+               return GDK_FAIL;
        }
 
        /* only needed to fix hash tables built before this fix was
@@ -548,7 +592,6 @@ BATcheckhash(BAT *b)
 static gdk_return
 BAThashsave_intern(BAT *b, bool dosync)
 {
-       int fd;
        gdk_return rc = GDK_SUCCEED;
        Hash *h;
        lng t0 = 0;
@@ -571,38 +614,8 @@ BAThashsave_intern(BAT *b, bool dosync)
                    HEAPsave(&h->heaplink, h->heaplink.filename, NULL, dosync) == GDK_SUCCEED &&
                    HEAPsave(hp, hp->filename, NULL, dosync) == GDK_SUCCEED) {
                        h->heaplink.dirty = false;
-                       if (hp->storage == STORE_MEM) {
-                               if ((fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
-                                       ((size_t *) hp->base)[0] |= (size_t) 1 << 24;
-                                       if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) {
-                                               rc = GDK_SUCCEED;
-                                               if (dosync &&
-                                                   !(GDKdebug & NOSYNCMASK)) {
-#if defined(NATIVE_WIN32)
-                                                       _commit(fd);
-#elif defined(HAVE_FDATASYNC)
-                                                       fdatasync(fd);
-#elif defined(HAVE_FSYNC)
-                                                       fsync(fd);
-#endif
-                                               }
-                                               hp->dirty = false;
-                                       } else {
-                                               perror("write hash");
-                                               ((size_t *) hp->base)[0] &= ~((size_t) 1 << 24);
-                                       }
-                                       close(fd);
-                               }
-                       } else {
-                               ((size_t *) hp->base)[0] |= (size_t) 1 << 24;
-                               if (dosync && !(GDKdebug & NOSYNCMASK) &&
-                                   MT_msync(hp->base, SIZEOF_SIZE_T) < 0) {
-                                       ((size_t *) hp->base)[0] &= ~((size_t) 1 << 24);
-                               } else {
-                                       hp->dirty = false;
-                                       rc = GDK_SUCCEED;
-                               }
-                       }
+                       hp->dirty = false;
+                       rc = HASHfix(h, true, dosync);
                        TRC_DEBUG(ACCELERATOR,
                                  ALGOBATFMT ": persisting hash %s%s (" LLFMT " usec)%s\n", ALGOBATPAR(b), hp->filename, dosync ? "" : " no sync", GDKusec() - t0, rc == GDK_SUCCEED ? "" : " failed");
                }
@@ -1088,17 +1101,10 @@ HASHappend_locked(BAT *b, BUN i, const v
                return;
        }
        assert(i * h->width == h->heaplink.free);
-       if (!h->heapbckt.dirty && !h->heaplink.dirty &&
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to