This is an automated email from the ASF dual-hosted git repository.

mgoulish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 5be2067  log.c rewrite part two DISPATCH-2133: qd_log_enabled() race 
This closes #1366
5be2067 is described below

commit 5be2067c1dc28378e56acd5806f25159cf3a14a5
Author: mgoulish <mgoul...@redhat.com>
AuthorDate: Fri Sep 24 12:49:50 2021 -0400

    log.c rewrite part two
    DISPATCH-2133: qd_log_enabled() race
    This closes #1366
---
 src/log.c       | 259 +++++++++++++++++++++++++++++++-------------------------
 tests/tsan.supp |   2 +-
 2 files changed, 146 insertions(+), 115 deletions(-)

diff --git a/src/log.c b/src/log.c
index e032bf6..5599483 100644
--- a/src/log.c
+++ b/src/log.c
@@ -26,6 +26,7 @@
 #include "entity_cache.h"
 #include "log_private.h"
 #include "server_private.h"
+#include "schema_enum.h"
 
 #include "qpid/dispatch/alloc.h"
 #include "qpid/dispatch/atomic.h"
@@ -42,6 +43,35 @@
 #define LOG_MAX (QD_LOG_TEXT_MAX+128)
 #define LIST_MAX 1000
 
+// log.c lock strategy ========================================
+//
+// log sources ----------------------
+//     1. Log sources are created only at initialize time,
+//        and are freed only during finalize time, so the
+//        list itself does not need to be protected by a
+//        lock.
+//
+//     2. Individual log sources do need protection, though,
+//        because a management command may call qd_log_entity()
+//        at any time, which may replace the log sink. So each
+//        log source has its own lock, to prevent collisions
+//        between write_log() and qd_log_entity().
+//
+//  log sinks -----------------------
+//     1. There is a global list of log sinks, which may be
+//        added to and deleted from at any time by qd_log_entity().
+//        So there is a lock to protect the sinks list from
+//        simultaneous additions and deletions.
+//
+//   log entries ---------------------
+//     1. There is a global list of the most recent log entries
+//        that may be added to at any time by any log source.
+//        The list is bounded, so after some point additions
+//        cause deletions.
+//        So there is another lock to protect this entries list
+//        from simultaneous access.
+//
+//=============================================================
 const char *QD_LOG_STATS_TYPE = "logStats";
 
 static qd_log_source_t      *default_log_source=0;
@@ -49,7 +79,6 @@ static qd_log_source_t      *default_log_source=0;
 int qd_log_max_len() { return TEXT_MAX; }
 
 typedef struct qd_log_entry_t qd_log_entry_t;
