Changeset: 12047d4578bf for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/12047d4578bf Modified Files: sql/storage/store.c Branch: default Log Message:
merged diffs (truncated from 794 to 300 lines): diff --git a/gdk/ChangeLog.Sep2022 b/gdk/ChangeLog.Sep2022 --- a/gdk/ChangeLog.Sep2022 +++ b/gdk/ChangeLog.Sep2022 @@ -1,6 +1,11 @@ # ChangeLog file for GDK # This file is updated with Maddlog +* Mon Jan 16 2023 Sjoerd Mullender <sjo...@acm.org> +- Fixed a race condition that could lead to a bat being added to the SQL + catalog but not being made persistent, causing a subsequent restart + of the system to fail (and crash). + * Wed Dec 14 2022 Sjoerd Mullender <sjo...@acm.org> - Fixed a race condition where a hash could have been created on a bat using the old bat count while in another thread the bat count diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -97,17 +97,8 @@ typedef enum {LOG_OK, LOG_EOF, LOG_ERR} static gdk_return bm_commit(logger *lg); static gdk_return tr_grow(trans *tr); -static inline void -log_lock(logger *lg) -{ - MT_lock_set(&lg->lock); -} - -static inline void -log_unlock(logger *lg) -{ - MT_lock_unset(&lg->lock); -} +#define log_lock(lg) MT_lock_set(&(lg)->lock) +#define log_unlock(lg) MT_lock_unset(&(lg)->lock) static inline bte find_type(logger *lg, int tpe) @@ -1615,11 +1606,12 @@ cleanup_and_swap(logger *lg, int *r, con return rcnt; } +/* this function is called with log_lock() held; it releases the lock + * before returning */ static gdk_return bm_subcommit(logger *lg) { BUN p, q; - log_lock(lg); BAT *catalog_bid = lg->catalog_bid; BAT *catalog_id = lg->catalog_id; BAT *dcatalog = lg->dcatalog; @@ -1936,6 +1928,8 @@ log_load(int debug, const char *fn, cons BBPretain(lg->catalog_id->batCacheid); BBPretain(lg->dcatalog->batCacheid); + log_lock(lg); + /* bm_subcommit releases the lock */ if (bm_subcommit(lg) != GDK_SUCCEED) { /* cannot commit catalog, so remove log */ MT_remove(filename); @@ -3081,7 +3075,7 @@ bm_commit(logger *lg) fprintf(stderr, "#bm_commit: create %d (%d)\n", bid, BBP_lrefs(bid)); } - log_unlock(lg); + /* bm_subcommit releases 
the lock */ return bm_subcommit(lg); } diff --git a/gdk/gdk_select.c b/gdk/gdk_select.c --- a/gdk/gdk_select.c +++ b/gdk/gdk_select.c @@ -1722,7 +1722,7 @@ BATselect(BAT *b, BAT *s, const void *tl if ((oidxh = b->torderidx) != NULL) HEAPincref(oidxh); MT_lock_unset(&b->batIdxLock); - if (oidxh == NULL && pb) { + if (oidxh == NULL && pb != NULL) { (void) BATcheckorderidx(pb); MT_lock_set(&pb->batIdxLock); if ((oidxh = pb->torderidx) != NULL) { @@ -1944,7 +1944,7 @@ BATselect(BAT *b, BAT *s, const void *tl } else if (b->thash->nunique == bi.count) estimate = 1; } - if (estimate == BUN_NONE && (bi.key || (pb != NULL && pb->tkey))) { + if (estimate == BUN_NONE && (bi.key || (pb != NULL && pbi.key))) { /* exact result size in special cases */ if (equi) { estimate = 1; @@ -2051,9 +2051,7 @@ BATselect(BAT *b, BAT *s, const void *tl if (!equi && /* DISABLES CODE */ (0) && imprintable(bi.type) && (!bi.transient || - (parent != 0 && - pb != NULL && - !pbi.transient)) && + (pb != NULL && !pbi.transient)) && BATimprints(b) == GDK_SUCCEED) { if (pb != NULL) { MT_lock_set(&pb->batIdxLock); diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c --- a/gdk/gdk_system.c +++ b/gdk/gdk_system.c @@ -191,6 +191,9 @@ static struct winthread { MT_Lock *lockwait; /* lock we're waiting for */ MT_Sema *semawait; /* semaphore we're waiting for */ MT_Cond *condwait; /* condition variable we're waiting for */ +#ifdef LOCK_OWNER + MT_Lock *mylocks; /* locks we're holding */ +#endif struct winthread *joinwait; /* process we are joining with */ const char *working; /* what we're currently doing */ char algorithm[512]; /* the algorithm used in the last operation */ @@ -211,21 +214,33 @@ static DWORD threadslot = TLS_OUT_OF_IND void dump_threads(void) { - TRC_DEBUG_IF(THRD) { - EnterCriticalSection(&winthread_cs); - for (struct winthread *w = winthreads; w; w = w->next) { - TRC_DEBUG_ENDIF(THRD, "%s, waiting for %s, working on %.200s\n", - w->threadname, - w->lockwait ? w->lockwait->name : - w->semawait ? 
w->semawait->name : - w->condwait ? w->condwait->name : - w->joinwait ? w->joinwait->threadname : - "nothing", - ATOMIC_GET(&w->exited) ? "exiting" : - w->working ? w->working : "nothing"); + char buf[1024]; + EnterCriticalSection(&winthread_cs); + for (struct winthread *w = winthreads; w; w = w->next) { + int pos = snprintf(buf, sizeof(buf), + "%s, waiting for %s, working on %.200s", + w->threadname, + w->lockwait ? w->lockwait->name : + w->semawait ? w->semawait->name : + w->condwait ? w->condwait->name : + w->joinwait ? w->joinwait->threadname : + "nothing", + ATOMIC_GET(&w->exited) ? "exiting" : + w->working ? w->working : "nothing"); +#ifdef LOCK_OWNER + const char *sep = ", locked: "; + for (MT_Lock *l = w->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) { + pos += snprintf(buf + pos, sizeof(buf) - pos, + "%s%s(%s)", sep, l->name, l->locker); + sep = ", "; } - LeaveCriticalSection(&winthread_cs); +#endif + TRC_DEBUG_IF(THRD) + TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : ""); + else + fprintf(stderr, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." 
: ""); } + LeaveCriticalSection(&winthread_cs); } bool @@ -345,6 +360,42 @@ MT_thread_setcondwait(MT_Cond *cond) w->condwait = cond; } +#ifdef LOCK_OWNER +void +MT_thread_add_mylock(MT_Lock *lock) +{ + if (threadslot == TLS_OUT_OF_INDEXES) + return; + struct winthread *w = TlsGetValue(threadslot); + + if (w) { + lock->nxt = w->mylocks; + w->mylocks = lock; + } +} + +void +MT_thread_del_mylock(MT_Lock *lock) +{ + if (threadslot == TLS_OUT_OF_INDEXES) + return; + struct winthread *w = TlsGetValue(threadslot); + + if (w) { + if (w->mylocks == lock) { + w->mylocks = lock->nxt; + } else { + for (MT_Lock *l = w->mylocks; l; l = l->nxt) { + if (l->nxt == lock) { + l->nxt = lock->nxt; + break; + } + } + } + } +} +#endif + void MT_thread_setworking(const char *work) { @@ -609,6 +660,9 @@ static struct posthread { MT_Lock *lockwait; /* lock we're waiting for */ MT_Sema *semawait; /* semaphore we're waiting for */ MT_Cond *condwait; /* condition variable we're waiting for */ +#ifdef LOCK_OWNER + MT_Lock *mylocks; /* locks we're holding */ +#endif struct posthread *joinwait; /* process we are joining with */ const char *working; /* what we're currently doing */ char algorithm[512]; /* the algorithm used in the last operation */ @@ -634,21 +688,33 @@ static bool thread_initialized = false; void dump_threads(void) { - TRC_DEBUG_IF(THRD) { - pthread_mutex_lock(&posthread_lock); - for (struct posthread *p = posthreads; p; p = p->next) { - TRC_DEBUG_ENDIF(THRD, "%s, waiting for %s, working on %.200s\n", - p->threadname, - p->lockwait ? p->lockwait->name : - p->semawait ? p->semawait->name : - p->condwait ? p->condwait->name : - p->joinwait ? p->joinwait->threadname : - "nothing", - ATOMIC_GET(&p->exited) ? "exiting" : - p->working ? 
p->working : "nothing"); + char buf[1024]; + pthread_mutex_lock(&posthread_lock); + for (struct posthread *p = posthreads; p; p = p->next) { + int pos = snprintf(buf, sizeof(buf), + "%s: waiting for %s, working on %.200s", + p->threadname, + p->lockwait ? p->lockwait->name : + p->semawait ? p->semawait->name : + p->condwait ? p->condwait->name : + p->joinwait ? p->joinwait->threadname : + "nothing", + ATOMIC_GET(&p->exited) ? "exiting" : + p->working ? p->working : "nothing"); +#ifdef LOCK_OWNER + const char *sep = ", locked: "; + for (MT_Lock *l = p->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) { + pos += snprintf(buf + pos, sizeof(buf) - pos, + "%s%s(%s)", sep, l->name, l->locker); + sep = ", "; } - pthread_mutex_unlock(&posthread_lock); +#endif + TRC_DEBUG_IF(THRD) + TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : ""); + else + fprintf(stderr, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : ""); } + pthread_mutex_unlock(&posthread_lock); } bool @@ -767,6 +833,42 @@ MT_thread_setcondwait(MT_Cond *cond) p->condwait = cond; } +#ifdef LOCK_OWNER +void +MT_thread_add_mylock(MT_Lock *lock) +{ + if (!thread_initialized) + return; + struct posthread *p = pthread_getspecific(threadkey); + + if (p) { + lock->nxt = p->mylocks; + p->mylocks = lock; + } +} + +void +MT_thread_del_mylock(MT_Lock *lock) +{ + if (!thread_initialized) + return; + struct posthread *p = pthread_getspecific(threadkey); + + if (p) { + if (p->mylocks == lock) { + p->mylocks = lock->nxt; + } else { + for (MT_Lock *l = p->mylocks; l; l = l->nxt) { + if (l->nxt == lock) { + l->nxt = lock->nxt; + break; + } + } + } + } +} _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org