----- Original Message -----
> Updated Branches:
>   refs/heads/master e0ec5304d -> 8a1112881
> 
> 
> TS-2089: introduce configurable collation preproc threads
> 
> We found that the CPU usage of the logging thread could easily reach 100% on the
> collation host, while disk IO stayed low at the same time.
> 
> The bottleneck of the logging thread is that some preprocessing jobs, such as
> converting a LogBuffer to ASCII text, consume a lot of CPU time. Worse still,
> the write() operation blocks the logging thread.
> 
> So this patch tries to split the logging thread into two parts:
> 1) Configurable preproc threads, which are responsible for all of the
>    preparation work, and then forward the preprocessed buffers to the flush
>    thread, or send them to CollationClient/HostSM.
> 
> 2) One flush thread, which consumes preprocessed buffers and writes them to
>    disk. In our testing, one flush thread is enough for us.
> 
> TODO: This patch supports only one flush thread; we can improve it to
>       "one flush thread per file/disk" in the future.
> 
> == How to configure ==
> The number of preproc threads is 1 by default.
> 
> Please modify "proxy.config.log.collation_preproc_threads" option to
> change it.
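The split described in the commit message is a producer/consumer pipeline: N preproc threads do the CPU-bound formatting in parallel and hand finished buffers to a single flush thread that owns the blocking write(). A minimal sketch in standard C++11, assuming one shared queue as the hand-off point (the patch itself uses ink_mutex/ink_cond and an InkAtomicList; PreprocessedBuffer, FlushQueue and run_pipeline are hypothetical names for illustration only):

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a LogBuffer that has already been rendered to text.
struct PreprocessedBuffer {
  std::string text;
};

// Hand-off point between the preproc threads (producers) and the flush thread (consumer).
class FlushQueue {
public:
  void push(PreprocessedBuffer buf) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(buf));
    }
    cond_.notify_one();
  }

  // Called once all producers are done; lets the consumer drain and exit.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cond_.notify_all();
  }

  // Blocks until a buffer is available; returns false once closed and empty.
  bool pop(PreprocessedBuffer &out) {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::queue<PreprocessedBuffer> queue_;
  bool closed_ = false;
};

// Runs `preproc_threads` producers that each render `per_thread` records and a
// single flush thread that appends them to `sink` (standing in for write()).
// Returns how many buffers the flush thread consumed.
std::size_t run_pipeline(int preproc_threads, int per_thread, std::string &sink) {
  FlushQueue queue;
  std::size_t flushed = 0;

  std::thread flusher([&queue, &sink, &flushed] {
    PreprocessedBuffer buf;
    while (queue.pop(buf)) {
      sink += buf.text;  // only this thread "does IO"; producers never block on it
      ++flushed;
    }
  });

  std::vector<std::thread> preprocs;
  for (int idx = 0; idx < preproc_threads; ++idx) {
    preprocs.emplace_back([&queue, idx, per_thread] {
      for (int n = 0; n < per_thread; ++n) {
        // The CPU-heavy formatting work happens here, in parallel across threads.
        queue.push(PreprocessedBuffer{"t" + std::to_string(idx) + ":" + std::to_string(n) + "\n"});
      }
    });
  }

  for (std::thread &t : preprocs) {
    t.join();
  }
  queue.close();
  flusher.join();
  return flushed;
}
```

Because only the flush thread touches the sink, adding preproc threads scales the formatting work without adding contention on the file itself, which fits the observation that disk IO was low while the single logging thread was CPU-bound.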


First off: I *love* your commit messages.

> Signed-off-by: Yunkai Zhang <[email protected]>
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/8a111288
> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/8a111288
> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/8a111288
> 
> Branch: refs/heads/master
> Commit: 8a1112881813b5a09e0de8f51770f99337febcfc
> Parents: e0ec530
> Author: Yunkai Zhang <[email protected]>
> Authored: Sun Aug 11 16:42:32 2013 +0800
> Committer: Yunkai Zhang <[email protected]>
> Committed: Fri Aug 16 10:41:16 2013 +0800
> 
> ----------------------------------------------------------------------
>  CHANGES                               |   3 +
>  mgmt/RecordsConfig.cc                 |   2 +
>  mgmt/cli/ShowCmd.cc                   |   3 +
>  proxy/logging/Log.cc                  | 257 ++++++++++++++++++++++++-----
>  proxy/logging/Log.h                   |  46 +++++-
>  proxy/logging/LogBufferSink.h         |   9 +-
>  proxy/logging/LogCollationClientSM.cc |   6 +-
>  proxy/logging/LogCollationHostSM.cc   |   4 +-
>  proxy/logging/LogConfig.cc            |  19 ++-
>  proxy/logging/LogConfig.h             |   1 +
>  proxy/logging/LogFile.cc              | 121 ++++++--------
>  proxy/logging/LogFile.h               |  20 +--
>  proxy/logging/LogHost.cc              |  66 ++++----
>  proxy/logging/LogHost.h               |  14 +-
>  proxy/logging/LogObject.cc            |  68 ++++----
>  proxy/logging/LogObject.h             |  43 +++--
>  16 files changed, 450 insertions(+), 232 deletions(-)
> ----------------------------------------------------------------------

I'm missing a change to doc/reference/configuration/records.config.en.rst.
We should make a habit of adding documentation in the same commit as
new records.config changes.

> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/CHANGES
> ----------------------------------------------------------------------
> diff --git a/CHANGES b/CHANGES
> index 6b9a3bb..c613f00 100644
> --- a/CHANGES
> +++ b/CHANGES
> @@ -1,6 +1,9 @@
>                                                           -*- coding: utf-8 -*-
>  Changes with Apache Traffic Server 3.5.0
>  
> +
> +  *) TS-2089: introduce configurable collation preproc threads
> +
>    *) [TS-2132, TS-2131] ${libexecdir} and $(localstatedir} chowned
>     needlessly chowned to to ATS' user.
>     Author: Tomasz Kuzemko <[email protected]>
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/RecordsConfig.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
> index cfc2267..9402581 100644
> --- a/mgmt/RecordsConfig.cc
> +++ b/mgmt/RecordsConfig.cc
> @@ -1147,6 +1147,8 @@ RecordElement RecordsConfig[] = {
>    ,
>    {RECT_CONFIG, "proxy.config.log.collation_max_send_buffers", RECD_INT,
>    "16", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
>    ,
> +  {RECT_CONFIG, "proxy.config.log.collation_preproc_threads", RECD_INT, "1", RECU_DYNAMIC, RR_REQUIRED, RECC_INT, "[1-128]", RECA_NULL}
> +  ,
>    {RECT_CONFIG, "proxy.config.log.rolling_enabled", RECD_INT, "1",
>    RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-4]", RECA_NULL}
>    ,
>    {RECT_CONFIG, "proxy.config.log.rolling_interval_sec", RECD_INT, "86400",
>    RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
> 
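The new record is registered with the check "[1-128]". If the logging code wants to be defensive about the value it reads (a value of 0, for example, would leave the preproc_mutex/preproc_cond arrays empty and start no preproc thread at all), a sketch of clamping it into that range might look like this. sanitize_preproc_threads() and the constants are hypothetical, and whether the RECC_INT check already rejects out-of-range values at load time is not something this diff shows:

```cpp
#include <cassert>

// Range advertised for proxy.config.log.collation_preproc_threads ("[1-128]").
constexpr int kMinPreprocThreads = 1;
constexpr int kMaxPreprocThreads = 128;

// Hypothetical helper: clamp a raw configured value into the advertised range so
// that a bad records.config entry can never yield zero preproc threads (and
// zero-length mutex/cond arrays) or an unreasonably large thread count.
int sanitize_preproc_threads(long configured) {
  if (configured < kMinPreprocThreads) {
    return kMinPreprocThreads;
  }
  if (configured > kMaxPreprocThreads) {
    return kMaxPreprocThreads;
  }
  return static_cast<int>(configured);
}
```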
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/mgmt/cli/ShowCmd.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/cli/ShowCmd.cc b/mgmt/cli/ShowCmd.cc
> index 0798c43..ed71560 100644
> --- a/mgmt/cli/ShowCmd.cc
> +++ b/mgmt/cli/ShowCmd.cc
> @@ -1555,6 +1555,7 @@ ShowLogging()
>    TSInt collation_port = -1;
>    TSString collation_secret = NULL;
>    TSInt host_tag = 0;
> +  TSInt preproc_threads = 0;
>    TSInt orphan_space = -1;
>  
>    TSInt squid_log = 0;
> @@ -1596,6 +1597,7 @@ ShowLogging()
>    Cli_RecordGetString("proxy.config.log.collation_secret",
>    &collation_secret);
>    Cli_RecordGetInt("proxy.config.log.collation_host_tagged", &host_tag);
>    Cli_RecordGetInt("proxy.config.log.max_space_mb_for_orphan_logs",
>    &orphan_space);
> +  Cli_RecordGetInt("proxy.config.log.collation_preproc_threads", &preproc_threads);
>  
>    Cli_RecordGetInt("proxy.config.log.squid_log_enabled", &squid_log);
>    Cli_RecordGetInt("proxy.config.log.squid_log_is_ascii", &is_ascii);
> @@ -1657,6 +1659,7 @@ ShowLogging()
>    Cli_Printf("  Port ----------------------------------- %d\n",
>    collation_port);
>    Cli_Printf("  Secret --------------------------------- %s\n",
>    collation_secret);
>    Cli_PrintEnable("  Host Tagged ---------------------------- ", host_tag);
> +  Cli_PrintEnable("  Preproc Threads ------------------------ ", preproc_threads);
>    Cli_Printf("  Space Limit for Orphan Files ----------- %d MB\n",
>    orphan_space);
>  
>    Cli_PrintEnable("\nSquid Format ----------------------------- ",
>    squid_log);
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/8a111288/proxy/logging/Log.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> index 94d5625..1361978 100644
> --- a/proxy/logging/Log.cc
> +++ b/proxy/logging/Log.cc
> @@ -77,16 +77,18 @@ size_t Log::numInactiveObjects;
>  size_t Log::maxInactiveObjects;
>  
>  // Flush thread stuff
> -volatile unsigned long Log::flush_counter = 0;

This bit is fascinating: We're replacing the (seemingly)
unused volatile variable flush_counter with the already
existing variable m_bytes_written.

We turn that variable into a volatile, and continue to
use it as if nothing happened. But here's the question:
why was flush_counter not used?
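On the volatile part: if m_bytes_written ends up being updated from more than one thread (several preproc threads plus the flush thread), volatile by itself does not make `+=` atomic in C++; it only stops the compiler from caching the value. A minimal sketch of the standard alternative, assuming a C++11 toolchain (in-tree code would more likely use ATS's own ink_atomic helpers); bytes_written and account_writes() are hypothetical names:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Hypothetical byte counter shared by several logging threads.
// With `volatile uint64_t`, `bytes += n` would still be a non-atomic
// read-modify-write (a data race); std::atomic makes each update indivisible.
std::atomic<uint64_t> bytes_written{0};

// Simulates `threads` writers, each accounting `writes_per_thread` writes.
void account_writes(int threads, int writes_per_thread, uint64_t bytes_per_write) {
  std::vector<std::thread> workers;
  for (int i = 0; i < threads; ++i) {
    workers.emplace_back([=] {
      for (int n = 0; n < writes_per_thread; ++n) {
        // Relaxed ordering is enough for a pure statistics counter.
        bytes_written.fetch_add(bytes_per_write, std::memory_order_relaxed);
      }
    });
  }
  for (std::thread &w : workers) {
    w.join();
  }
}
```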

> -ink_mutex Log::flush_mutex;
> -ink_cond Log::flush_cond;
> -ink_thread Log::flush_thread;
> +ink_mutex *Log::preproc_mutex;
> +ink_cond *Log::preproc_cond;
> +ink_mutex *Log::flush_mutex;
> +ink_cond *Log::flush_cond;
> +InkAtomicList *Log::flush_data_list;
>  
>  // Collate thread stuff
>  ink_mutex Log::collate_mutex;
>  ink_cond Log::collate_cond;
>  ink_thread Log::collate_thread;
>  int Log::collation_accept_file_descriptor;
> +int Log::collation_preproc_threads;
>  int Log::collation_port;
>  
>  // Log private objects
> @@ -179,16 +181,28 @@ Log::add_to_inactive(LogObject * object)
>  
>  struct PeriodicWakeup;
>  typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
> -struct PeriodicWakeup : Continuation {
> +struct PeriodicWakeup : Continuation
> +{
> +  int m_preproc_threads;
> +  int m_flush_threads;
> +
>    int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
>    {
> -    ink_cond_signal (&Log::flush_cond);
> -    return EVENT_CONT;
> +      for (int i = 0; i < m_preproc_threads; i++) {
> +        ink_cond_signal (&Log::preproc_cond[i]);
> +      }
> +      for (int i = 0; i < m_flush_threads; i++) {
> +        ink_cond_signal (&Log::flush_cond[i]);
> +      }
> +      return EVENT_CONT;
>    }
>  
> -  PeriodicWakeup () : Continuation (new_ProxyMutex())
> +  PeriodicWakeup (int preproc_threads, int flush_threads) :
> +    Continuation (new_ProxyMutex()),
> +    m_preproc_threads(preproc_threads),
> +    m_flush_threads(flush_threads)
>    {
> -    SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
> +      SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
>    }
>  };
>  
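PeriodicWakeup now fans out to one condition variable per preproc thread plus one per flush thread. The hunk only signals the conditions; whether a signal can be missed depends on the waiting side, which isn't quoted here. A sketch of the per-thread pattern in standard C++, assuming each worker waits with a timeout and checks a "pending" flag under its own mutex, so a wakeup that arrives before the worker starts waiting is not lost (WorkerSlot and PeriodicWaker are hypothetical names):

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <vector>

// One mutex/condition pair per worker, mirroring preproc_mutex[i]/preproc_cond[i].
struct WorkerSlot {
  std::mutex mutex;
  std::condition_variable cond;
  bool pending = false;  // set by the waker, consumed by the worker
};

class PeriodicWaker {
public:
  explicit PeriodicWaker(int workers) : slots_(workers) {}

  // What a once-per-second wakeup() would do: poke every worker.
  void wake_all() {
    for (WorkerSlot &slot : slots_) {
      {
        std::lock_guard<std::mutex> lock(slot.mutex);
        slot.pending = true;
      }
      slot.cond.notify_one();
    }
  }

  // Worker side: wait for a poke, but never longer than `timeout`.
  // The predicate also absorbs spurious wakeups. Returns true if poked.
  bool wait(int idx, std::chrono::milliseconds timeout) {
    WorkerSlot &slot = slots_[idx];
    std::unique_lock<std::mutex> lock(slot.mutex);
    bool woken = slot.cond.wait_for(lock, timeout, [&slot] { return slot.pending; });
    slot.pending = false;
    return woken;
  }

private:
  std::vector<WorkerSlot> slots_;
};
```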
> @@ -286,15 +300,33 @@ Log::periodic_tasks(long time_now)
>  /*-------------------------------------------------------------------------
>    MAIN INTERFACE
>    -------------------------------------------------------------------------*/
> +struct LoggingPreprocContinuation: public Continuation
> +{
> +  int m_idx;
> +
> +  int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
> +  {
> +    Log::preproc_thread_main((void *)&m_idx);
> +    return 0;
> +  }
> +
> +  LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
> +  {
> +    SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
> +  }
> +};
> +
>  struct LoggingFlushContinuation: public Continuation
>  {
> +  int m_idx;
> +
>    int mainEvent(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
>    {
> -    Log::flush_thread_main(NULL);
> +    Log::flush_thread_main((void *)&m_idx);
>      return 0;
>    }
>  
> -  LoggingFlushContinuation():Continuation(NULL)
> +  LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
>    {
>      SET_HANDLER(&LoggingFlushContinuation::mainEvent);
>    }
> @@ -910,6 +942,7 @@ Log::init(int flags)
>    numInactiveObjects = 0;
>    inactive_objects = new LogObject*[maxInactiveObjects];
>  
> +  collation_preproc_threads = 1;
>    collation_accept_file_descriptor = NO_FD;
>  
>    // store the configuration flags
> @@ -931,6 +964,7 @@ Log::init(int flags)
>  
>      config->read_configuration_variables();
>      collation_port = config->collation_port;
> +    collation_preproc_threads = config->collation_preproc_threads;
>  
>      if (config_flags & STANDALONE_COLLATOR) {
>        logging_mode = LOG_TRANSACTIONS_ONLY;
> @@ -959,8 +993,8 @@ Log::init(int flags)
>      create_threads();
>  
>  #ifndef INK_SINGLE_THREADED
> -    eventProcessor.schedule_every(NEW (new PeriodicWakeup()), HRTIME_SECOND,
> -        ET_CALL);
> +    eventProcessor.schedule_every(NEW (new PeriodicWakeup(collation_preproc_threads, 1)),
> +                                  HRTIME_SECOND, ET_CALL);
>  #endif
>      init_status |= PERIODIC_WAKEUP_SCHEDULED;
>  
> @@ -1001,9 +1035,16 @@ Log::init_when_enabled()
>      // setup global scrap object
>      //
>      global_scrap_format = NEW(new LogFormat(TEXT_LOG));
> -    global_scrap_object = NEW(new LogObject(global_scrap_format, Log::config->logfile_dir, "scrapfile.log", BINARY_LOG,
> -                                            NULL, Log::config->rolling_enabled, Log::config->rolling_interval_sec,
> -                                            Log::config->rolling_offset_hr, Log::config->rolling_size_mb));
> +    global_scrap_object =
> +      NEW(new LogObject(global_scrap_format,
> +                        Log::config->logfile_dir,
> +                        "scrapfile.log",
> +                        BINARY_LOG, NULL,
> +                        Log::config->rolling_enabled,
> +                        Log::config->collation_preproc_threads,
> +                        Log::config->rolling_interval_sec,
> +                        Log::config->rolling_offset_hr,
> +                        Log::config->rolling_size_mb));
>  
>      // create the flush thread and the collation thread
>      //
> @@ -1030,15 +1071,43 @@ Log::create_threads()
>  
>    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
>    if (!(init_status & THREADS_CREATED)) {
> -    // start the flush thread
> +
> +    char desc[64];
> +    preproc_mutex = new ink_mutex[collation_preproc_threads];
> +    preproc_cond = new ink_cond[collation_preproc_threads];
> +
> +    size_t stacksize;
> +    REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");

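This ^ seems like an unnecessary step: stacksize is already read from the
same record a few lines up, and this declaration just shadows it. The
lookup could be pushed into ...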

> +
> +    // start the preproc threads
>      //
>      // no need for the conditional var since it will be relying on
>      // on the event system.
> -    ink_mutex_init(&flush_mutex, "Flush thread mutex");
> -    ink_cond_init(&flush_cond);
> -    Continuation *flush_continuation = NEW(new LoggingFlushContinuation);
> -    Event *flush_event = eventProcessor.spawn_thread(flush_continuation, "[LOGGING]", stacksize);
> -    flush_thread = flush_event->ethread->tid;
> +    for (int i = 0; i < collation_preproc_threads; i++) {
> +      sprintf(desc, "Logging preproc thread mutex[%d]", i);
> +      ink_mutex_init(&preproc_mutex[i], desc);
> +      ink_cond_init(&preproc_cond[i]);
> +      Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
> +      sprintf(desc, "[LOG_PREPROC %d]", i);
> +      eventProcessor.spawn_thread(preproc_cont, desc, stacksize);

spawn_thread() as a default, i.e.:

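// in the declaration: -1 means "use the configured default"
Event *spawn_thread(Continuation *cont, const char *desc, int stacksize = -1);

Event *
EventProcessor::spawn_thread(Continuation *cont, const char *desc, int stacksize)
{
  size_t thread_stacksize;
  if (stacksize == -1) {
    REC_ReadConfigInteger(thread_stacksize, "proxy.config.thread.default.stacksize");
  } else {
    thread_stacksize = stacksize;
  }
  ...
}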
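The proposed default can be exercised in isolation. A sketch, assuming -1 stays the "not specified" sentinel as in the pseudocode; read_default_stacksize() is a hypothetical stub for the REC_ReadConfigInteger() lookup, and its 1 MiB value is made up for the example:

```cpp
#include <cassert>
#include <cstddef>

// Sentinel meaning "caller didn't choose a stack size".
constexpr int kUseConfiguredStacksize = -1;

// Hypothetical stub standing in for reading
// "proxy.config.thread.default.stacksize"; the value is illustrative only.
std::size_t read_default_stacksize() { return 1048576; }

// Callers that don't care pass nothing and get the configured default;
// callers that do care can still override it explicitly.
std::size_t resolve_stacksize(int requested = kUseConfiguredStacksize) {
  if (requested == kUseConfiguredStacksize) {
    return read_default_stacksize();
  }
  return static_cast<std::size_t>(requested);
}
```

With the lookup living behind the default, the call sites in create_threads() shrink to spawn_thread(cont, desc) and the duplicated read goes away.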




>  /*-------------------------------------------------------------------------
> +<<<<<<< HEAD
>    LogFile::write_and_try_delete
> +=======
> +  LogFile::preproc_and_try_delete
> +
> +  preprocess the given buffer data before write to target file
> +  and try to delete it when its reference become zero.
> +>>>>>>> TS-2089: introduce configurable collation preproc threads


To reiterate a point Leif made recently about one of *my* commits:
we should make sure that every single commit actually compiles. The
hunk above still contains unresolved merge-conflict markers, so this
one can't.

Anyway, that's all I have for now.

-- i 
Igor Galić

Tel: +43 (0) 664 886 22 883
Mail: [email protected]
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE
