Changeset: 12047d4578bf for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/12047d4578bf
Modified Files:
        sql/storage/store.c
Branch: default
Log Message:

merged


diffs (truncated from 794 to 300 lines):

diff --git a/gdk/ChangeLog.Sep2022 b/gdk/ChangeLog.Sep2022
--- a/gdk/ChangeLog.Sep2022
+++ b/gdk/ChangeLog.Sep2022
@@ -1,6 +1,11 @@
 # ChangeLog file for GDK
 # This file is updated with Maddlog
 
+* Mon Jan 16 2023 Sjoerd Mullender <sjo...@acm.org>
+- Fixed a race condition that could lead to a bat being added to the SQL
+  catalog but not being made persistent, causing a subsequent restart
+  of the system to fail (and crash).
+
 * Wed Dec 14 2022 Sjoerd Mullender <sjo...@acm.org>
 - Fixed a race condition where a hash could have been created on a
   bat using the old bat count while in another thread the bat count
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -97,17 +97,8 @@ typedef enum {LOG_OK, LOG_EOF, LOG_ERR} 
 static gdk_return bm_commit(logger *lg);
 static gdk_return tr_grow(trans *tr);
 
-static inline void
-log_lock(logger *lg)
-{
-       MT_lock_set(&lg->lock);
-}
-
-static inline void
-log_unlock(logger *lg)
-{
-       MT_lock_unset(&lg->lock);
-}
+#define log_lock(lg)   MT_lock_set(&(lg)->lock)
+#define log_unlock(lg) MT_lock_unset(&(lg)->lock)
 
 static inline bte
 find_type(logger *lg, int tpe)
@@ -1615,11 +1606,12 @@ cleanup_and_swap(logger *lg, int *r, con
        return rcnt;
 }
 