-
 struct qd_log_entry_t {
     DEQ_LINKS(qd_log_entry_t);
     char           *module;
@@ -59,14 +88,13 @@ struct qd_log_entry_t {
     struct timeval  time;
     char            text[TEXT_MAX];
 };
-
 ALLOC_DECLARE(qd_log_entry_t);
 ALLOC_DEFINE(qd_log_entry_t);
-
 DEQ_DECLARE(qd_log_entry_t, qd_log_list_t);
 static qd_log_list_t         entries = {0};
+static sys_mutex_t *entries_lock = 0;  // guards `entries`; file-local like log_sinks_lock
 
-static void qd_log_entry_free_lh(qd_log_entry_t* entry) {
+static void qd_log_entry_free_lh(qd_log_entry_t *entry) {
     DEQ_REMOVE(entries, entry);
     free(entry->file);
     free(entry->module);
@@ -81,11 +109,9 @@ typedef struct log_sink_t {
     FILE *file;
     DEQ_LINKS(struct log_sink_t);
 } log_sink_t;
-
-DEQ_DECLARE(log_sink_t, log_sink_list_t);
-
-static sys_mutex_t *log_sink_list_lock = 0;
-static log_sink_list_t sink_list = {0};
+DEQ_DECLARE(log_sink_t, log_sinks_t);
+static sys_mutex_t *log_sinks_lock = 0;
+static log_sinks_t sink_list = {0};
 
 const char *format = "%Y-%m-%d %H:%M:%S.%%06lu %z";
 bool utc = false;
@@ -111,9 +137,11 @@ static const char* SINK_STDERR = "stderr";
 static const char* SINK_SYSLOG = "syslog";
 static const char* SOURCE_DEFAULT = "DEFAULT";
 
+// Hold the log_sinks_lock to prevent collision
+// with log_sink().
 static void log_sink_decref(const log_sink_t *sink) {
     if (!sink) return;
-    sys_mutex_lock(log_sink_list_lock);
+    sys_mutex_lock(log_sinks_lock);
     assert(sink->ref_count);
 
     log_sink_t *mutable_sink = (log_sink_t *)sink;
@@ -127,12 +155,14 @@ static void log_sink_decref(const log_sink_t *sink) {
             closelog();
         free(mutable_sink);
     }
-    sys_mutex_unlock(log_sink_list_lock);
+    sys_mutex_unlock(log_sinks_lock);
 }
 
-static const log_sink_t* log_sink(const char* name) {
-    sys_mutex_lock(log_sink_list_lock);
-    log_sink_t* sink = DEQ_HEAD(sink_list);
+// Hold the log_sinks_lock to prevent collision
+// with log_sink_decref().
+static const log_sink_t *log_sink(const char *name) {
+    sys_mutex_lock(log_sinks_lock);
+    log_sink_t *sink = DEQ_HEAD(sink_list);
     DEQ_FIND(sink, strcmp(sink->name, name) == 0);
 
     if (sink) {
@@ -156,11 +186,8 @@ static const log_sink_t* log_sink(const char* name) {
             file = fopen(name, "a");
         }
 
-        //If file is not there, return 0.
-        // We are not logging an error here since we are already holding the 
log_source_lock
-        // Writing a log message will try to re-obtain the log_source_lock 
lock and cause a deadlock.
         if (!file && !syslog) {
-            sys_mutex_unlock(log_sink_list_lock);
+            sys_mutex_unlock(log_sinks_lock);
             return 0;
         }
 
@@ -173,7 +200,7 @@ static const log_sink_t* log_sink(const char* name) {
         DEQ_INSERT_TAIL(sink_list, sink);
 
     }
-    sys_mutex_unlock(log_sink_list_lock);
+    sys_mutex_unlock(log_sinks_lock);
     return (const log_sink_t *)sink;
 }
 
@@ -193,16 +220,13 @@ struct qd_log_source_t {
     bool syslog;
     const log_sink_t *sink;
     uint64_t severity_histogram[N_LEVEL_INDICES];
+    sys_mutex_t  *lock;
 };
-
 DEQ_DECLARE(qd_log_source_t, qd_log_source_list_t);
-
-static sys_mutex_t          *log_source_lock = 0;
 static qd_log_source_list_t  source_list = {0};
 
-
 typedef struct level_t {
-    const char* name;
+    const char *name;
     int bit;     // QD_LOG bit
     int mask;    // Bit or higher
     const int syslog;
@@ -229,7 +253,7 @@ static const level_t invalid_level = {"invalid", -2, -2, 0};
 static char level_names[TEXT_MAX] = {0}; /* Set up in qd_log_initialize */
 
 /// Return NULL and set qd_error if not a valid bit.
-static const level_t* level_for_bit(int bit) {
+static const level_t *level_for_bit(int bit) {
     level_index_t i = 0;
     while (i < N_LEVELS && levels[i].bit != bit) ++i;
     if (i == N_LEVELS) {
@@ -239,7 +263,7 @@ static const level_t* level_for_bit(int bit) {
 }
 
 /// Return NULL and set qd_error if not a valid level.
-static const level_t* level_for_name(const char *name, int len) {
+static const level_t *level_for_name(const char *name, int len) {
     level_index_t i = 0;
     while (i < N_LEVELS && strncasecmp(levels[i].name, name, len) != 0) ++i;
     if (i == N_LEVELS) {
@@ -265,7 +289,7 @@ static int level_index_for_bit(int bit) {
 }
 
 /// Return the name of log level or 0 if not found.
-static const char* level_name(int level) {
+static const char *level_name(int level) {
     return (0 <= level && level < N_LEVELS) ? levels[level].name : NULL;
 }
 
@@ -283,18 +307,19 @@ static int enable_mask(const char *enable_) {
     {
         int len = strlen(token);
         int plus = (len > 0 && token[len-1] == '+') ? 1 : 0;
-        const level_t* level = level_for_name(token, len-plus);
+        const level_t *level = level_for_name(token, len-plus);
         mask |= (plus ? level->mask : level->bit);
     }
     free(enable);
     return mask;
 }
 
-/// Caller must hold log_source_lock
-static qd_log_source_t* lookup_log_source_lh(const char *module)
+static qd_log_source_t *lookup_log_source(const char *module)
 {
-    if (strcasecmp(module, SOURCE_DEFAULT) == 0)
+    if (strcasecmp(module, SOURCE_DEFAULT) == 0) {
         return default_log_source;
+    }
+
     qd_log_source_t *src = DEQ_HEAD(source_list);
     DEQ_FIND(src, strcasecmp(module, src->module) == 0);
     return src;
@@ -307,10 +332,10 @@ static bool default_bool(int value, int default_value) {
 static void write_log(qd_log_source_t *log_source, qd_log_entry_t *entry)
 {
     // Don't let the sink list change while we are writing to one of them.
-    sys_mutex_lock(log_sink_list_lock);
-    const log_sink_t* sink = log_source->sink ? log_source->sink : 
default_log_source->sink;
+    sys_mutex_lock(log_source->lock);
+    const log_sink_t *sink = log_source->sink ? log_source->sink : 
default_log_source->sink;
     if (!sink) {
-        sys_mutex_unlock(log_sink_list_lock);
+        sys_mutex_unlock(log_source->lock);
         return;
     }
 
@@ -353,56 +378,42 @@ static void write_log(qd_log_source_t *log_source, 
qd_log_entry_t *entry)
         if (syslog_level != -1)
             syslog(syslog_level, "%s", log_str);
     }
-    sys_mutex_unlock(log_sink_list_lock);
+    sys_mutex_unlock(log_source->lock);
 }
 
 /// Reset the log source to the default state
-static void qd_log_source_defaults(qd_log_source_t *log_source) {
-    log_source->mask = -1;
-    log_source->includeTimestamp = -1;
-    log_source->includeSource = -1;
-    log_source->sink = 0;
-    memset ( log_source->severity_histogram, 0, sizeof(uint64_t) * 
(N_LEVEL_INDICES) );
-}
-
-/// Caller must hold the log_source_lock
-static qd_log_source_t *qd_log_source_lh(const char *module)
-{
-    qd_log_source_t *log_source = lookup_log_source_lh(module);
-    if (!log_source)
-    {
-        log_source = NEW(qd_log_source_t);
-        ZERO(log_source);
-        log_source->module = (char*) malloc(strlen(module) + 1);
-        strcpy(log_source->module, module);
-        qd_log_source_defaults(log_source);
-        DEQ_INSERT_TAIL(source_list, log_source);
-        qd_entity_cache_add(QD_LOG_STATS_TYPE, log_source);
-    }
-    return log_source;
+static void qd_log_source_defaults(qd_log_source_t *src) {
+    src->mask = -1;
+    src->includeTimestamp = -1;
+    src->includeSource = -1;
+    log_sink_decref(src->sink);
+    src->sink = 0;
+    memset ( src->severity_histogram, 0, sizeof(uint64_t) * (N_LEVEL_INDICES) 
);
 }
 
 qd_log_source_t *qd_log_source(const char *module)
 {
-    sys_mutex_lock(log_source_lock);
-    qd_log_source_t* src = qd_log_source_lh(module);
-    sys_mutex_unlock(log_source_lock);
+    qd_log_source_t *src = lookup_log_source(module);  // lookup only: returns 0 if module was not registered by qd_log_initialize()
     return src;
 }
 
+// Called by the management thread; alters the log sink under the source lock. Returns 0 for an unknown module.
 qd_log_source_t *qd_log_source_reset(const char *module)
 {
-    sys_mutex_lock(log_source_lock);
-    qd_log_source_t* src = qd_log_source_lh(module);
+    qd_log_source_t *src = qd_log_source(module);
+    if (!src) return 0;  // no such module: sources are only created by qd_log_initialize()
+    sys_mutex_lock(src->lock);
     qd_log_source_defaults(src);
-    sys_mutex_unlock(log_source_lock);
+    sys_mutex_unlock(src->lock);
     return src;
 }
 
-static void qd_log_source_free_lh(qd_log_source_t* src) {
+// Called only from qd_log_finalize(), which does not hold locks.
+static void qd_log_source_free(qd_log_source_t *src) {
     DEQ_REMOVE(source_list, src);
     log_sink_decref(src->sink);
     free(src->module);
+    sys_mutex_free(src->lock);  // lock came from sys_mutex(); plain free() would skip mutex teardown
     free(src);
 }
 
@@ -414,17 +425,18 @@ bool qd_log_enabled(qd_log_source_t *source, 
qd_log_level_t level) {
 
 void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t level, bool 
check_level, const char *file, int line, const char *fmt, va_list ap)
 {
-    /*-----------------------------------------------
-      Count this log-event in this log's histogram
-      whether or not this log is currently enabled.
-      We can always decide not to look at it later,
-      based on its used/unused status.
-    -----------------------------------------------*/
+    // Count this log-event in this log's histogram
+    // whether or not this log is currently enabled.
+    // We can always decide not to look at it later,
+    // based on its used/unused status.
     int level_index = level_index_for_bit(level);
     if (level_index < 0)
         qd_error_clear();
-    else
+    else {
+        sys_mutex_lock(source->lock);
         source->severity_histogram[level_index]++;
+        sys_mutex_unlock(source->lock);
+    }
 
     if (check_level) {
         if (!qd_log_enabled(source, level))
@@ -435,12 +447,7 @@ void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t 
level, bool check_leve
     qd_log_entry_t *entry = new_qd_log_entry_t();
     DEQ_ITEM_INIT(entry);
 
-    //
-    // Obtain the log_source_lock global lock. We need to do this, if not, the 
qd_log_entity() function
-    // could free the log_source->sink from underneath you and replace it with 
a new sink.
-    // Once we obtain this lock, we only release the lock once the log line is 
written to the sink.
-    //
-    sys_mutex_lock(log_source_lock);
+    sys_mutex_lock(entries_lock);
     entry->module = source->module ? strdup(source->module) : 0;
     entry->level  = level;
     entry->file   = file ? strdup(file) : 0;
@@ -451,7 +458,7 @@ void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t 
level, bool check_leve
     DEQ_INSERT_TAIL(entries, entry);
     if (DEQ_SIZE(entries) > LIST_MAX)
         qd_log_entry_free_lh(DEQ_HEAD(entries));
-    sys_mutex_unlock(log_source_lock);
+    sys_mutex_unlock(entries_lock);
 }
 
 void qd_log_impl_v1(qd_log_source_t *source, qd_log_level_t level,  const char 
*file, int line, const char *fmt, ...)
@@ -486,7 +493,7 @@ PyObject *qd_log_recent_py(long limit) {
         int i = 0;
         // NOTE: PyList_SetItem steals a reference so no leak here.
         PyList_SetItem(py_entry, i++, PyUnicode_FromString(entry->module));
-        const char* level = level_name( level_index_for_bit(entry->level) + 2 
);
+        const char *level = level_name( level_index_for_bit(entry->level) + 2 
);
         PyList_SetItem(py_entry, i++, level ? PyUnicode_FromString(level) : 
inc_none());
         PyList_SetItem(py_entry, i++, PyUnicode_FromString(entry->text));
         PyList_SetItem(py_entry, i++, entry->file ? 
PyUnicode_FromString(entry->file) : inc_none());
@@ -506,13 +513,39 @@ PyObject *qd_log_recent_py(long limit) {
     return NULL;
 }
 
+static void _add_log_source (const char *module_name) {  // create, register, and list one log source (called from qd_log_initialize)
+    qd_log_source_t *log_source;
+    log_source = NEW(qd_log_source_t);
+    ZERO(log_source);
+    log_source->module = qd_strdup(module_name);
+    qd_log_source_defaults(log_source);  // sink is still 0 after ZERO, so no sink is released here
+    log_source->lock = sys_mutex();
+    DEQ_INSERT_TAIL(source_list, log_source);
+    qd_entity_cache_add(QD_LOG_STATS_TYPE, log_source);
+
+    if (!strcmp(SOURCE_DEFAULT, module_name)) {
+        default_log_source = log_source;  // qd_log_initialize() sets its mask/timestamp options afterwards
+    }
+}
+
 void qd_log_initialize(void)
 {
     DEQ_INIT(entries);
     DEQ_INIT(source_list);
     DEQ_INIT(sink_list);
 
-    log_sink_list_lock = sys_mutex();
+    int name_offset = strlen("QD_SCHEMA_LOG_MODULE_");
+
+    int i  ;
+    for (i = 0; i < QD_SCHEMA_LOG_MODULE_ENUM_COUNT; ++ i)
+    {
+        const char *module_name = qd_schema_log_module_names[i] + name_offset;
+        _add_log_source(module_name);
+    }
+    _add_log_source("MAIN");
+    _add_log_source("DISPLAYNAME");
+
+    log_sinks_lock = sys_mutex();
 
     // Set up level_names for use in error messages.
     char *begin = level_names, *end = level_names+sizeof(level_names);
@@ -520,9 +553,8 @@ void qd_log_initialize(void)
     for (level_index_t i = NONE + 1; i < N_LEVELS; ++i)
         aprintf(&begin, end, ", %s", levels[i].name);
 
-    log_source_lock = sys_mutex();
+    entries_lock = sys_mutex();
 
-    default_log_source = qd_log_source(SOURCE_DEFAULT);
     default_log_source->mask = levels[INFO].mask;
     default_log_source->includeTimestamp = true;
     default_log_source->includeSource = 0;
@@ -532,7 +564,7 @@ void qd_log_initialize(void)
 
 void qd_log_finalize(void) {
     while (DEQ_HEAD(source_list))
-        qd_log_source_free_lh(DEQ_HEAD(source_list));
+        qd_log_source_free(DEQ_HEAD(source_list));
     while (DEQ_HEAD(entries))
         qd_log_entry_free_lh(DEQ_HEAD(entries));
     while (DEQ_HEAD(sink_list))
@@ -540,11 +572,20 @@ void qd_log_finalize(void) {
     default_log_source = NULL;  // stale value would misconfigure new router 
started again in the same process
 }
 
+// This is the entry point for management commands that
+// may arrive at any time and change the sink in a log
+// source.
+// If a worker thread were writing to the old log sink
+// while this function released it, the writer could use
+// a sink that had already been closed and freed.
+// Each log source's lock serializes sink replacement here
+// with writes to that source in write_log().
+//
 qd_error_t qd_log_entity(qd_entity_t *entity)
 {
     qd_error_clear();
 
-    char* module = 0;
+    char *module = 0;
     char *outputFile = 0;
     char *enable = 0;
     int include_timestamp = 0;
@@ -572,12 +613,6 @@ qd_error_t qd_log_entity(qd_entity_t *entity)
         //
         QD_ERROR_BREAK();
 
-        //
-        // Obtain all attributes from the entity before obtaining the 
log_source_lock.
-        // We do this because functions like qd_entity_get_string and 
qd_entity_get_bool ultimately call qd_vlog_impl() which
-        // also holds the log_source_lock when calling write_log().
-        //
-
         if (qd_entity_has(entity, "outputFile")) {
             has_output_file = true;
             outputFile = qd_entity_get_string(entity, "outputFile");
@@ -602,33 +637,28 @@ qd_error_t qd_log_entity(qd_entity_t *entity)
             QD_ERROR_BREAK();
         }
 
-        //
-        // Obtain the log_source_lock lock. This lock is also used when 
write_log() is called.
-        //
-        sys_mutex_lock(log_source_lock);
+        qd_log_source_t *log_source = qd_log_source(module); /* The 
original(already existing) log source */
 
-        qd_log_source_t *src = qd_log_source_lh(module); /* The 
original(already existing) log source */
+        sys_mutex_lock(log_source->lock);
 
         if (has_output_file) {
-            const log_sink_t* sink = log_sink(outputFile);
+            const log_sink_t *sink = log_sink(outputFile);
             if (!sink) {
                 error_in_output = true;
-                sys_mutex_unlock(log_source_lock);
+                sys_mutex_unlock(log_source->lock);
                 break;
             }
 
             // DEFAULT source may already have a sink, so free the old sink 
first
-            if (src->sink) {
-                log_sink_decref(src->sink);
-            }
+            log_sink_decref(log_source->sink);
 
             // Assign the new sink
-            src->sink = sink;
+            log_source->sink = sink;
 
-            if (src->sink->syslog) {
+            if (log_source->sink->syslog) {
                 // Timestamp should be off for syslog.
                 is_sink_syslog = true;
-                src->includeTimestamp = 0;
+                log_source->includeTimestamp = 0;
             }
         }
 
@@ -637,28 +667,28 @@ qd_error_t qd_log_entity(qd_entity_t *entity)
 
             if (mask < -1) {
                 error_in_enable = true;
-                sys_mutex_unlock(log_source_lock);
+                sys_mutex_unlock(log_source->lock);
                 break;
             }
             else {
-                src->mask = mask;
+                log_source->mask = mask;
             }
 
-            if (qd_log_enabled(src, QD_LOG_TRACE)) {
+            if (qd_log_enabled(log_source, QD_LOG_TRACE)) {
                 trace_enabled = true;
             }
         }
 
         if (has_include_timestamp && !is_sink_syslog) {
             // Timestamp should be off for syslog.
-            src->includeTimestamp = include_timestamp;
+            log_source->includeTimestamp = include_timestamp;
         }
 
         if (has_include_source) {
-            src->includeSource = include_source;
+            log_source->includeSource = include_source;
         }
 
-        sys_mutex_unlock(log_source_lock);
+        sys_mutex_unlock(log_source->lock);
 
     } while(0);
 
@@ -679,8 +709,9 @@ qd_error_t qd_log_entity(qd_entity_t *entity)
         free(enable);
 
     //
-    // If trace logging is enabled, loop thru all connections in the router 
and call the pn_transport_set_tracer callback
-    // so proton frame trace can be output as part of the router trace log.
+    // If trace logging is enabled, loop thru all connections in the router and
+    // call the pn_transport_set_tracer callback so proton frame trace can be 
output
+    // as part of the router trace log.
     //
     if (trace_enabled) {
         qd_server_trace_all_connections();
@@ -689,7 +720,7 @@ qd_error_t qd_log_entity(qd_entity_t *entity)
     return qd_error_code();
 }
 
-void qd_format_string(char* buf, int buf_size, const char *fmt, ...)
+void qd_format_string(char *buf, int buf_size, const char *fmt, ...)
 {
     va_list args;
     va_start(args, fmt);
@@ -698,7 +729,7 @@ void qd_format_string(char* buf, int buf_size, const char 
*fmt, ...)
 }
 
 
-qd_error_t qd_entity_refresh_logStats(qd_entity_t* entity, void *impl)
+qd_error_t qd_entity_refresh_logStats(qd_entity_t *entity, void *impl)
 {
     qd_log_source_t *log = (qd_log_source_t*)impl;
     char identity_str[TEXT_MAX];
diff --git a/tests/tsan.supp b/tests/tsan.supp
index 7ab3ca0..417c619 100644
--- a/tests/tsan.supp
+++ b/tests/tsan.supp
@@ -50,7 +50,7 @@ race:qdr_record_link_credit
 race:qdr_process_tick_CT
 
 # DISPATCH-2133 (harmless)
-race:qd_log_enabled
+#race:qd_log_enabled
 
 # DISPATCH-2134
 race:qdr_link_process_initial_delivery_CT

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to