Changeset: fbbfc61a01a8 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/fbbfc61a01a8
Modified Files:
        gdk/gdk_bat.c
        gdk/gdk_bbp.c
        monetdb5/modules/mal/clients.c
        sql/server/rel_optimizer.c
Branch: default
Log Message:

Merged with Jul2021


diffs (truncated from 488 to 300 lines):

diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -94,7 +94,7 @@ BATcreatedesc(oid hseq, int tt, bool hea
                .batRole = role,
                .batTransient = true,
        };
-       if (heapnames && (bn->theap = GDKzalloc(sizeof(Heap))) == NULL) {
+       if (heapnames && (bn->theap = GDKmalloc(sizeof(Heap))) == NULL) {
                GDKfree(bn);
                return NULL;
        }
@@ -115,16 +115,23 @@ BATcreatedesc(oid hseq, int tt, bool hea
 
        if (heapnames) {
                assert(bn->theap != NULL);
-               bn->theap->parentid = bn->batCacheid;
-               bn->theap->farmid = BBPselectfarm(role, bn->ttype, offheap);
+               *bn->theap = (Heap) {
+                       .parentid = bn->batCacheid,
+                       .farmid = BBPselectfarm(role, bn->ttype, offheap),
+               };
 
                const char *nme = BBP_physical(bn->batCacheid);
                strconcat_len(bn->theap->filename, sizeof(bn->theap->filename),
                              nme, ".tail", NULL);
 
                if (ATOMneedheap(tt)) {
-                       if ((bn->tvheap = GDKmalloc(sizeof(Heap))) == NULL)
-                               goto bailout;
+                       if ((bn->tvheap = GDKmalloc(sizeof(Heap))) == NULL) {
+                               BBPclear(bn->batCacheid, true);
+                               HEAPfree(bn->theap, true);
+                               GDKfree(bn->theap);
+                               GDKfree(bn);
+                               return NULL;
+                       }
                        *bn->tvheap = (Heap) {
                                .parentid = bn->batCacheid,
                                .farmid = BBPselectfarm(role, bn->ttype, 
varheap),
@@ -147,14 +154,6 @@ BATcreatedesc(oid hseq, int tt, bool hea
        MT_rwlock_init(&bn->thashlock, name);
        bn->batDirtydesc = true;
        return bn;
-      bailout:
-       BBPclear(bn->batCacheid, true);
-       if (bn->theap)
-               HEAPdecref(bn->theap, true);
-       if (bn->tvheap)
-               HEAPdecref(bn->tvheap, true);
-       GDKfree(bn);
-       return NULL;
 }
 
 uint8_t
@@ -1780,7 +1779,7 @@ BATsetcount(BAT *b, BUN cnt)
        MT_lock_set(&b->theaplock);
        b->batCount = cnt;
        b->batDirtydesc = true;
-       b->theap->dirty |= b->ttype != TYPE_void && b->theap->parentid == 
b->batCacheid;
+       b->theap->dirty |= b->ttype != TYPE_void && b->theap->parentid == 
b->batCacheid && cnt > 0;
        if (b->theap->parentid == b->batCacheid)
                b->theap->free = tailsize(b, cnt);
        if (b->ttype == TYPE_void)
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -1285,6 +1285,39 @@ movestrbats(void)
 #endif
 
 static void
+BBPtrim(bool aggressive)
+{
+       int n = 0;
+       unsigned flag = BBPUNLOADING | BBPSYNCING | BBPSAVING;
+       if (!aggressive)
+               flag |= BBPHOT;
+       for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
+               MT_lock_set(&GDKswapLock(bid));
+               BAT *b = NULL;
+               bool swap = false;
+               if (BBP_refs(bid) == 0 &&
+                   BBP_lrefs(bid) != 0 &&
+                   (b = BBP_cache(bid)) != NULL &&
+                   b->batSharecnt == 0 &&
+                   (!BATdirty(b) || (aggressive && b->theap->storage == 
STORE_MMAP && (b->tvheap == NULL || b->tvheap->storage == STORE_MMAP))) &&
+                   !(BBP_status(bid) & flag) /*&&
+                   (BBP_status(bid) & BBPPERSISTENT ||
+                   (b->batRole == PERSISTENT && BBP_lrefs(bid) == 1)) */) {
+                       BBP_status_on(bid, BBPUNLOADING);
+                       swap = true;
+               }
+               MT_lock_unset(&GDKswapLock(bid));
+               if (swap) {
+                       TRC_DEBUG(BAT_, "unload and free bat %d\n", bid);
+                       if (BBPfree(b) != GDK_SUCCEED)
+                               GDKerror("unload failed for bat %d", bid);
+                       n++;
+               }
+       }
+       TRC_DEBUG(BAT_, "unloaded %d bats%s\n", n, aggressive ? " (also hot)" : 
"");
+}
+
+static void
 BBPmanager(void *dummy)
 {
        (void) dummy;
@@ -1294,41 +1327,19 @@ BBPmanager(void *dummy)
                for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
                        MT_lock_set(&GDKswapLock(bid));
                        if (BBP_refs(bid) == 0 && BBP_lrefs(bid) != 0) {
+                               n += (BBP_status(bid) & BBPHOT) != 0;
                                BBP_status_off(bid, BBPHOT);
-                               n++;
                        }
                        MT_lock_unset(&GDKswapLock(bid));
                }
                TRC_DEBUG(BAT_, "cleared HOT bit from %d bats\n", n);
-               for (int i = 0; i < 100; i++) {
+               size_t cur = GDKvm_cursize();
+               for (int i = 0, n = cur > GDK_vm_maxsize / 2 ? 1 : cur > 
GDK_vm_maxsize / 4 ? 10 : 100; i < n; i++) {
                        MT_sleep_ms(100);
                        if (GDKexiting())
                                return;
                }
-               n = 0;
-               for (bat bid = 1; bid < (bat) ATOMIC_GET(&BBPsize); bid++) {
-                       MT_lock_set(&GDKswapLock(bid));
-                       BAT *b = NULL;
-                       bool swap = false;
-                       if (BBP_refs(bid) == 0 &&
-                           BBP_lrefs(bid) != 0 &&
-                           (b = BBP_cache(bid)) != NULL &&
-                           b->batSharecnt == 0 &&
-                           !BATdirty(b) &&
-                           !(BBP_status(bid) & (BBPHOT | BBPUNLOADING | 
BBPSYNCING)) &&
-                           (BBP_status(bid) & BBPPERSISTENT)) {
-                               BBP_status_on(bid, BBPUNLOADING);
-                               swap = true;
-                       }
-                       MT_lock_unset(&GDKswapLock(bid));
-                       if (swap) {
-                               TRC_DEBUG(BAT_, "unload and free bat %d\n", 
bid);
-                               if (BBPfree(b) != GDK_SUCCEED)
-                                       GDKerror("unload failed for bat %d", 
bid);
-                               n++;
-                       }
-               }
-               TRC_DEBUG(BAT_, "unloaded %d bats\n", n);
+               BBPtrim(false);
                if (GDKexiting())
                        return;
        }
@@ -1943,10 +1954,11 @@ BBPdump(void)
                                fprintf(stderr, " Theap -> %d", 
b->theap->parentid);
                        } else {
                                fprintf(stderr,
-                                       " Theap=[%zu,%zu,f=%d]%s",
-                                       HEAPmemsize(b->theap),
-                                       HEAPvmsize(b->theap),
+                                       " Theap=[%zu,%zu,f=%d]%s%s",
+                                       b->theap->free,
+                                       b->theap->size,
                                        b->theap->farmid,
+                                       b->theap->base == NULL ? "X" : 
b->theap->storage == STORE_MMAP ? "M" : "",
                                        status & BBPSWAPPED ? "(Swapped)" : 
b->theap->dirty ? "(Dirty)" : "");
                                if (BBP_logical(i) && BBP_logical(i)[0] == '.') 
{
                                        cmem += HEAPmemsize(b->theap);
@@ -1966,10 +1978,11 @@ BBPdump(void)
                                        b->tvheap->parentid);
                        } else {
                                fprintf(stderr,
-                                       " Tvheap=[%zu,%zu,f=%d]%s",
-                                       HEAPmemsize(b->tvheap),
-                                       HEAPvmsize(b->tvheap),
+                                       " Tvheap=[%zu,%zu,f=%d]%s%s",
+                                       b->tvheap->free,
+                                       b->tvheap->size,
                                        b->tvheap->farmid,
+                                       b->tvheap->base == NULL ? "X" : 
b->tvheap->storage == STORE_MMAP ? "M" : "",
                                        b->tvheap->dirty ? "(Dirty)" : "");
                                if (BBP_logical(i) && BBP_logical(i)[0] == '.') 
{
                                        cmem += HEAPmemsize(b->tvheap);
@@ -2724,11 +2737,15 @@ decref(bat i, bool logical, bool release
 
        /* we destroy transients asap and unload persistent bats only
         * if they have been made cold or are not dirty */
+       unsigned chkflag = BBPSYNCING;
+       if (GDKvm_cursize() < GDK_vm_maxsize &&
+            ((b && b->theap ? b->theap->size : 0) + (b && b->tvheap ? 
b->tvheap->size : 0)) < (GDK_vm_maxsize - GDKvm_cursize()) / 32)
+               chkflag |= BBPHOT;
        if (BBP_refs(i) > 0 ||
            (BBP_lrefs(i) > 0 &&
             (b == NULL ||
              BATdirty(b) ||
-             (BBP_status(i) & (BBPHOT | BBPSYNCING)) ||
+             (BBP_status(i) & chkflag) ||
              !(BBP_status(i) & BBPPERSISTENT) ||
              GDKinmemory(farmid)))) {
                /* bat cannot be swapped out */
@@ -3580,24 +3597,41 @@ BBPsync(int cnt, bat *restrict subcommit
                while (++idx < cnt) {
                        bat i = subcommit ? subcommit[idx] : idx;
                        /* BBP_desc(i) may be NULL */
-                       BATiter bi = bat_iterator(BBP_desc(i));
                        BUN size = sizes ? sizes[idx] : BUN_NONE;
-                       if (size > bi.count)
-                               size = bi.count;
 
                        if (BBP_status(i) & BBPPERSISTENT) {
                                BAT *b = dirty_bat(&i, subcommit != NULL);
                                if (i <= 0) {
-                                       bat_iterator_end(&bi);
                                        break;
                                }
-                               if (b)
+                               if (b) {
+                                       /* wait for BBPSAVING so that we
+                                        * can set it, wait for
+                                        * BBPUNLOADING before
+                                        * attempting to save */
+                                       for (;;) {
+                                               if (lock)
+                                                       
MT_lock_set(&GDKswapLock(i));
+                                               if (!(BBP_status(i) & 
(BBPSAVING|BBPUNLOADING)))
+                                                       break;
+                                               if (lock)
+                                                       
MT_lock_unset(&GDKswapLock(i));
+                                               BBPspin(i, __func__, 
BBPSAVING|BBPUNLOADING);
+                                       }
+                                       BBP_status_on(i, BBPSAVING);
+                                       if (lock)
+                                               MT_lock_unset(&GDKswapLock(i));
+                                       BATiter bi = bat_iterator(b);
+                                       if (size > bi.count)
+                                               size = bi.count;
                                        ret = BATsave_locked(b, &bi, size);
+                                       bat_iterator_end(&bi);
+                                       BBP_status_off(i, BBPSAVING);
+                               }
                        }
                        if (ret == GDK_SUCCEED) {
                                n = BBPdir_step(i, size, n, buf, sizeof(buf), 
&obbpf, nbbpf);
                        }
-                       bat_iterator_end(&bi);
                        if (n == -2)
                                break;
                        /* we once again have a saved heap */
diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c
--- a/gdk/gdk_utils.c
+++ b/gdk/gdk_utils.c
@@ -877,6 +877,8 @@ GDKembedded(void)
        return Mbedded;
 }
 
+static MT_Id mainpid;
+
 gdk_return
 GDKinit(opt *set, int setlen, bool embedded)
 {
@@ -888,6 +890,8 @@ GDKinit(opt *set, int setlen, bool embed
        int i, nlen = 0;
        char buf[16];
 
+       mainpid = MT_getpid();
+
        if (GDKinmemory(0)) {
                dbpath = dbtrace = NULL;
        } else {
@@ -1188,12 +1192,13 @@ GDKexiting(void)
 void
 GDKprepareExit(void)
 {
-       if (ATOMIC_ADD(&GDKstopped, 1) > 0)
-               return;
+       ATOMIC_ADD(&GDKstopped, 1);
 
-       TRC_DEBUG_IF(THRD)
-               dump_threads();
-       join_detached_threads();
+       if (MT_getpid() == mainpid) {
+               TRC_DEBUG_IF(THRD)
+                       dump_threads();
+               join_detached_threads();
+       }
 }
 
 void
diff --git a/monetdb5/modules/mal/clients.c b/monetdb5/modules/mal/clients.c
--- a/monetdb5/modules/mal/clients.c
+++ b/monetdb5/modules/mal/clients.c
@@ -844,7 +844,7 @@ CLTshutdown(Client cntxt, MalBlkPtr mb, 
                snprintf(buf, 1024,"%d client sessions still running",leftover);
        *ret = GDKstrdup(buf);
        if ( force)
-               mal_exit(0);
+               GDKprepareExit();
        if (*ret == NULL)
                throw(MAL, "mal.shutdown", SQLSTATE(HY013) MAL_MALLOC_FAIL);
        return MAL_SUCCEED;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to