This is an automated email from the ASF dual-hosted git repository.

oknet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 9daeb81  Optimize: tighten the logic of the PluginVC::process_read/write_side()
9daeb81 is described below

commit 9daeb81ccc46b62ba340898319a0cadd96b2326c
Author: Oknet Xu <xuc...@skyguard.com.cn>
AuthorDate: Fri Dec 28 18:07:11 2018 +0800

    Optimize: tighten the logic of the PluginVC::process_read/write_side()
    
    According to the comment at the head of `process_read/write_side()`:
    
    - To call `process_read_side()`, the following mutexes must be held:
      - PluginVC::mutex
      - PluginVC::read_state.vio.mutex
    - To call `process_write_side()`, the following mutexes must be held:
      - PluginVC::mutex
      - PluginVC::write_state.vio.mutex
    
    However, the existing code violates these rules when
    `process_read/write_side()` is called with `other_side_call == true`.
    
    This commit makes the code comply with these rules when
    `other_side_call` is true.
    
    This is an update to TS-3339 (0f9dda6).
---
 proxy/PluginVC.cc | 87 ++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 54 insertions(+), 33 deletions(-)

diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc
index 85dd7fd..44d4076 100644
--- a/proxy/PluginVC.cc
+++ b/proxy/PluginVC.cc
@@ -310,6 +310,7 @@ PluginVC::reenable(VIO *vio)
 {
   ink_assert(!closed);
   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
+  ink_assert(vio->mutex->thread_holding == this_ethread());
 
   Ptr<ProxyMutex> sm_mutex = vio->mutex;
   SCOPED_MUTEX_LOCK(lock, sm_mutex, this_ethread());
@@ -332,6 +333,7 @@ PluginVC::reenable_re(VIO *vio)
 {
   ink_assert(!closed);
   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
+  ink_assert(vio->mutex->thread_holding == this_ethread());
 
   Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op == 
VIO::WRITE) ? "Write" : "Read");
 
@@ -473,25 +475,13 @@ PluginVC::process_write_side(bool other_side_call)
 
   MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? 
core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
 
+  Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
   need_write_process = false;
 
+  // Check write_state
   if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
     return;
   }
-  // Acquire the lock of the write side continuation
-  EThread *my_ethread = mutex->thread_holding;
-  ink_assert(my_ethread != nullptr);
-  MUTEX_TRY_LOCK(lock, write_state.vio.mutex, my_ethread);
-  if (!lock.is_locked()) {
-    Debug("pvc_event", "[%u] %s: process_write_side lock miss, retrying", 
core_obj->id, PVC_TYPE);
-
-    need_write_process = true;
-    setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
-    return;
-  }
-
-  Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
-  need_write_process = false;
 
   // Check the state of our write buffer as well as ntodo
   int64_t ntodo = write_state.vio.ntodo();
@@ -552,6 +542,30 @@ PluginVC::process_write_side(bool other_side_call)
   // Wake up the read side on the other side to process these bytes
   if (!other_side->closed) {
     if (!other_side_call) {
+      /* To clear the `need_read_process`, the mutexes must be obtained:
+       *
+       *   - PluginVC::mutex
+       *   - PluginVC::read_state.vio.mutex
+       *
+       */
+      if (other_side->read_state.vio.op != VIO::READ || other_side->closed || 
other_side->read_state.shutdown) {
+        // Just return, no touch on `other_side->need_read_process`.
+        return;
+      }
+      // Acquire the lock of the read side continuation
+      EThread *my_ethread = mutex->thread_holding;
+      ink_assert(my_ethread != nullptr);
+      MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
+      if (!lock.is_locked()) {
+        Debug("pvc_event", "[%u] %s: process_read_side from other side lock 
miss, retrying", other_side->core_obj->id,
+              ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : 
"Passive"));
+
+        // set need_read_process to enforce the read processing
+        other_side->need_read_process = true;
+        other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, 
&other_side->core_lock_retry_event);
+        return;
+      }
+
       other_side->process_read_side(true);
     } else {
       other_side->read_state.vio.reenable();
@@ -587,28 +601,11 @@ PluginVC::process_read_side(bool other_side_call)
     core_reader = core_obj->a_to_p_reader;
   }
 
-  need_read_process = false;
-
-  if (read_state.vio.op != VIO::READ || closed) {
-    return;
-  }
-  // Acquire the lock of the read side continuation
-  EThread *my_ethread = mutex->thread_holding;
-  ink_assert(my_ethread != nullptr);
-  MUTEX_TRY_LOCK(lock, read_state.vio.mutex, my_ethread);
-  if (!lock.is_locked()) {
-    Debug("pvc_event", "[%u] %s: process_read_side lock miss, retrying", 
core_obj->id, PVC_TYPE);
-
-    need_read_process = true;
-    setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
-    return;
-  }
-
   Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
   need_read_process = false;
 
-  // Check read_state.shutdown after the lock has been obtained.
-  if (read_state.shutdown) {
+  // Check read_state
+  if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
     return;
   }
 
@@ -668,6 +665,30 @@ PluginVC::process_read_side(bool other_side_call)
   //  intermediate buffer
   if (!other_side->closed) {
     if (!other_side_call) {
+      /* To clear the `need_write_process`, the mutexes must be obtained:
+       *
+       *   - PluginVC::mutex
+       *   - PluginVC::write_state.vio.mutex
+       *
+       */
+      if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed 
|| other_side->write_state.shutdown) {
+        // Just return, no touch on `other_side->need_write_process`.
+        return;
+      }
+      // Acquire the lock of the write side continuation
+      EThread *my_ethread = mutex->thread_holding;
+      ink_assert(my_ethread != nullptr);
+      MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
+      if (!lock.is_locked()) {
+        Debug("pvc_event", "[%u] %s: process_write_side from other side lock 
miss, retrying", other_side->core_obj->id,
+              ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : 
"Passive"));
+
+        // set need_write_process to enforce the write processing
+        other_side->need_write_process = true;
+        other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, 
&other_side->core_lock_retry_event);
+        return;
+      }
+
       other_side->process_write_side(true);
     } else {
       other_side->write_state.vio.reenable();

Reply via email to