+/* this function is called with log_lock() held; it releases the lock
+ * before returning */
 static gdk_return
 bm_subcommit(logger *lg)
 {
        BUN p, q;
-       log_lock(lg);
        BAT *catalog_bid = lg->catalog_bid;
        BAT *catalog_id = lg->catalog_id;
        BAT *dcatalog = lg->dcatalog;
@@ -1936,6 +1928,8 @@ log_load(int debug, const char *fn, cons
                BBPretain(lg->catalog_id->batCacheid);
                BBPretain(lg->dcatalog->batCacheid);
 
+               log_lock(lg);
+               /* bm_subcommit releases the lock */
                if (bm_subcommit(lg) != GDK_SUCCEED) {
                        /* cannot commit catalog, so remove log */
                        MT_remove(filename);
@@ -3081,7 +3075,7 @@ bm_commit(logger *lg)
                        fprintf(stderr, "#bm_commit: create %d (%d)\n",
                                bid, BBP_lrefs(bid));
        }
-       log_unlock(lg);
+       /* bm_subcommit releases the lock */
        return bm_subcommit(lg);
 }
 
diff --git a/gdk/gdk_select.c b/gdk/gdk_select.c
--- a/gdk/gdk_select.c
+++ b/gdk/gdk_select.c
@@ -1722,7 +1722,7 @@ BATselect(BAT *b, BAT *s, const void *tl
                if ((oidxh = b->torderidx) != NULL)
                        HEAPincref(oidxh);
                MT_lock_unset(&b->batIdxLock);
-               if (oidxh == NULL && pb) {
+               if (oidxh == NULL && pb != NULL) {
                        (void) BATcheckorderidx(pb);
                        MT_lock_set(&pb->batIdxLock);
                        if ((oidxh = pb->torderidx) != NULL) {
@@ -1944,7 +1944,7 @@ BATselect(BAT *b, BAT *s, const void *tl
                } else if (b->thash->nunique == bi.count)
                        estimate = 1;
        }
-       if (estimate == BUN_NONE && (bi.key || (pb != NULL && pb->tkey))) {
+       if (estimate == BUN_NONE && (bi.key || (pb != NULL && pbi.key))) {
                /* exact result size in special cases */
                if (equi) {
                        estimate = 1;
@@ -2051,9 +2051,7 @@ BATselect(BAT *b, BAT *s, const void *tl
                if (!equi &&
                    /* DISABLES CODE */ (0) && imprintable(bi.type) &&
                    (!bi.transient ||
-                    (parent != 0 &&
-                     pb != NULL &&
-                     !pbi.transient)) &&
+                    (pb != NULL && !pbi.transient)) &&
                    BATimprints(b) == GDK_SUCCEED) {
                        if (pb != NULL) {
                                MT_lock_set(&pb->batIdxLock);
diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c
--- a/gdk/gdk_system.c
+++ b/gdk/gdk_system.c
@@ -191,6 +191,9 @@ static struct winthread {
        MT_Lock *lockwait;      /* lock we're waiting for */
        MT_Sema *semawait;      /* semaphore we're waiting for */
        MT_Cond *condwait;      /* condition variable we're waiting for */
+#ifdef LOCK_OWNER
+       MT_Lock *mylocks;       /* locks we're holding */
+#endif
        struct winthread *joinwait; /* process we are joining with */
        const char *working;    /* what we're currently doing */
        char algorithm[512];    /* the algorithm used in the last operation */
@@ -211,21 +214,33 @@ static DWORD threadslot = TLS_OUT_OF_IND
 void
 dump_threads(void)
 {
-       TRC_DEBUG_IF(THRD) {
-               EnterCriticalSection(&winthread_cs);
-               for (struct winthread *w = winthreads; w; w = w->next) {
-                       TRC_DEBUG_ENDIF(THRD, "%s, waiting for %s, working on %.200s\n",
-                                       w->threadname,
-                                       w->lockwait ? w->lockwait->name :
-                                       w->semawait ? w->semawait->name :
-                                       w->condwait ? w->condwait->name :
-                                       w->joinwait ? w->joinwait->threadname :
-                                       "nothing",
-                                       ATOMIC_GET(&w->exited) ? "exiting" :
-                                       w->working ? w->working : "nothing");
+       char buf[1024];
+       EnterCriticalSection(&winthread_cs);
+       for (struct winthread *w = winthreads; w; w = w->next) {
+               int pos = snprintf(buf, sizeof(buf),
+                                  "%s, waiting for %s, working on %.200s",
+                                  w->threadname,
+                                  w->lockwait ? w->lockwait->name :
+                                  w->semawait ? w->semawait->name :
+                                  w->condwait ? w->condwait->name :
+                                  w->joinwait ? w->joinwait->threadname :
+                                  "nothing",
+                                  ATOMIC_GET(&w->exited) ? "exiting" :
+                                  w->working ? w->working : "nothing");
+#ifdef LOCK_OWNER
+               const char *sep = ", locked: ";
+               for (MT_Lock *l = w->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) {
+                       pos += snprintf(buf + pos, sizeof(buf) - pos,
+                                       "%s%s(%s)", sep, l->name, l->locker);
+                       sep = ", ";
                }
-               LeaveCriticalSection(&winthread_cs);
+#endif
+               TRC_DEBUG_IF(THRD)
+                       TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
+               else
+                       fprintf(stderr, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
        }
+       LeaveCriticalSection(&winthread_cs);
 }
 
 bool
@@ -345,6 +360,42 @@ MT_thread_setcondwait(MT_Cond *cond)
                w->condwait = cond;
 }
 
+#ifdef LOCK_OWNER
+void
+MT_thread_add_mylock(MT_Lock *lock)
+{
+       if (threadslot == TLS_OUT_OF_INDEXES)
+               return;
+       struct winthread *w = TlsGetValue(threadslot);
+
+       if (w) {
+               lock->nxt = w->mylocks;
+               w->mylocks = lock;
+       }
+}
+
+void
+MT_thread_del_mylock(MT_Lock *lock)
+{
+       if (threadslot == TLS_OUT_OF_INDEXES)
+               return;
+       struct winthread *w = TlsGetValue(threadslot);
+
+       if (w) {
+               if (w->mylocks == lock) {
+                       w->mylocks = lock->nxt;
+               } else {
+                       for (MT_Lock *l = w->mylocks; l; l = l->nxt) {
+                               if (l->nxt == lock) {
+                                       l->nxt = lock->nxt;
+                                       break;
+                               }
+                       }
+               }
+       }
+}
+#endif
+
 void
 MT_thread_setworking(const char *work)
 {
@@ -609,6 +660,9 @@ static struct posthread {
        MT_Lock *lockwait;      /* lock we're waiting for */
        MT_Sema *semawait;      /* semaphore we're waiting for */
        MT_Cond *condwait;      /* condition variable we're waiting for */
+#ifdef LOCK_OWNER
+       MT_Lock *mylocks;       /* locks we're holding */
+#endif
        struct posthread *joinwait; /* process we are joining with */
        const char *working;    /* what we're currently doing */
        char algorithm[512];    /* the algorithm used in the last operation */
@@ -634,21 +688,33 @@ static bool thread_initialized = false;
 void
 dump_threads(void)
 {
-       TRC_DEBUG_IF(THRD) {
-               pthread_mutex_lock(&posthread_lock);
-               for (struct posthread *p = posthreads; p; p = p->next) {
-                       TRC_DEBUG_ENDIF(THRD, "%s, waiting for %s, working on %.200s\n",
-                                       p->threadname,
-                                       p->lockwait ? p->lockwait->name :
-                                       p->semawait ? p->semawait->name :
-                                       p->condwait ? p->condwait->name :
-                                       p->joinwait ? p->joinwait->threadname :
-                                       "nothing",
-                                       ATOMIC_GET(&p->exited) ? "exiting" :
-                                       p->working ? p->working : "nothing");
+       char buf[1024];
+       pthread_mutex_lock(&posthread_lock);
+       for (struct posthread *p = posthreads; p; p = p->next) {
+               int pos = snprintf(buf, sizeof(buf),
+                                  "%s: waiting for %s, working on %.200s",
+                                  p->threadname,
+                                  p->lockwait ? p->lockwait->name :
+                                  p->semawait ? p->semawait->name :
+                                  p->condwait ? p->condwait->name :
+                                  p->joinwait ? p->joinwait->threadname :
+                                  "nothing",
+                                  ATOMIC_GET(&p->exited) ? "exiting" :
+                                  p->working ? p->working : "nothing");
+#ifdef LOCK_OWNER
+               const char *sep = ", locked: ";
+               for (MT_Lock *l = p->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) {
+                       pos += snprintf(buf + pos, sizeof(buf) - pos,
+                                       "%s%s(%s)", sep, l->name, l->locker);
+                       sep = ", ";
                }
-               pthread_mutex_unlock(&posthread_lock);
+#endif
+               TRC_DEBUG_IF(THRD)
+                       TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
+               else
+                       fprintf(stderr, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
        }
+       pthread_mutex_unlock(&posthread_lock);
 }
 
 bool
@@ -767,6 +833,42 @@ MT_thread_setcondwait(MT_Cond *cond)
                p->condwait = cond;
 }
 
+#ifdef LOCK_OWNER
+void
+MT_thread_add_mylock(MT_Lock *lock)
+{
+       if (!thread_initialized)
+               return;
+       struct posthread *p = pthread_getspecific(threadkey);
+
+       if (p) {
+               lock->nxt = p->mylocks;
+               p->mylocks = lock;
+       }
+}
+
+void
+MT_thread_del_mylock(MT_Lock *lock)
+{
+       if (!thread_initialized)
+               return;
+       struct posthread *p = pthread_getspecific(threadkey);
+
+       if (p) {
+               if (p->mylocks == lock) {
+                       p->mylocks = lock->nxt;
+               } else {
+                       for (MT_Lock *l = p->mylocks; l; l = l->nxt) {
+                               if (l->nxt == lock) {
+                                       l->nxt = lock->nxt;
+                                       break;
+                               }
+                       }
+               }
+       }
+}
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to