Changeset: 847165695e19 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=847165695e19
Modified Files:
        sql/storage/store.c
Branch: default
Log Message:

Refactor the store_manager() main loop

It now takes bs_lock before the loop starts and only releases it around calls to
sleep, before reporting a fatal error, and when the loop exits.

Also, with the addition of the .enabled flag, which can be cleared to
temporarily suspend log merging, the number of states it can be in has grown, so
all decision making is now in a single function, flusher_should_run().

All state variables have been gathered in a struct. This has no functional
consequences but makes it easier to see at a glance which state is involved.

Special care must be taken around flush_now (formerly need_flush) because it
can be set without first obtaining bs_lock.

To help with debugging, the new code can log clear messages about why it is or
isn't flushing the log (define STORE_FLUSHER_DEBUG, which STORE_DEBUG also
enables), for example:

        #store flusher not flushing, reason to flush: timer expired, reason not to: awaiting idle time


diffs (219 lines):

diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -213,6 +213,11 @@ trans_drop_tmp(sql_trans *tr)
 }
 
 /*#define STORE_DEBUG 1*/
+/*#define STORE_FLUSHER_DEBUG 1 */
+
+#ifdef STORE_DEBUG
+#define STORE_FLUSHER_DEBUG 1
+#endif
 
 sql_trans *
 sql_trans_destroy(sql_trans *t)
@@ -2072,8 +2077,92 @@ store_vacuum( sql_trans *tr )
        return 0;
 }
 
-static bool logging = false;
-static ATOMIC_TYPE need_flush = ATOMIC_VAR_INIT(0);
+// All this must only be accessed while holding the bs_lock.
+// The exception is flush_now, which can be set by anyone at any
+// time and therefore needs some special treatment.
+static struct {
+       // These two are inputs, set from outside the store_manager
+       bool enabled;
+       ATOMIC_TYPE flush_now;
+       // These are state set from within the store_manager
+       bool working;
+       int countdown_ms;
+       unsigned int cycle;
+       const char *reason_to;
+       const char *reason_not_to;
+} flusher = {
+       .flush_now = ATOMIC_VAR_INIT(0),
+       .enabled = true,
+};
+
+static void
+flusher_new_cycle(void)
+{
+       int cycle_time = GDKdebug & FORCEMITOMASK ? 500 : 50000;
+
+       // do not touch .enabled and .flush_now, those are inputs
+       flusher.working = false;
+       flusher.countdown_ms = cycle_time;
+       flusher.cycle += 1;
+       flusher.reason_to = NULL;
+       flusher.reason_not_to = NULL;
+}
+
+/* Determine whether this is a good moment to flush the log.
+ * Note: this function clears flusher.flush_now if it was set,
+ * so if it returns true you must either flush the log or
+ * set flusher.flush_now to 1 again, otherwise the request will
+ * be lost.
+ *
+ * This is done this way because flush_now can be set at any time
+ * without first obtaining bs_lock. To avoid time-of-check-to-time-of-use
+ * issues, this function both checks and clears the flag.
+ */
+static bool
+flusher_should_run(void)
+{
+       // We will flush if we have a reason to and no reason not to.
+       const char *reason_to = NULL, *reason_not_to = NULL;
+
+       if (flusher.countdown_ms <= 0)
+               reason_to = "timer expired";
+
+       int many_changes = GDKdebug & FORCEMITOMASK ? 100 : 1000000;
+       if (logger_funcs.changes() >= many_changes)
+               reason_to = "many changes";
+
+       // Read and clear flush_now (a plain value, not an ATOMIC_TYPE).
+       // If we decide not to flush we'll put it back.
+       bool my_flush_now = ATOMIC_XCG(&flusher.flush_now, 0) != 0;
+       if (my_flush_now)
+               reason_to = "user request";
+
+       if (ATOMIC_GET(&store_nr_active) > 0)
+               reason_not_to = "awaiting idle time";
+
+       if (!flusher.enabled)
+               reason_not_to = "disabled";
+
+       bool do_it = (reason_to && !reason_not_to);
+
+#ifdef STORE_FLUSHER_DEBUG
+       if (reason_to != flusher.reason_to || reason_not_to != flusher.reason_not_to) {
+               fprintf(stderr, "#store flusher %s, reason to flush: %s, reason not to: %s\n",
+                       do_it ? "flushing" : "not flushing",
+                       reason_to ? reason_to : "none",
+                       reason_not_to ? reason_not_to : "none"
+               );
+       }
+#endif
+       flusher.reason_to = reason_to;
+       flusher.reason_not_to = reason_not_to;
+
+       // Remember the request for next time.
+       if (!do_it && my_flush_now)
+               ATOMIC_SET(&flusher.flush_now, 1);
+
+       return do_it;
+}
 
 void
 store_exit(void)
