This is an automated email from the ASF dual-hosted git repository. oknet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push: new 9daeb81 Optimize: tighten the logic of the PluginVC::process_read/write_side() 9daeb81 is described below commit 9daeb81ccc46b62ba340898319a0cadd96b2326c Author: Oknet Xu <xuc...@skyguard.com.cn> AuthorDate: Fri Dec 28 18:07:11 2018 +0800 Optimize: tighten the logic of the PluginVC::process_read/write_side() According to the comment at the head of `process_read/write_side`: - To call the `process_read_side()`, the mutexes must be obtained: - PluginVC::mutex - PluginVC::read_state.vio.mutex - To call the `process_write_side()`, the mutexes must be obtained: - PluginVC::mutex - PluginVC::write_state.vio.mutex However, the code violates these rules when `process_read/write_side()` is called with `other_side_call == true`. This commit makes the code comply with these rules when `other_side_call` is true. This is an update to TS-3339 (0f9dda6). --- proxy/PluginVC.cc | 87 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 33 deletions(-) diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc index 85dd7fd..44d4076 100644 --- a/proxy/PluginVC.cc +++ b/proxy/PluginVC.cc @@ -310,6 +310,7 @@ PluginVC::reenable(VIO *vio) { ink_assert(!closed); ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE); + ink_assert(vio->mutex->thread_holding == this_ethread()); Ptr<ProxyMutex> sm_mutex = vio->mutex; SCOPED_MUTEX_LOCK(lock, sm_mutex, this_ethread()); @@ -332,6 +333,7 @@ PluginVC::reenable_re(VIO *vio) { ink_assert(!closed); ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE); + ink_assert(vio->mutex->thread_holding == this_ethread()); Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read"); @@ -473,25 +475,13 @@ PluginVC::process_write_side(bool other_side_call) MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? 
core_obj->a_to_p_buffer : core_obj->p_to_a_buffer; + Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE); need_write_process = false; + // Check write_state if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) { return; } - // Acquire the lock of the write side continuation - EThread *my_ethread = mutex->thread_holding; - ink_assert(my_ethread != nullptr); - MUTEX_TRY_LOCK(lock, write_state.vio.mutex, my_ethread); - if (!lock.is_locked()) { - Debug("pvc_event", "[%u] %s: process_write_side lock miss, retrying", core_obj->id, PVC_TYPE); - - need_write_process = true; - setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event); - return; - } - - Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE); - need_write_process = false; // Check the state of our write buffer as well as ntodo int64_t ntodo = write_state.vio.ntodo(); @@ -552,6 +542,30 @@ PluginVC::process_write_side(bool other_side_call) // Wake up the read side on the other side to process these bytes if (!other_side->closed) { if (!other_side_call) { + /* To clear the `need_read_process`, the mutexes must be obtained: + * + * - PluginVC::mutex + * - PluginVC::read_state.vio.mutex + * + */ + if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) { + // Just return, no touch on `other_side->need_read_process`. + return; + } + // Acquire the lock of the read side continuation + EThread *my_ethread = mutex->thread_holding; + ink_assert(my_ethread != nullptr); + MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread); + if (!lock.is_locked()) { + Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id, + ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? 
"Active" : "Passive")); + + // set need_read_process to enforce the read processing + other_side->need_read_process = true; + other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); + return; + } + other_side->process_read_side(true); } else { other_side->read_state.vio.reenable(); @@ -587,28 +601,11 @@ PluginVC::process_read_side(bool other_side_call) core_reader = core_obj->a_to_p_reader; } - need_read_process = false; - - if (read_state.vio.op != VIO::READ || closed) { - return; - } - // Acquire the lock of the read side continuation - EThread *my_ethread = mutex->thread_holding; - ink_assert(my_ethread != nullptr); - MUTEX_TRY_LOCK(lock, read_state.vio.mutex, my_ethread); - if (!lock.is_locked()) { - Debug("pvc_event", "[%u] %s: process_read_side lock miss, retrying", core_obj->id, PVC_TYPE); - - need_read_process = true; - setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event); - return; - } - Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE); need_read_process = false; - // Check read_state.shutdown after the lock has been obtained. - if (read_state.shutdown) { + // Check read_state + if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) { return; } @@ -668,6 +665,30 @@ PluginVC::process_read_side(bool other_side_call) // intermediate buffer if (!other_side->closed) { if (!other_side_call) { + /* To clear the `need_write_process`, the mutexes must be obtained: + * + * - PluginVC::mutex + * - PluginVC::write_state.vio.mutex + * + */ + if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) { + // Just return, no touch on `other_side->need_write_process`. 
+ return; + } + // Acquire the lock of the write side continuation + EThread *my_ethread = mutex->thread_holding; + ink_assert(my_ethread != nullptr); + MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread); + if (!lock.is_locked()) { + Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id, + ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); + + // set need_write_process to enforce the write processing + other_side->need_write_process = true; + other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); + return; + } + other_side->process_write_side(true); } else { other_side->write_state.vio.reenable();