This is an automated email from the ASF dual-hosted git repository.

amc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
     new 3ab9742  event loop metrics collection
3ab9742 is described below

commit 3ab9742521c6d7112ed6d159de72ef2eb8a02ebf
Author:     Fei Deng <f...@oath.com>
AuthorDate: Thu Oct 19 10:14:32 2017 -0500

    event loop metrics collection
---
 cmd/traffic_manager/Makefile.am                  |   2 +-
 .../monitoring/statistics/core/misc.en.rst       |  29 +++++
 iocore/eventsystem/I_EThread.h                   |  78 +++++++++++++-
 iocore/eventsystem/UnixEThread.cc                | 105 ++++++++++++++++++++-
 iocore/eventsystem/UnixEventProcessor.cc         |  60 ++++++++++++
 5 files changed, 269 insertions(+), 5 deletions(-)

diff --git a/cmd/traffic_manager/Makefile.am b/cmd/traffic_manager/Makefile.am
index cf64df9..3f02a99 100644
--- a/cmd/traffic_manager/Makefile.am
+++ b/cmd/traffic_manager/Makefile.am
@@ -51,9 +51,9 @@ traffic_manager_LDADD = \
 	$(top_builddir)/mgmt/api/libmgmtapilocal.la \
 	$(top_builddir)/mgmt/libmgmt_lm.la \
 	$(top_builddir)/proxy/hdrs/libhdrs.a \
-	$(top_builddir)/lib/records/librecords_lm.a \
 	$(top_builddir)/lib/ts/libtsutil.la \
 	$(top_builddir)/iocore/eventsystem/libinkevent.a \
+	$(top_builddir)/lib/records/librecords_lm.a \
 	$(top_builddir)/proxy/shared/libdiagsconfig.a
 
 AM_LDFLAGS += \
diff --git a/doc/admin-guide/monitoring/statistics/core/misc.en.rst b/doc/admin-guide/monitoring/statistics/core/misc.en.rst
index 81cd3db..ba03882 100644
--- a/doc/admin-guide/monitoring/statistics/core/misc.en.rst
+++ b/doc/admin-guide/monitoring/statistics/core/misc.en.rst
@@ -25,3 +25,32 @@ Miscellaneous
 
 .. ts:stat:: global proxy.process.http.misc_count_stat integer
 .. ts:stat:: global proxy.process.http.misc_user_agent_bytes_stat integer
+.. ts:stat:: global proxy.process.eventloop.count integer
+
+   Number of event loops executed.
+
+.. ts:stat:: global proxy.process.eventloop.events integer
+
+   Number of events dispatched.
+
+.. ts:stat:: global proxy.process.eventloop.events.min integer
+
+   Minimum number of events dispatched in a loop.
+
+.. ts:stat:: global proxy.process.eventloop.events.max integer
+
+   Maximum number of events dispatched in a loop.
+
+.. ts:stat:: global proxy.process.eventloop.wait integer
+
+   Number of loops that did a conditional wait.
+
+.. ts:stat:: global proxy.process.eventloop.time.min integer
+   :unit: nanoseconds
+
+   Shortest time spent in a loop.
+
+.. ts:stat:: global proxy.process.eventloop.time.max integer
+   :unit: nanoseconds
+
+   Longest time spent in a loop.
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index 1739df2..e15d7b3 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -330,7 +330,7 @@ public:
   void execute() override;
   void execute_regular();
-  void process_queue(Que(Event, link) * NegativeQueue);
+  void process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count);
   void process_event(Event *e, int calling_code);
   void free_event(Event *e);
   LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;
 
@@ -379,6 +379,82 @@ public:
   friend class EThread;
 } DEFAULT_TAIL_HANDLER = EventQueueExternal;
+
+  /// Statistics data for event dispatching.
+  struct EventMetrics {
+    /// Time the loop was active, not including wait time but including event dispatch time.
+    struct LoopTimes {
+      ink_hrtime _start; ///< The time of the first loop for this sample. Used to mark valid entries.
+      ink_hrtime _min;   ///< Shortest loop time.
+      ink_hrtime _max;   ///< Longest loop time.
+      LoopTimes() : _start(0), _min(INT64_MAX), _max(0) {}
+    } _loop_time;
+
+    struct Events {
+      int _min;
+      int _max;
+      int _total;
+      Events() : _min(INT_MAX), _max(0), _total(0) {}
+    } _events;
+
+    int _count; ///< # of times the loop executed.
+    int _wait;  ///< # of timed waits for events.
+
+    /// Add @a that to @a this data.
+    /// This embodies the custom logic per member concerning whether each is a sum, min, or max.
+    EventMetrics &operator+=(EventMetrics const &that);
+
+    EventMetrics() : _count(0), _wait(0) {}
+  };
+
+  /** The number of metric blocks kept.
+      This is a circular buffer, with one block per second. We have a bit more than the required 1000
+      to provide sufficient slop for cross thread reading of the data (as only the current metric block
+      is being updated).
+  */
+  static int const N_EVENT_METRICS = 1024;
+
+  volatile EventMetrics *current_metric; ///< The current element of @a metrics.
+  EventMetrics metrics[N_EVENT_METRICS];
+
+  /** The various stats provided to the administrator.
+      THE ORDER IS VERY SENSITIVE.
+      More than one part of the code depends on this exact order. Be careful and thorough when changing.
+  */
+  enum STAT_ID {
+    STAT_LOOP_COUNT,      ///< # of event loops executed.
+    STAT_LOOP_EVENTS,     ///< # of events dispatched.
+    STAT_LOOP_EVENTS_MIN, ///< Min # of events dispatched in a loop.
+    STAT_LOOP_EVENTS_MAX, ///< Max # of events dispatched in a loop.
+    STAT_LOOP_WAIT,       ///< # of loops that did a conditional wait.
+    STAT_LOOP_TIME_MIN,   ///< Shortest time spent in a loop.
+    STAT_LOOP_TIME_MAX,   ///< Longest time spent in a loop.
+    N_EVENT_STATS         ///< NOT A VALID STAT INDEX - # of different stat types.
+  };
+
+  static char const *const STAT_NAME[N_EVENT_STATS];
+
+  /** The number of time scales used in the event statistics.
+      Currently these are 10s, 100s, 1000s.
+  */
+  static int const N_EVENT_TIMESCALES = 3;
+  /// # of samples for each time scale.
+  static int const SAMPLE_COUNT[N_EVENT_TIMESCALES];
+
+  /// Process the last 1000s of data and write out the summaries to @a summary.
+  void summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES]);
+  /// Back up the metric pointer, wrapping as needed.
+  EventMetrics *
+  prev(EventMetrics volatile *current)
+  {
+    return const_cast<EventMetrics *>(--current < metrics ? &metrics[N_EVENT_METRICS - 1] : current); // cast to remove volatile
+  }
+  /// Advance the metric pointer, wrapping as needed.
+  EventMetrics *
+  next(EventMetrics volatile *current)
+  {
+    return const_cast<EventMetrics *>(++current > &metrics[N_EVENT_METRICS - 1] ? metrics : current); // cast to remove volatile
+  }
 };
 
 /**
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index 82d074f..7fbede5 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -38,6 +38,14 @@ struct AIOCallback;
 
 #define NO_HEARTBEAT -1
 #define THREAD_MAX_HEARTBEAT_MSECONDS 60
 
+// !! THIS MUST BE IN THE ENUM ORDER !!
+char const *const EThread::STAT_NAME[] = {"proxy.process.eventloop.count",      "proxy.process.eventloop.events",
+                                          "proxy.process.eventloop.events.min", "proxy.process.eventloop.events.max",
+                                          "proxy.process.eventloop.wait",       "proxy.process.eventloop.time.min",
+                                          "proxy.process.eventloop.time.max"};
+
+int const EThread::SAMPLE_COUNT[N_EVENT_TIMESCALES] = {10, 100, 1000};
+
 bool shutdown_event_system = false;
 
 EThread::EThread()
@@ -142,7 +150,8 @@ EThread::process_event(Event *e, int calling_code)
   }
 }
 
-void EThread::process_queue(Que(Event, link) * NegativeQueue)
+void
+EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count)
 {
   Event *e;
 
@@ -152,6 +161,7 @@ void EThread::process_queue(Que(Event, link) * NegativeQueue)
   // execute all the available external events that have
   // already been dequeued
   while ((e = EventQueueExternal.dequeue_local())) {
+    ++(*ev_count);
     if (e->cancelled) {
       free_event(e);
     } else if (!e->timeout_at) { // IMMEDIATE
@@ -172,6 +182,7 @@ void EThread::process_queue(Que(Event, link) * NegativeQueue)
         NegativeQueue->insert(e, p);
       }
     }
+    ++(*nq_count);
   }
 }
 
@@ -181,6 +192,18 @@ EThread::execute_regular()
 {
   Event *e;
   Que(Event, link) NegativeQueue;
   ink_hrtime next_time = 0;
+  ink_hrtime delta = 0;        // time spent in the event loop
+  ink_hrtime loop_start_time;  // Time the loop started.
+  ink_hrtime loop_finish_time; // Time at the end of the loop.
+
+  // Track this so we can update on boundary crossing.
+  EventMetrics *prev_metric = this->prev(metrics + (ink_get_hrtime_internal() / HRTIME_SECOND) % N_EVENT_METRICS);
+
+  int nq_count = 0;
+  int ev_count = 0;
+
+  // A statically initialized instance we can use as a prototype for initializing other instances.
+  static EventMetrics METRIC_INIT;
 
   // give priority to immediate events
   for (;;) {
@@ -188,7 +211,22 @@ EThread::execute_regular()
     if (unlikely(shutdown_event_system == true)) {
       return;
     }
-    process_queue(&NegativeQueue);
+    loop_start_time = Thread::get_hrtime_updated();
+    nq_count        = 0; // count # of elements put on negative queue.
+    ev_count        = 0; // # of events handled.
+
+    current_metric = metrics + (loop_start_time / HRTIME_SECOND) % N_EVENT_METRICS;
+    if (current_metric != prev_metric) {
+      // Mixed feelings - really this shouldn't be needed, but just in case more than one entry is
+      // skipped, clear them all.
+      do {
+        memcpy((prev_metric = this->next(prev_metric)), &METRIC_INIT, sizeof(METRIC_INIT));
+      } while (current_metric != prev_metric);
+      current_metric->_loop_time._start = loop_start_time;
+    }
+    ++(current_metric->_count);
+
+    process_queue(&NegativeQueue, &ev_count, &nq_count);
 
     bool done_one;
     do {
@@ -209,7 +247,7 @@ EThread::execute_regular()
 
     // execute any negative (poll) events
     if (NegativeQueue.head) {
-      process_queue(&NegativeQueue);
+      process_queue(&NegativeQueue, &ev_count, &nq_count);
 
       // execute poll events
       while ((e = NegativeQueue.dequeue())) {
@@ -221,6 +259,7 @@ EThread::execute_regular()
     ink_hrtime sleep_time = next_time - Thread::get_hrtime_updated();
     if (sleep_time > 0) {
       sleep_time = std::min(sleep_time, HRTIME_MSECONDS(THREAD_MAX_HEARTBEAT_MSECONDS));
+      ++(current_metric->_wait);
     } else {
       sleep_time = 0;
     }
@@ -230,6 +269,29 @@ EThread::execute_regular()
     }
 
     tail_cb->waitForActivity(sleep_time);
+
+    // loop cleanup
+    loop_finish_time = this->get_hrtime_updated();
+    delta            = loop_finish_time - loop_start_time;
+
+    // This can happen due to time of day adjustments (which apparently happen quite frequently). I
+    // tried using the monotonic clock to get around this but it was *very* stuttery (up to hundreds
+    // of milliseconds), far too much to be actually used.
+    if (delta > 0) {
+      if (delta > current_metric->_loop_time._max) {
+        current_metric->_loop_time._max = delta;
+      }
+      if (delta < current_metric->_loop_time._min) {
+        current_metric->_loop_time._min = delta;
+      }
+    }
+    if (ev_count < current_metric->_events._min) {
+      current_metric->_events._min = ev_count;
+    }
+    if (ev_count > current_metric->_events._max) {
+      current_metric->_events._max = ev_count;
+    }
+    current_metric->_events._total += ev_count;
   }
 }
 
@@ -271,3 +333,40 @@ EThread::execute()
   } /* End switch */
   // coverity[missing_unlock]
 }
+
+EThread::EventMetrics &
+EThread::EventMetrics::operator+=(EventMetrics const &that)
+{
+  this->_events._max = std::max(this->_events._max, that._events._max);
+  this->_events._min = std::min(this->_events._min, that._events._min);
+  this->_events._total += that._events._total;
+  this->_loop_time._min = std::min(this->_loop_time._min, that._loop_time._min);
+  this->_loop_time._max = std::max(this->_loop_time._max, that._loop_time._max);
+  this->_count += that._count;
+  this->_wait += that._wait;
+  return *this;
+}
+
+void
+EThread::summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES])
+{
+  // Accumulate in local first so each sample only needs to be processed once,
+  // not N_EVENT_TIMESCALES times.
+  EventMetrics sum;
+
+  // To avoid race conditions, we back up one from the current metric block. It's close enough
+  // and won't be updated during the time this method runs so it should be thread safe.
+  EventMetrics *m = this->prev(current_metric);
+
+  for (int t = 0; t < N_EVENT_TIMESCALES; ++t) {
+    int count = SAMPLE_COUNT[t];
+    if (t > 0)
+      count -= SAMPLE_COUNT[t - 1];
+    while (--count >= 0) {
+      if (0 != m->_loop_time._start)
+        sum += *m;
+      m = this->prev(m);
+    }
+    summary[t] += sum; // push out to return vector.
+  }
+}
diff --git a/iocore/eventsystem/UnixEventProcessor.cc b/iocore/eventsystem/UnixEventProcessor.cc
index 7fc31e7..db7d3f4 100644
--- a/iocore/eventsystem/UnixEventProcessor.cc
+++ b/iocore/eventsystem/UnixEventProcessor.cc
@@ -72,6 +72,52 @@ ThreadAffinityInitializer Thread_Affinity_Initializer;
 
 namespace
 {
+int
+EventMetricStatSync(const char *, RecDataT, RecData *, RecRawStatBlock *rsb, int)
+{
+  int id = 0;
+  EThread::EventMetrics summary[EThread::N_EVENT_TIMESCALES];
+
+  // scan the thread local values
+  for (int i = 0; i < eventProcessor.n_ethreads; ++i) {
+    eventProcessor.all_ethreads[i]->summarize_stats(summary);
+  }
+
+  ink_mutex_acquire(&(rsb->mutex));
+
+  for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx, id += EThread::N_EVENT_STATS) {
+    EThread::EventMetrics *m = summary + ts_idx;
+    // Discarding the atomic swaps for global writes; they don't seem to actually do anything useful.
+    rsb->global[id + EThread::STAT_LOOP_COUNT]->sum   = m->_count;
+    rsb->global[id + EThread::STAT_LOOP_COUNT]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_COUNT);
+
+    rsb->global[id + EThread::STAT_LOOP_WAIT]->sum   = m->_wait;
+    rsb->global[id + EThread::STAT_LOOP_WAIT]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_WAIT);
+
+    rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->sum   = m->_loop_time._min;
+    rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MIN);
+    rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->sum   = m->_loop_time._max;
+    rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MAX);
+
+    rsb->global[id + EThread::STAT_LOOP_EVENTS]->sum   = m->_events._total;
+    rsb->global[id + EThread::STAT_LOOP_EVENTS]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS);
+    rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->sum   = m->_events._min;
+    rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MIN);
+    rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->sum   = m->_events._max;
+    rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MAX);
+  }
+
+  ink_mutex_release(&(rsb->mutex));
+  return REC_ERR_OKAY;
+}
+
 /// This is a wrapper used to convert a static function into a continuation. The function pointer is
 /// passed in the cookie. For this reason the class is used as a singleton.
 /// @internal This is the implementation for @c schedule_spawn... overloads.
@@ -382,6 +428,20 @@ EventProcessor::start(int n_event_threads, size_t stacksize)
   // infrastructure being in place (e.g. the proxy allocators).
   thread_group[ET_CALL]._spawnQueue.push(make_event_for_scheduling(&Thread_Affinity_Initializer, EVENT_IMMEDIATE, nullptr));
 
+  // Get our statistics set up.
+  RecRawStatBlock *rsb = RecAllocateRawStatBlock(EThread::N_EVENT_STATS * EThread::N_EVENT_TIMESCALES);
+  char name[256];
+
+  for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx) {
+    for (int id = 0; id < EThread::N_EVENT_STATS; ++id) {
+      snprintf(name, sizeof(name), "%s.%ds", EThread::STAT_NAME[id], EThread::SAMPLE_COUNT[ts_idx]);
+      RecRegisterRawStat(rsb, RECT_PROCESS, name, RECD_INT, RECP_NON_PERSISTENT, id + (ts_idx * EThread::N_EVENT_STATS), NULL);
+    }
+  }
+
+  // Name must be that of a stat; pick one at random, since we do all of them in one pass/callback.
+  RecRegisterRawStatSyncCb(name, EventMetricStatSync, rsb, 0);
+
   this->spawn_event_threads(ET_CALL, n_event_threads, stacksize);
 
   Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);
-- 
To stop receiving notification emails like this one, please contact
"commits@trafficserver.apache.org" <commits@trafficserver.apache.org>.
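For readers who want the shape of this change without walking the whole diff: each EThread now
keeps a per-second circular buffer of metric blocks (N_EVENT_METRICS = 1024 buckets, a bit more
than the 1000 seconds of history needed, the extra slop covering cross-thread reads), and the
stat sync callback summarizes the most recent 10s/100s/1000s windows into stats registered with
suffixed names such as proxy.process.eventloop.count.10s (per the snprintf above). The sketch
below is a minimal, self-contained illustration of that circular-buffer summarization technique,
not code from the patch; the names MetricRing, Sample, and bucket_for are invented for the
example, which builds standalone as C++17.

    #include <algorithm>
    #include <climits>
    #include <cstdio>

    // One second's worth of metrics, simplified to a loop count and event extrema.
    struct Sample {
      long long count = 0;  // event loops run during this second
      int ev_min = INT_MAX; // fewest events dispatched by one loop
      int ev_max = 0;       // most events dispatched by one loop

      // Merge another bucket: totals add, extrema take min/max -- the same
      // per-member logic as EventMetrics::operator+= in the patch.
      Sample &operator+=(Sample const &that)
      {
        count += that.count;
        ev_min = std::min(ev_min, that.ev_min);
        ev_max = std::max(ev_max, that.ev_max);
        return *this;
      }
    };

    struct MetricRing {
      static constexpr int N = 1024;                    // > 1000 for read slop
      static constexpr int SCALES[3] = {10, 100, 1000}; // summary windows, in seconds
      Sample buckets[N];

      // One bucket per wall-clock second, wrapping around the ring.
      Sample &bucket_for(long long epoch_seconds) { return buckets[epoch_seconds % N]; }

      // Step back one bucket, wrapping at the front (mirrors EThread::prev).
      Sample *prev(Sample *s) { return --s < buckets ? &buckets[N - 1] : s; }

      // Summarize the last 10s/100s/1000s. Each longer window contains the
      // shorter one, so keep one running sum and walk only the *additional*
      // buckets per scale -- every bucket is visited exactly once, just as
      // summarize_stats does with SAMPLE_COUNT.
      void summarize(Sample out[3], long long now_seconds)
      {
        Sample sum;
        Sample *s = prev(&bucket_for(now_seconds)); // skip the in-progress bucket
        for (int t = 0; t < 3; ++t) {
          for (int n = SCALES[t] - (t > 0 ? SCALES[t - 1] : 0); n > 0; --n) {
            sum += *s;
            s = prev(s);
          }
          out[t] = sum;
        }
      }
    };

    int main()
    {
      MetricRing ring;
      long long now = 1500000000;      // any epoch second works for the demo
      for (int i = 0; i < 1000; ++i) { // fabricate 1000 seconds of history
        Sample &b = ring.bucket_for(now - 1 - i);
        b.count = 1;
        b.ev_min = b.ev_max = i % 7;
      }
      Sample out[3];
      ring.summarize(out, now);
      for (int t = 0; t < 3; ++t) {
        std::printf("%4ds: loops=%lld ev_min=%d ev_max=%d\n", MetricRing::SCALES[t],
                    out[t].count, out[t].ev_min, out[t].ev_max);
      }
      return 0;
    }

Running it prints loops=10/100/1000 for the three windows, matching how summarize_stats feeds
EventMetricStatSync. The patch additionally tags each bucket with _loop_time._start so
never-written buckets can be skipped; the demo sidesteps that by filling all 1000 buckets.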