----- Original Message -----
> Updated Branches:
>   refs/heads/master e0ec5304d -> 8a1112881
>
>
> TS-2089: introduce configurable collation preproc threads
>
> We found that CPU of logging thread could be easy to reach up 100% in
> collation host, but disk IO was low at the same time.
>
> The bottleneck of logging thread is that some preprocessing job, such as
> convert LogBuffer to ascii text, consume so much CPU time. And more
> worse, the write() operation will block logging thread.
>
> So this patch try to split logging thread into two parts:
> 1) Configurable preproc threads, which are responsiable for processing all
> of prepare work, and then forward the preprocessed buffer to flush thread,
> or send them to CollationClient/HostSM.
>
> 2) One Flush thread, it will consume preprocessed buffers and write them to
> disk. In our testing, one flush thread is enough for us.
>
> TODO: This patch supports only one flush thread, we can improve it to
> "one flush thread per file/disk" in the future.
>
> == How to configure ==
> The number of preproc threads is 1 by default.
>
> Please modify "proxy.config.log.collation_preproc_threads" option to
> change it.
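Before getting to the diff, I wanted to check that I read the design right, so here is a minimal standalone sketch of the split described above. N preproc threads do the CPU-heavy conversion and hand finished buffers to a single flush thread. That flush thread is the only one that would block in write().

The sketch is plain C++11 and uses a mutex-protected std::deque with one condition variable. The patch itself uses InkAtomicList plus per-thread ink_mutex/ink_cond and event-system wakeups, and the CollationClient/HostSM path isn't modeled at all. FlushQueue, PreprocessedBuffer and run_pipeline are made-up names for illustration only.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a LogBuffer that a preproc thread has already
// converted to ASCII text.
struct PreprocessedBuffer {
  std::string text;
};

// Hand-off point between the preproc threads (producers) and the single
// flush thread (consumer).
class FlushQueue {
public:
  void push(PreprocessedBuffer buf) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(buf));
    }
    cond_.notify_one();
  }

  // Called once every producer has finished, so the consumer can exit.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cond_.notify_all();
  }

  // Blocks until a buffer is available; returns false once closed and drained.
  bool pop(PreprocessedBuffer &out) {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::deque<PreprocessedBuffer> queue_;
  bool closed_ = false;
};

// Runs `preproc_threads` producers that each preprocess `per_thread` entries
// and one flush thread that consumes them. Returns how many buffers the
// flush thread handled.
std::size_t run_pipeline(int preproc_threads, int per_thread) {
  FlushQueue queue;
  std::size_t flushed = 0;  // only touched by the flush thread until join()

  std::thread flusher([&queue, &flushed] {
    PreprocessedBuffer buf;
    while (queue.pop(buf)) {
      ++flushed;  // a real flush thread would write buf.text to disk here
    }
  });

  std::vector<std::thread> producers;
  for (int i = 0; i < preproc_threads; ++i) {
    producers.emplace_back([&queue, i, per_thread] {
      for (int n = 0; n < per_thread; ++n) {
        // The CPU-heavy formatting happens here, off the flush thread.
        queue.push(PreprocessedBuffer{"preproc " + std::to_string(i) + " entry " +
                                      std::to_string(n) + "\n"});
      }
    });
  }

  for (std::thread &t : producers) {
    t.join();
  }
  queue.close();
  flusher.join();
  return flushed;
}
```

With a single consumer, write() calls never contend with each other. The TODO's "one flush thread per file/disk" would roughly turn this into one queue per consumer.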
First off: I *love* your commit messages.

> Signed-off-by: Yunkai Zhang <[email protected]>
>
>
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
>
> Branch: refs/heads/master
> Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> Parents: e0ec530
> Author: Yunkai Zhang <[email protected]>
> Authored: Sun Aug 11 16:42:32 2013 +0800
> Committer: Yunkai Zhang <[email protected]>
> Committed: Fri Aug 16 10:41:16 2013 +0800
>
> ----------------------------------------------------------------------
>  CHANGES                               |   3 +
>  mgmt/RecordsConfig.cc                 |   2 +
>  mgmt/cli/ShowCmd.cc                   |   3 +
>  proxy/logging/Log.cc                  | 257 ++++++++++++++++++++++++++++-----
>  proxy/logging/Log.h                   |  46 +++++-
>  proxy/logging/LogBufferSink.h         |   9 +-
>  proxy/logging/LogCollationClientSM.cc |   6 +-
>  proxy/logging/LogCollationHostSM.cc   |   4 +-
>  proxy/logging/LogConfig.cc            |  19 ++-
>  proxy/logging/LogConfig.h             |   1 +
>  proxy/logging/LogFile.cc              | 121 ++++++--------
>  proxy/logging/LogFile.h               |  20 +--
>  proxy/logging/LogHost.cc              |  66 ++++----
>  proxy/logging/LogHost.h               |  14 +-
>  proxy/logging/LogObject.cc            |  68 ++++----
>  proxy/logging/LogObject.h             |  43 +++--
>  16 files changed, 450 insertions(+), 232 deletions(-)
> ----------------------------------------------------------------------

I'm missing a change to doc/reference/configuration/records.config.en.rst.
We should make it a habit to add documentation in the same commit that
introduces new records.config options.
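For reference, this is the entire records.config change an admin would make to run four preproc threads. The value 4 is just an example. The docs entry should at least state the default (1) and the accepted range ([1-128]) from the RecordsConfig.cc entry below.

```
CONFIG proxy.config.log.collation_preproc_threads INT 4
```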
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> ----------------------------------------------------------------------
> diff --git a/CHANGES b/CHANGES
> index 6b9a3bb..c613f00 100644
> --- a/CHANGES
> +++ b/CHANGES
> @@ -1,6 +1,9 @@
>  -*- coding: utf-8 -*-
>  Changes with Apache Traffic Server 3.5.0
>
> +
> +  *) TS-2089: introduce configurable collation preproc threads
> +
>    *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
>     needlessly chowned to to ATS' user.
>     Author: Tomasz Kuzemko <[email protected]>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> index cfc2267..9402581 100644
> --- a/mgmt/RecordsConfig.cc
> +++ b/mgmt/RecordsConfig.cc
> @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
>    ,
>    {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT, "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>    ,
> +  {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT, "1", RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> +  ,
>    {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
>    ,
>    {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT, "86400", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> index 0798c43..ed71560 100644
> --- a/mgmt/cli/ShowCmd.cc
> +++ b/mgmt/cli/ShowCmd.cc
> @@ -1555,6 +1555,7 @@ ShowLogging()
>    TSInt collation_port = -1;
>    TSString collation_secret = NULL;
>    TSInt host_tag = 0;
> +  TSInt preproc_threads = 0;
>    TSInt orphan_space = -1;
>
>    TSInt squid_log = 0;
> @@ -1596,6 +1597,7 @@ ShowLogging()
>    Cli_RecordGetString("proxy.config.log.collation_secret", &collation_secret);
>    Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
>    Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs", &orphan_space);
> +  Cli_RecordGetInt("proxy.config.log.collation_preproc_threads", &preproc_threads);
>
>    Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
>    Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> @@ -1657,6 +1659,7 @@ ShowLogging()
>    Cli_Printf(" Port ----------------------------------- %d\n", collation_port);
>    Cli_Printf(" Secret --------------------------------- %s\n", collation_secret);
>    Cli_PrintEnable(" Host Tagged ---------------------------- ", host_tag);
> +  Cli_PrintEnable(" Preproc Threads ------------------------ ", preproc_threads);
>    Cli_Printf(" Space Limit for Orphan Files ----------- %d MB\n", orphan_space);
>
>    Cli_PrintEnable("\nSquid Format ----------------------------- ", squid_log);
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> index 94d5625..1361978 100644
> --- a/proxy/logging/Log.cc
> +++ b/proxy/logging/Log.cc
> @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
>  size_t Log::maxInactiveObjects;
>
>  // Flush thread stuff
> -volatile unsigned long Log::flush_counter = 0;

This bit is fascinating: we're replacing the (seemingly) unused volatile
variable flush_counter with the already existing variable m_bytes_written.
We turn that variable into a volatile, and continue to use it as if nothing
happened. But here's the question: why was flush_counter not used?
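On the volatile part: in C++, volatile doesn't make `m_bytes_written += n` atomic, and it doesn't order it against other threads. So if more than one thread can update that counter after this change, volatile alone isn't enough.

Below is a standalone C++11 illustration. count_bytes and its parameters are made up for the example, and std::atomic stands in for whatever primitive ATS would actually use.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

// Several writer threads bump one shared byte counter. With a plain or
// volatile std::uint64_t, `counter += n` is a separate load, add and store,
// so concurrent updates can be lost (formally a data race).
// std::atomic::fetch_add performs the whole update indivisibly.
std::uint64_t count_bytes(int threads, int writes_per_thread, std::uint64_t bytes_per_write) {
  std::atomic<std::uint64_t> bytes_written{0};

  std::vector<std::thread> writers;
  for (int t = 0; t < threads; ++t) {
    writers.emplace_back([&bytes_written, writes_per_thread, bytes_per_write] {
      for (int i = 0; i < writes_per_thread; ++i) {
        // relaxed ordering is enough for a pure statistics counter
        bytes_written.fetch_add(bytes_per_write, std::memory_order_relaxed);
      }
    });
  }
  for (std::thread &w : writers) {
    w.join();
  }
  return bytes_written.load();
}
```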
> -ink_mutex Log::flush_mutex;
> -ink_cond Log::flush_cond;
> -ink_thread Log::flush_thread;
> +ink_mutex *Log::preproc_mutex;
> +ink_cond *Log::preproc_cond;
> +ink_mutex *Log::flush_mutex;
> +ink_cond *Log::flush_cond;
> +InkAtomicList *Log::flush_data_list;
>
>  // Collate thread stuff
>  ink_mutex Log::collate_mutex;
>  ink_cond Log::collate_cond;
>  ink_thread Log::collate_thread;
>  int Log::collation_accept_file_descriptor;
> +int Log::collation_preproc_threads;
>  int Log::collation_port;
>
>  // Log private objects
> @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
>
>  struct PeriodicWakeup;
>  typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> -struct PeriodicWakeup : Continuation {
> +struct PeriodicWakeup : Continuation
> +{
> +  int m_preproc_threads;
> +  int m_flush_threads;
> +
>    int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
>    {
> -      ink_cond_signal (&Log::flush_cond);
> -      return EVENT_CONT;
> +    for (int i = 0; i < m_preproc_threads; i++) {
> +      ink_cond_signal (&Log::preproc_cond[i]);
> +    }
> +    for (int i = 0; i < m_flush_threads; i++) {
> +      ink_cond_signal (&Log::flush_cond[i]);
> +    }
> +    return EVENT_CONT;
>    }
>
> -  PeriodicWakeup () : Continuation (new_ProxyMutex())
> +  PeriodicWakeup (int preproc_threads, int flush_threads) :
> +    Continuation (new_ProxyMutex()),
> +    m_preproc_threads(preproc_threads),
> +    m_flush_threads(flush_threads)
>    {
> -      SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> +    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
>    }
>  };
>
> @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
>  /*-------------------------------------------------------------------------
>    MAIN INTERFACE
>    -------------------------------------------------------------------------*/
> +struct LoggingPreprocContinuation: public Continuation
> +{
> +  int m_idx;
> +
> +  int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
> +  {
> +    Log::preproc_thread_main((void *)&m_idx);
> +    return 0;
> +  }
> +
> +  LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> +  {
> +    SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> +  }
> +};
> +
>  struct LoggingFlushContinuation: public Continuation
>  {
> +  int m_idx;
> +
>    int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
>    {
> -    Log::flush_thread_main(NULL);
> +    Log::flush_thread_main((void *)&m_idx);
>      return 0;
>    }
>
> -  LoggingFlushContinuation():Continuation(NULL)
> +  LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
>    {
>      SET_HANDLER(&LoggingFlushContinuation::mainEvent);
>    }
> @@ -910,6 +942,7 @@ Log::init(int flags)
>    numInactiveObjects = 0;
>    inactive_objects = new LogObject*[maxInactiveObjects];
>
> +  collation_preproc_threads = 1;
>    collation_accept_file_descriptor = NO_FD;
>
>    // store the configuration flags
> @@ -931,6 +964,7 @@ Log::init(int flags)
>
>    config->read_configuration_variables();
>    collation_port = config->collation_port;
> +  collation_preproc_threads = config->collation_preproc_threads;
>
>    if (config_flags & STANDALONE_COLLATOR) {
>      logging_mode = LOG_TRANSACTIONS_ONLY;
> @@ -959,8 +993,8 @@ Log::init(int flags)
>    create_threads();
>
>  #ifndef INK_SINGLE_THREADED
> -  eventProcessor.schedule_every(NEW (new PeriodicWakeup()), HRTIME_SECOND,
> -                                ET_CALL);
> +  eventProcessor.schedule_every(NEW (new PeriodicWakeup(collation_preproc_threads, 1)),
> +                                HRTIME_SECOND, ET_CALL);
>  #endif
>    init_status |= PERIODIC_WAKEUP_SCHEDULED;
>
> @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
>      // setup global scrap object
>      //
>      global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> -    global_scrap_object = NEW(new LogObject(global_scrap_format, Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> -                                            NULL, Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> -                                            Log::config->rolling_offset_hr, Log::config->rolling_size_mb));
> +    global_scrap_object =
> +      NEW(new LogObject(global_scrap_format,
> +                        Log::config->logfile_dir,
> +                        "scrapfile.log",
> +                        BINARY_LOG, NULL,
> +                        Log::config->rolling_enabled,
> +                        Log::config->collation_preproc_threads,
> +                        Log::config->rolling_interval_sec,
> +                        Log::config->rolling_offset_hr,
> +                        Log::config->rolling_size_mb));
>
>      // create the flush thread and the collation thread
>      //
> @@ -1030,15 +1071,43 @@ Log::create_threads()
>
>    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
>    if (!(init_status & THREADS_CREATED)) {
> -    // start the flush thread
> +
> +    char desc[64];
> +    preproc_mutex = new ink_mutex[collation_preproc_threads];
> +    preproc_cond = new ink_cond[collation_preproc_threads];
> +
> +    size_t stacksize;
> +    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");

This ^ seems like an unnecessary step. stacksize is already read from
proxy.config.thread.default.stacksize right above the if, and this new
local only shadows it. That lookup could be pushed into ...

> +
> +    // start the preproc threads
>      //
>      // no need for the conditional var since it will be relying on
>      // on the event system.
> -    ink_mutex_init(&flush_mutex, "Flush thread mutex");
> -    ink_cond_init(&flush_cond);
> -    Continuation *flush_continuation = NEW(new LoggingFlushContinuation);
> -    Event *flush_event = eventProcessor.spawn_thread(flush_continuation, "[LOGGING]", stacksize);
> -    flush_thread = flush_event->ethread->tid;
> +    for (int i = 0; i < collation_preproc_threads; i++) {
> +      sprintf(desc, "Logging preproc thread mutex[%d]", i);
> +      ink_mutex_init(&preproc_mutex[i], desc);
> +      ink_cond_init(&preproc_cond[i]);
> +      Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
> +      sprintf(desc, "[LOG_PREPROC %d]", i);
> +      eventProcessor.spawn_thread(preproc_cont, desc, stacksize);

... spawn_thread() as a default.
I.e., something along these lines:

    Event *
    EventProcessor::spawn_thread(Continuation *cont, const char *desc, int stacksize = -1)
    {
      size_t thread_stacksize;
      if (stacksize == -1) {
        REC_ReadConfigInteger(thread_stacksize, "proxy.config.thread.default.stacksize");
      } else {
        thread_stacksize = stacksize;
      }
      ...
    }

>  /*-------------------------------------------------------------------------
> +<<<<<<< HEAD
>    LogFile::write_and_try_delete
> +=======
> +  LogFile::preproc_and_try_delete
> +
> +  preprocess the given buffer data before write to target file
> +  and try to delete it when its reference become zero.
> +>>>>>>> TS-2089: introduce configurable collation preproc threads

To reiterate a point Leif made recently about one of *my* commits: we
should make sure that every single commit actually compiles.

Anyway, that's all I have for now.

--
i

Igor Galić

Tel: +43 (0) 664 886 22 883
Mail: [email protected]
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE
