This is an automated email from the ASF dual-hosted git repository. mgoulish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 5be2067 log.c rewrite part two DISPATCH-2133: qd_log_enabled() race This closes #1366 5be2067 is described below commit 5be2067c1dc28378e56acd5806f25159cf3a14a5 Author: mgoulish <mgoul...@redhat.com> AuthorDate: Fri Sep 24 12:49:50 2021 -0400 log.c rewrite part two DISPATCH-2133: qd_log_enabled() race This closes #1366 --- src/log.c | 259 +++++++++++++++++++++++++++++++------------------------- tests/tsan.supp | 2 +- 2 files changed, 146 insertions(+), 115 deletions(-) diff --git a/src/log.c b/src/log.c index e032bf6..5599483 100644 --- a/src/log.c +++ b/src/log.c @@ -26,6 +26,7 @@ #include "entity_cache.h" #include "log_private.h" #include "server_private.h" +#include "schema_enum.h" #include "qpid/dispatch/alloc.h" #include "qpid/dispatch/atomic.h" @@ -42,6 +43,35 @@ #define LOG_MAX (QD_LOG_TEXT_MAX+128) #define LIST_MAX 1000 +// log.c lock strategy ======================================== +// +// log sources ---------------------- +// 1. Log sources are created only at initialize time, +// and are freed only during finalize time, so the +// list itself does not need to be protected by a +// lock. +// +// 2. Individual log sources do need protection, though, +// because a management command may call qd_log_entity() +// at any time, which may replace the log sink. So each +// log source has its own lock, to prevent collisions +// between write_log() and qd_log_entity(). +// +// log sinks ----------------------- +// 1. There is a global list of log sinks, which may be +// added to and deleted from at any time by qd_log_entity(). +// So there is a lock to protect the sinks list from +// simultaneous additions and deletions. +// +// log entries --------------------- +// 1. There is a global list of the most recent log entries +// that may be added to at any time by any log source. +// The list is bounded, so after some point additions +// cause deletions. 
+// So there is another lock to protect this entries list +// from simultaneous access. +// +//============================================================= const char *QD_LOG_STATS_TYPE = "logStats"; static qd_log_source_t *default_log_source=0; @@ -49,7 +79,6 @@ static qd_log_source_t *default_log_source=0; int qd_log_max_len() { return TEXT_MAX; } typedef struct qd_log_entry_t qd_log_entry_t; - struct qd_log_entry_t { DEQ_LINKS(qd_log_entry_t); char *module; @@ -59,14 +88,13 @@ struct qd_log_entry_t { struct timeval time; char text[TEXT_MAX]; }; - ALLOC_DECLARE(qd_log_entry_t); ALLOC_DEFINE(qd_log_entry_t); - DEQ_DECLARE(qd_log_entry_t, qd_log_list_t); static qd_log_list_t entries = {0}; +sys_mutex_t *entries_lock = 0; -static void qd_log_entry_free_lh(qd_log_entry_t* entry) { +static void qd_log_entry_free_lh(qd_log_entry_t *entry) { DEQ_REMOVE(entries, entry); free(entry->file); free(entry->module); @@ -81,11 +109,9 @@ typedef struct log_sink_t { FILE *file; DEQ_LINKS(struct log_sink_t); } log_sink_t; - -DEQ_DECLARE(log_sink_t, log_sink_list_t); - -static sys_mutex_t *log_sink_list_lock = 0; -static log_sink_list_t sink_list = {0}; +DEQ_DECLARE(log_sink_t, log_sinks_t); +static sys_mutex_t *log_sinks_lock = 0; +static log_sinks_t sink_list = {0}; const char *format = "%Y-%m-%d %H:%M:%S.%%06lu %z"; bool utc = false; @@ -111,9 +137,11 @@ static const char* SINK_STDERR = "stderr"; static const char* SINK_SYSLOG = "syslog"; static const char* SOURCE_DEFAULT = "DEFAULT"; +// Hold the log_sinks_lock to prevent collision +// with log_sink(). 
static void log_sink_decref(const log_sink_t *sink) { if (!sink) return; - sys_mutex_lock(log_sink_list_lock); + sys_mutex_lock(log_sinks_lock); assert(sink->ref_count); log_sink_t *mutable_sink = (log_sink_t *)sink; @@ -127,12 +155,14 @@ static void log_sink_decref(const log_sink_t *sink) { closelog(); free(mutable_sink); } - sys_mutex_unlock(log_sink_list_lock); + sys_mutex_unlock(log_sinks_lock); } -static const log_sink_t* log_sink(const char* name) { - sys_mutex_lock(log_sink_list_lock); - log_sink_t* sink = DEQ_HEAD(sink_list); +// Hold the log_sinks_lock to prevent collision +// with log_sink_decref(). +static const log_sink_t *log_sink(const char *name) { + sys_mutex_lock(log_sinks_lock); + log_sink_t *sink = DEQ_HEAD(sink_list); DEQ_FIND(sink, strcmp(sink->name, name) == 0); if (sink) { @@ -156,11 +186,8 @@ static const log_sink_t* log_sink(const char* name) { file = fopen(name, "a"); } - //If file is not there, return 0. - // We are not logging an error here since we are already holding the log_source_lock - // Writing a log message will try to re-obtain the log_source_lock lock and cause a deadlock. 
if (!file && !syslog) { - sys_mutex_unlock(log_sink_list_lock); + sys_mutex_unlock(log_sinks_lock); return 0; } @@ -173,7 +200,7 @@ static const log_sink_t* log_sink(const char* name) { DEQ_INSERT_TAIL(sink_list, sink); } - sys_mutex_unlock(log_sink_list_lock); + sys_mutex_unlock(log_sinks_lock); return (const log_sink_t *)sink; } @@ -193,16 +220,13 @@ struct qd_log_source_t { bool syslog; const log_sink_t *sink; uint64_t severity_histogram[N_LEVEL_INDICES]; + sys_mutex_t *lock; }; - DEQ_DECLARE(qd_log_source_t, qd_log_source_list_t); - -static sys_mutex_t *log_source_lock = 0; static qd_log_source_list_t source_list = {0}; - typedef struct level_t { - const char* name; + const char *name; int bit; // QD_LOG bit int mask; // Bit or higher const int syslog; @@ -229,7 +253,7 @@ static const level_t invalid_level = {"invalid", -2, -2, 0}; static char level_names[TEXT_MAX] = {0}; /* Set up in qd_log_initialize */ /// Return NULL and set qd_error if not a valid bit. -static const level_t* level_for_bit(int bit) { +static const level_t *level_for_bit(int bit) { level_index_t i = 0; while (i < N_LEVELS && levels[i].bit != bit) ++i; if (i == N_LEVELS) { @@ -239,7 +263,7 @@ static const level_t* level_for_bit(int bit) { } /// Return NULL and set qd_error if not a valid level. -static const level_t* level_for_name(const char *name, int len) { +static const level_t *level_for_name(const char *name, int len) { level_index_t i = 0; while (i < N_LEVELS && strncasecmp(levels[i].name, name, len) != 0) ++i; if (i == N_LEVELS) { @@ -265,7 +289,7 @@ static int level_index_for_bit(int bit) { } /// Return the name of log level or 0 if not found. -static const char* level_name(int level) { +static const char *level_name(int level) { return (0 <= level && level < N_LEVELS) ? levels[level].name : NULL; } @@ -283,18 +307,19 @@ static int enable_mask(const char *enable_) { { int len = strlen(token); int plus = (len > 0 && token[len-1] == '+') ? 
1 : 0; - const level_t* level = level_for_name(token, len-plus); + const level_t *level = level_for_name(token, len-plus); mask |= (plus ? level->mask : level->bit); } free(enable); return mask; } -/// Caller must hold log_source_lock -static qd_log_source_t* lookup_log_source_lh(const char *module) +static qd_log_source_t *lookup_log_source(const char *module) { - if (strcasecmp(module, SOURCE_DEFAULT) == 0) + if (strcasecmp(module, SOURCE_DEFAULT) == 0) { return default_log_source; + } + qd_log_source_t *src = DEQ_HEAD(source_list); DEQ_FIND(src, strcasecmp(module, src->module) == 0); return src; @@ -307,10 +332,10 @@ static bool default_bool(int value, int default_value) { static void write_log(qd_log_source_t *log_source, qd_log_entry_t *entry) { // Don't let the sink list change while we are writing to one of them. - sys_mutex_lock(log_sink_list_lock); - const log_sink_t* sink = log_source->sink ? log_source->sink : default_log_source->sink; + sys_mutex_lock(log_source->lock); + const log_sink_t *sink = log_source->sink ? 
log_source->sink : default_log_source->sink; if (!sink) { - sys_mutex_unlock(log_sink_list_lock); + sys_mutex_unlock(log_source->lock); return; } @@ -353,56 +378,42 @@ static void write_log(qd_log_source_t *log_source, qd_log_entry_t *entry) if (syslog_level != -1) syslog(syslog_level, "%s", log_str); } - sys_mutex_unlock(log_sink_list_lock); + sys_mutex_unlock(log_source->lock); } /// Reset the log source to the default state -static void qd_log_source_defaults(qd_log_source_t *log_source) { - log_source->mask = -1; - log_source->includeTimestamp = -1; - log_source->includeSource = -1; - log_source->sink = 0; - memset ( log_source->severity_histogram, 0, sizeof(uint64_t) * (N_LEVEL_INDICES) ); -} - -/// Caller must hold the log_source_lock -static qd_log_source_t *qd_log_source_lh(const char *module) -{ - qd_log_source_t *log_source = lookup_log_source_lh(module); - if (!log_source) - { - log_source = NEW(qd_log_source_t); - ZERO(log_source); - log_source->module = (char*) malloc(strlen(module) + 1); - strcpy(log_source->module, module); - qd_log_source_defaults(log_source); - DEQ_INSERT_TAIL(source_list, log_source); - qd_entity_cache_add(QD_LOG_STATS_TYPE, log_source); - } - return log_source; +static void qd_log_source_defaults(qd_log_source_t *src) { + src->mask = -1; + src->includeTimestamp = -1; + src->includeSource = -1; + log_sink_decref(src->sink); + src->sink = 0; + memset ( src->severity_histogram, 0, sizeof(uint64_t) * (N_LEVEL_INDICES) ); } qd_log_source_t *qd_log_source(const char *module) { - sys_mutex_lock(log_source_lock); - qd_log_source_t* src = qd_log_source_lh(module); - sys_mutex_unlock(log_source_lock); + qd_log_source_t *src = lookup_log_source(module); return src; } +// This is called by management thread, and alters the +// log sink. Take lock to avoid collision with worker threads. 
qd_log_source_t *qd_log_source_reset(const char *module) { - sys_mutex_lock(log_source_lock); - qd_log_source_t* src = qd_log_source_lh(module); + qd_log_source_t *src = qd_log_source(module); + sys_mutex_lock(src->lock); qd_log_source_defaults(src); - sys_mutex_unlock(log_source_lock); + sys_mutex_unlock(src->lock); return src; } -static void qd_log_source_free_lh(qd_log_source_t* src) { +// This is called only during finalize, which does not hold locks. +static void qd_log_source_free(qd_log_source_t *src) { DEQ_REMOVE(source_list, src); log_sink_decref(src->sink); free(src->module); + free(src->lock); free(src); } @@ -414,17 +425,18 @@ bool qd_log_enabled(qd_log_source_t *source, qd_log_level_t level) { void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t level, bool check_level, const char *file, int line, const char *fmt, va_list ap) { - /*----------------------------------------------- - Count this log-event in this log's histogram - whether or not this log is currently enabled. - We can always decide not to look at it later, - based on its used/unused status. - -----------------------------------------------*/ + // Count this log-event in this log's histogram + // whether or not this log is currently enabled. + // We can always decide not to look at it later, + // based on its used/unused status. int level_index = level_index_for_bit(level); if (level_index < 0) qd_error_clear(); - else + else { + sys_mutex_lock(source->lock); source->severity_histogram[level_index]++; + sys_mutex_unlock(source->lock); + } if (check_level) { if (!qd_log_enabled(source, level)) @@ -435,12 +447,7 @@ void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t level, bool check_leve qd_log_entry_t *entry = new_qd_log_entry_t(); DEQ_ITEM_INIT(entry); - // - // Obtain the log_source_lock global lock. We need to do this, if not, the qd_log_entity() function - // could free the log_source->sink from underneath you and replace it with a new sink. 
- // Once we obtain this lock, we only release the lock once the log line is written to the sink. - // - sys_mutex_lock(log_source_lock); + sys_mutex_lock(entries_lock); entry->module = source->module ? strdup(source->module) : 0; entry->level = level; entry->file = file ? strdup(file) : 0; @@ -451,7 +458,7 @@ void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t level, bool check_leve DEQ_INSERT_TAIL(entries, entry); if (DEQ_SIZE(entries) > LIST_MAX) qd_log_entry_free_lh(DEQ_HEAD(entries)); - sys_mutex_unlock(log_source_lock); + sys_mutex_unlock(entries_lock); } void qd_log_impl_v1(qd_log_source_t *source, qd_log_level_t level, const char *file, int line, const char *fmt, ...) @@ -486,7 +493,7 @@ PyObject *qd_log_recent_py(long limit) { int i = 0; // NOTE: PyList_SetItem steals a reference so no leak here. PyList_SetItem(py_entry, i++, PyUnicode_FromString(entry->module)); - const char* level = level_name( level_index_for_bit(entry->level) + 2 ); + const char *level = level_name( level_index_for_bit(entry->level) + 2 ); PyList_SetItem(py_entry, i++, level ? PyUnicode_FromString(level) : inc_none()); PyList_SetItem(py_entry, i++, PyUnicode_FromString(entry->text)); PyList_SetItem(py_entry, i++, entry->file ? 
PyUnicode_FromString(entry->file) : inc_none()); @@ -506,13 +513,39 @@ PyObject *qd_log_recent_py(long limit) { return NULL; } +static void _add_log_source (const char *module_name) { + qd_log_source_t *log_source; + log_source = NEW(qd_log_source_t); + ZERO(log_source); + log_source->module = qd_strdup(module_name); + qd_log_source_defaults(log_source); + log_source->lock = sys_mutex(); + DEQ_INSERT_TAIL(source_list, log_source); + qd_entity_cache_add(QD_LOG_STATS_TYPE, log_source); + + if (!strcmp(SOURCE_DEFAULT, module_name)) { + default_log_source = log_source; + } +} + void qd_log_initialize(void) { DEQ_INIT(entries); DEQ_INIT(source_list); DEQ_INIT(sink_list); - log_sink_list_lock = sys_mutex(); + int name_offset = strlen("QD_SCHEMA_LOG_MODULE_"); + + int i ; + for (i = 0; i < QD_SCHEMA_LOG_MODULE_ENUM_COUNT; ++ i) + { + const char *module_name = qd_schema_log_module_names[i] + name_offset; + _add_log_source(module_name); + } + _add_log_source("MAIN"); + _add_log_source("DISPLAYNAME"); + + log_sinks_lock = sys_mutex(); // Set up level_names for use in error messages. 
char *begin = level_names, *end = level_names+sizeof(level_names); @@ -520,9 +553,8 @@ void qd_log_initialize(void) for (level_index_t i = NONE + 1; i < N_LEVELS; ++i) aprintf(&begin, end, ", %s", levels[i].name); - log_source_lock = sys_mutex(); + entries_lock = sys_mutex(); - default_log_source = qd_log_source(SOURCE_DEFAULT); default_log_source->mask = levels[INFO].mask; default_log_source->includeTimestamp = true; default_log_source->includeSource = 0; @@ -532,7 +564,7 @@ void qd_log_initialize(void) void qd_log_finalize(void) { while (DEQ_HEAD(source_list)) - qd_log_source_free_lh(DEQ_HEAD(source_list)); + qd_log_source_free(DEQ_HEAD(source_list)); while (DEQ_HEAD(entries)) qd_log_entry_free_lh(DEQ_HEAD(entries)); while (DEQ_HEAD(sink_list)) @@ -540,11 +572,20 @@ void qd_log_finalize(void) { default_log_source = NULL; // stale value would misconfigure new router started again in the same process } +// This is the entry point for management commands that +// may arrive at any time and change the sink in a log +// source. +// If we happen to be writing to the soon-to-be-former +// log sink when it is deleted, a paradox will be generated +// that could destroy the entire space-time continuum in +// which this code is being executed. +// Thus the locks in each log source. +// qd_error_t qd_log_entity(qd_entity_t *entity) { qd_error_clear(); - char* module = 0; + char *module = 0; char *outputFile = 0; char *enable = 0; int include_timestamp = 0; @@ -572,12 +613,6 @@ qd_error_t qd_log_entity(qd_entity_t *entity) // QD_ERROR_BREAK(); - // - // Obtain all attributes from the entity before obtaining the log_source_lock. - // We do this because functions like qd_entity_get_string and qd_entity_get_bool ultimately call qd_vlog_impl() which - // also holds the log_source_lock when calling write_log(). 
- // - if (qd_entity_has(entity, "outputFile")) { has_output_file = true; outputFile = qd_entity_get_string(entity, "outputFile"); @@ -602,33 +637,28 @@ qd_error_t qd_log_entity(qd_entity_t *entity) QD_ERROR_BREAK(); } - // - // Obtain the log_source_lock lock. This lock is also used when write_log() is called. - // - sys_mutex_lock(log_source_lock); + qd_log_source_t *log_source = qd_log_source(module); /* The original(already existing) log source */ - qd_log_source_t *src = qd_log_source_lh(module); /* The original(already existing) log source */ + sys_mutex_lock(log_source->lock); if (has_output_file) { - const log_sink_t* sink = log_sink(outputFile); + const log_sink_t *sink = log_sink(outputFile); if (!sink) { error_in_output = true; - sys_mutex_unlock(log_source_lock); + sys_mutex_unlock(log_source->lock); break; } // DEFAULT source may already have a sink, so free the old sink first - if (src->sink) { - log_sink_decref(src->sink); - } + log_sink_decref(log_source->sink); // Assign the new sink - src->sink = sink; + log_source->sink = sink; - if (src->sink->syslog) { + if (log_source->sink->syslog) { // Timestamp should be off for syslog. is_sink_syslog = true; - src->includeTimestamp = 0; + log_source->includeTimestamp = 0; } } @@ -637,28 +667,28 @@ qd_error_t qd_log_entity(qd_entity_t *entity) if (mask < -1) { error_in_enable = true; - sys_mutex_unlock(log_source_lock); + sys_mutex_unlock(log_source->lock); break; } else { - src->mask = mask; + log_source->mask = mask; } - if (qd_log_enabled(src, QD_LOG_TRACE)) { + if (qd_log_enabled(log_source, QD_LOG_TRACE)) { trace_enabled = true; } } if (has_include_timestamp && !is_sink_syslog) { // Timestamp should be off for syslog. 
- src->includeTimestamp = include_timestamp; + log_source->includeTimestamp = include_timestamp; } if (has_include_source) { - src->includeSource = include_source; + log_source->includeSource = include_source; } - sys_mutex_unlock(log_source_lock); + sys_mutex_unlock(log_source->lock); } while(0); @@ -679,8 +709,9 @@ qd_error_t qd_log_entity(qd_entity_t *entity) free(enable); // - // If trace logging is enabled, loop thru all connections in the router and call the pn_transport_set_tracer callback - // so proton frame trace can be output as part of the router trace log. + // If trace logging is enabled, loop thru all connections in the router and + // call the pn_transport_set_tracer callback so proton frame trace can be output + // as part of the router trace log. // if (trace_enabled) { qd_server_trace_all_connections(); @@ -689,7 +720,7 @@ qd_error_t qd_log_entity(qd_entity_t *entity) return qd_error_code(); } -void qd_format_string(char* buf, int buf_size, const char *fmt, ...) +void qd_format_string(char *buf, int buf_size, const char *fmt, ...) { va_list args; va_start(args, fmt); @@ -698,7 +729,7 @@ void qd_format_string(char* buf, int buf_size, const char *fmt, ...) } -qd_error_t qd_entity_refresh_logStats(qd_entity_t* entity, void *impl) +qd_error_t qd_entity_refresh_logStats(qd_entity_t *entity, void *impl) { qd_log_source_t *log = (qd_log_source_t*)impl; char identity_str[TEXT_MAX]; diff --git a/tests/tsan.supp b/tests/tsan.supp index 7ab3ca0..417c619 100644 --- a/tests/tsan.supp +++ b/tests/tsan.supp @@ -50,7 +50,7 @@ race:qdr_record_link_credit race:qdr_process_tick_CT # DISPATCH-2133 (harmless) -race:qd_log_enabled +#race:qd_log_enabled # DISPATCH-2134 race:qdr_link_process_initial_delivery_CT --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org