@@ -2084,7 +2173,7 @@ store_exit(void)
        fprintf(stderr, "#store exit locked\n");
 #endif
        /* busy wait till the logmanager is ready */
-       while (logging) {
+       while (flusher.working) {
                MT_lock_unset(&bs_lock);
                MT_sleep_ms(100);
                MT_lock_set(&bs_lock);
@@ -2122,7 +2211,7 @@ store_apply_deltas(bool locked)
 {
        int res = LOG_OK;
 
-       logging = true;
+       flusher.working = true;
        /* make sure we reset all transactions on re-activation */
        gtrans->wstime = timestamp();
        if (store_funcs.gtrans_update)
@@ -2135,7 +2224,7 @@ store_apply_deltas(bool locked)
                if (!locked)
                        MT_lock_set(&bs_lock);
        }
-       logging = false;
+       flusher.working = false;
 
        return res;
 }
@@ -2143,55 +2232,46 @@ store_apply_deltas(bool locked)
 void
 store_flush_log(void)
 {
-       ATOMIC_SET(&need_flush, 1);
+       ATOMIC_SET(&flusher.flush_now, 1);
 }
 
 void
 store_manager(void)
 {
-       const int sleeptime = GDKdebug & FORCEMITOMASK ? 10 : 50;
-       const int timeout = GDKdebug & FORCEMITOMASK ? 500 : 50000;
-       const int changes = GDKdebug & FORCEMITOMASK ? 100 : 1000000;
-
        MT_thread_setworking("sleeping");
+
+       // In the main loop we always hold bs_lock except while sleeping and before GDKfatal
+       MT_lock_set(&bs_lock);
+
        while (!GDKexiting()) {
-               int res = LOG_OK;
-               int t;
-
-               for (t = timeout; t > 0 && !ATOMIC_GET(&need_flush); t -= sleeptime) {
+               int res;
+               // Not flushing yet: sleep unlocked, then count down under bs_lock (clamped at 0 so it cannot underflow)
+               if (!flusher_should_run()) {
+                       const int sleeptime = 100;
+                       MT_lock_unset(&bs_lock);
                        MT_sleep_ms(sleeptime);
-                       if (GDKexiting())
-                               return;
-               }
-
-               MT_lock_set(&bs_lock);
-               if (GDKexiting()) {
-                       MT_lock_unset(&bs_lock);
-                       return;
-               }
-               if (!ATOMIC_GET(&need_flush) && logger_funcs.changes() < changes) {
-                       MT_lock_unset(&bs_lock);
+                       MT_lock_set(&bs_lock);
+                       flusher.countdown_ms = flusher.countdown_ms > sleeptime ? flusher.countdown_ms - sleeptime : 0;
                        continue;
                }
-               ATOMIC_SET(&need_flush, 1);
-               while (ATOMIC_GET(&store_nr_active)) { /* find a moment to flush */
-                       MT_lock_unset(&bs_lock);
-                       if (GDKexiting())
-                               return;
-                       MT_sleep_ms(sleeptime);
-                       MT_lock_set(&bs_lock);
-               }
-               ATOMIC_SET(&need_flush, 0);
 
                MT_thread_setworking("flushing");
-               store_apply_deltas(false);
-
-               MT_lock_unset(&bs_lock);
-
-               if (res != LOG_OK)
+               res = store_apply_deltas(false);
+
+               if (res != LOG_OK) {
+                       MT_lock_unset(&bs_lock);
                        GDKfatal("write-ahead logging failure, disk full?");
+               }
+
+               flusher_new_cycle();
                MT_thread_setworking("sleeping");
-       }
+#ifdef STORE_FLUSHER_DEBUG
+               fprintf(stderr, "#store flusher done\n");
+#endif
+       }
+
+       // End of loop, end of lock
+       MT_lock_unset(&bs_lock);
 }
 
 void
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to