Changeset: fdcfaef289d0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/fdcfaef289d0
Modified Files:
        clients/Tests/exports.stable.out
        gdk/gdk.h
        gdk/gdk_bat.c
        gdk/gdk_batop.c
        gdk/gdk_bbp.c
        gdk/gdk_heap.c
        sql/storage/store.c
Branch: default
Log Message:

Merge with Jan2022 branch.


diffs (288 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -221,6 +221,8 @@ int BBPretain(bat b);
 gdk_return BBPsave(BAT *b);
 void BBPshare(bat b);
 gdk_return BBPsync(int cnt, bat *restrict subcommit, BUN *restrict sizes, lng logno, lng transid);
+void BBPtmlock(void);
+void BBPtmunlock(void);
 int BBPunfix(bat b);
 void BBPunlock(void);
 gdk_return BUNappend(BAT *b, const void *right, bool force) __attribute__((__warn_unused_result__));
diff --git a/gdk/ChangeLog.Jan2022 b/gdk/ChangeLog.Jan2022
--- a/gdk/ChangeLog.Jan2022
+++ b/gdk/ChangeLog.Jan2022
@@ -1,6 +1,12 @@
 # ChangeLog file for GDK
 # This file is updated with Maddlog
 
+* Thu May 19 2022 Sjoerd Mullender <sjo...@acm.org>
+- All accesses to the BACKUP directory need to be protected by the
+  same lock.  The lock already existed (GDKtmLock), but wasn't used
+  consistently.  This is now fixed.  Hopefully this makes the hot snapshot
+  code more reliable.
+
 * Tue May 10 2022 Sjoerd Mullender <sjo...@acm.org>
 - When exiting, long running instructions are aborted using the same
   mechanism that is used for query timeouts.
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -1603,8 +1603,9 @@ gdk_export BBPrec *BBP[N_BBPINIT];
 #define BBPRENAME_MEMORY       (-4)
 
 gdk_export void BBPlock(void);
-
 gdk_export void BBPunlock(void);
+gdk_export void BBPtmlock(void);
+gdk_export void BBPtmunlock(void);
 
 gdk_export BAT *BBPquickdesc(bat b);
 
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -1171,7 +1171,6 @@ BUNappendmulti(BAT *b, const void *value
                b->tsorted = b->trevsorted = b->tkey = false;
        }
        MT_lock_unset(&b->theaplock);
-       MT_rwlock_wrlock(&b->thashlock);
        if (values && b->ttype) {
                int (*atomcmp) (const void *, const void *) = ATOMcompare(b->ttype);
                const void *atomnil = ATOMnilptr(b->ttype);
@@ -1187,7 +1186,6 @@ BUNappendmulti(BAT *b, const void *value
                                t = ((void **) values)[i];
                                gdk_return rc = tfastins_nocheckVAR(b, p, t);
                                if (rc != GDK_SUCCEED) {
-                                       MT_rwlock_wrunlock(&b->thashlock);
                                        return rc;
                                }
                                if (vbase != b->tvheap->base) {
@@ -1203,9 +1201,6 @@ BUNappendmulti(BAT *b, const void *value
                                        if (bi.maxpos != BUN_NONE)
                                                maxvalp = BUNtvar(bi, bi.maxpos);
                                }
-                               if (b->thash) {
-                                       HASHappend_locked(b, p, t);
-                               }
                                if (atomcmp(t, atomnil) != 0) {
                                        if (p == 0) {
                                                bi.minpos = bi.maxpos = 0;
@@ -1225,6 +1220,16 @@ BUNappendmulti(BAT *b, const void *value
                                }
                                p++;
                        }
+                       MT_rwlock_wrlock(&b->thashlock);
+                       if (b->thash) {
+                               p -= count;
+                               for (BUN i = 0; i < count; i++) {
+                                       t = ((void **) values)[i];
+                                       HASHappend_locked(b, p, t);
+                                       p++;
+                               }
+                       }
+                       MT_rwlock_wrunlock(&b->thashlock);
                } else if (ATOMstorage(b->ttype) == TYPE_msk) {
                        bi.minpos = bi.maxpos = BUN_NONE;
                        minvalp = maxvalp = NULL;
@@ -1234,6 +1239,7 @@ BUNappendmulti(BAT *b, const void *value
                                p++;
                        }
                } else {
+                       MT_rwlock_wrlock(&b->thashlock);
                        for (BUN i = 0; i < count; i++) {
                                t = (void *) ((char *) values + (i << b->tshift));
                                gdk_return rc = tfastins_nocheckFIX(b, p, t);
@@ -1263,12 +1269,14 @@ BUNappendmulti(BAT *b, const void *value
                                }
                                p++;
                        }
+                       MT_rwlock_wrunlock(&b->thashlock);
                }
                MT_lock_set(&b->theaplock);
                b->tminpos = bi.minpos;
                b->tmaxpos = bi.maxpos;
                MT_lock_unset(&b->theaplock);
        } else {
+               MT_rwlock_wrlock(&b->thashlock);
                for (BUN i = 0; i < count; i++) {
                        gdk_return rc = tfastins_nocheck(b, p, t);
                        if (rc != GDK_SUCCEED) {
@@ -1280,8 +1288,8 @@ BUNappendmulti(BAT *b, const void *value
                        }
                        p++;
                }
+               MT_rwlock_wrunlock(&b->thashlock);
        }
-       MT_rwlock_wrunlock(&b->thashlock);
        MT_lock_set(&b->theaplock);
        BATsetcount(b, p);
        MT_lock_unset(&b->theaplock);
@@ -1547,11 +1555,14 @@ BUNinplacemulti(BAT *b, const oid *posit
                        }
                        if (b->twidth < SIZEOF_VAR_T &&
                            (b->twidth <= 2 ? _d - GDK_VAROFFSET : _d) >= ((size_t) 1 << (8 << b->tshift))) {
-                               /* doesn't fit in current heap, upgrade it */
+                               /* doesn't fit in current heap, upgrade
+                                * it, can't keep hashlock while doing
+                                * so */
+                               MT_rwlock_wrunlock(&b->thashlock);
                                if (GDKupgradevarheap(b, _d, 0, bi.count) != GDK_SUCCEED) {
-                                       MT_rwlock_wrunlock(&b->thashlock);
                                        return GDK_FAIL;
                                }
+                               MT_rwlock_wrlock(&b->thashlock);
                        }
                        /* reinitialize iterator after possible heap upgrade */
                        {
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -444,18 +444,23 @@ append_varsized_bat(BAT *b, BATiter *ni,
        }
        /* copy data from n to b */
        r = BATcount(b);
-       MT_rwlock_wrlock(&b->thashlock);
-       while (cnt > 0) {
-               cnt--;
+       for (BUN i = 0; i < cnt; i++) {
                BUN p = canditer_next(ci) - hseq;
                const void *t = BUNtvar(*ni, p);
                if (tfastins_nocheckVAR(b, r, t) != GDK_SUCCEED) {
-                       MT_rwlock_wrunlock(&b->thashlock);
                        return GDK_FAIL;
                }
-               if (b->thash)
+               r++;
+       }
+       MT_rwlock_wrlock(&b->thashlock);
+       if (b->thash) {
+               r -= cnt;
+               BATiter bi = bat_iterator_nolock(b);
+               for (BUN i = 0; i < cnt; i++) {
+                       const void *t = BUNtvar(bi, r);
                        HASHappend_locked(b, r, t);
-               r++;
+                       r++;
+               }
        }
        MT_rwlock_wrunlock(&b->thashlock);
        MT_lock_set(&b->theaplock);
@@ -1276,10 +1281,16 @@ BATappend_or_update(BAT *b, BAT *p, cons
                        }
                        if (b->twidth < SIZEOF_VAR_T &&
                            (b->twidth <= 2 ? d - GDK_VAROFFSET : d) >= ((size_t) 1 << (8 << b->tshift))) {
-                               /* doesn't fit in current heap, upgrade it */
+                               /* doesn't fit in current heap, upgrade
+                                * it, can't keep hashlock while doing
+                                * so */
+                               MT_rwlock_wrunlock(&b->thashlock);
+                               locked = false;
                                if (GDKupgradevarheap(b, d, 0, MAX(updid, b->batCount)) != GDK_SUCCEED) {
                                        goto bailout;
                                }
+                               MT_rwlock_wrlock(&b->thashlock);
+                               locked = true;
                        }
                        /* in case ATOMreplaceVAR and/or
                         * GDKupgradevarheap replaces a heap, we need to
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -256,6 +256,18 @@ static int BBPunloadCnt = 0;
 static MT_Lock GDKunloadLock = MT_LOCK_INITIALIZER(GDKunloadLock);
 
 void
+BBPtmlock(void)
+{
+       MT_lock_set(&GDKtmLock);
+}
+
+void
+BBPtmunlock(void)
+{
+       MT_lock_unset(&GDKtmLock);
+}
+
+void
 BBPlock(void)
 {
        int i;
@@ -289,7 +301,6 @@ BBPunlock(void)
        MT_lock_unset(&GDKtmLock);
 }
 
-
 static gdk_return
 BBPinithash(bat size)
 {
diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -488,10 +488,13 @@ GDKupgradevarheap(BAT *b, var_t v, BUN c
                        p++;
                        p[1] = 0;
                }
+               MT_lock_set(&GDKtmLock);
                for (;;) {
                        exists = file_exists(old->farmid, BAKDIR, fname, NULL);
-                       if (exists == -1)
+                       if (exists == -1) {
+                               MT_lock_unset(&GDKtmLock);
                                return GDK_FAIL;
+                       }
                        if (exists == 1)
                                break;
                        if (*p == '1')
@@ -515,8 +518,10 @@ GDKupgradevarheap(BAT *b, var_t v, BUN c
                        const char *base = old->base;
 
                        /* first save heap in file with extra .tmp extension */
-                       if ((fd = GDKfdlocate(old->farmid, old->filename, "wb", "tmp")) < 0)
+                       if ((fd = GDKfdlocate(old->farmid, old->filename, "wb", "tmp")) < 0) {
+                               MT_lock_unset(&GDKtmLock);
                                return GDK_FAIL;
+                       }
                        while (size > 0) {
                                ret = write(fd, base, (unsigned) MIN(1 << 30, size));
                                if (ret < 0)
@@ -539,6 +544,7 @@ GDKupgradevarheap(BAT *b, var_t v, BUN c
                                GDKsyserror("syncing heap to disk failed\n");
                                close(fd);
                                GDKunlink(old->farmid, BATDIR, old->filename, "tmp");
+                               MT_lock_unset(&GDKtmLock);
                                return GDK_FAIL;
                        }
                        /* move tmp file to backup directory (without .tmp
@@ -546,9 +552,11 @@ GDKupgradevarheap(BAT *b, var_t v, BUN c
                        if (GDKmove(old->farmid, BATDIR, old->filename, "tmp", BAKDIR, filename, NULL, true) != GDK_SUCCEED) {
                                /* backup failed */
                                GDKunlink(old->farmid, BATDIR, old->filename, "tmp");
+                               MT_lock_unset(&GDKtmLock);
                                return GDK_FAIL;
                        }
                }
+               MT_lock_unset(&GDKtmLock);
        }
 
        new = GDKmalloc(sizeof(Heap));
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -2669,6 +2669,7 @@ store_hot_snapshot_to_stream(sqlstore *s
 
        MT_lock_set(&store->flush);
        MT_lock_set(&store->lock);
+       BBPtmlock();
        locked = 1;
        if (GDKexiting())
                goto end;
@@ -2693,6 +2694,7 @@ store_hot_snapshot_to_stream(sqlstore *s
 
 end:
        if (locked) {
+               BBPtmunlock();
                MT_lock_unset(&store->lock);
                MT_lock_unset(&store->flush);
        }
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to