Repository: trafficserver
Updated Branches:
  refs/heads/master 8ed6e6f52 -> c58648bab


atscppapi: fixing deadlock and shut down race conditions in intercept


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c58648ba
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c58648ba
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c58648ba

Branch: refs/heads/master
Commit: c58648baba56bc972e47311e35a7656f2dcb275d
Parents: 8ed6e6f
Author: Manjesh Nilange <manjeshnila...@yahoo.com>
Authored: Tue Aug 19 13:29:11 2014 -0700
Committer: Manjesh Nilange <manjeshnila...@yahoo.com>
Committed: Tue Aug 19 13:29:11 2014 -0700

----------------------------------------------------------------------
 lib/atscppapi/src/InterceptPlugin.cc | 104 +++++++++++++++++-------------
 1 file changed, 59 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c58648ba/lib/atscppapi/src/InterceptPlugin.cc
----------------------------------------------------------------------
diff --git a/lib/atscppapi/src/InterceptPlugin.cc b/lib/atscppapi/src/InterceptPlugin.cc
index 0e7ca13..0ae841e 100644
--- a/lib/atscppapi/src/InterceptPlugin.cc
+++ b/lib/atscppapi/src/InterceptPlugin.cc
@@ -37,17 +37,6 @@
 using namespace atscppapi;
 using std::string;
 
-namespace {
-
-/** This contains info for the continuation callback to invoke the plugin */
-struct PluginHandle {
-  InterceptPlugin *plugin_;
-  shared_ptr<Mutex> mutex_;
-  PluginHandle(InterceptPlugin *plugin, shared_ptr<Mutex> mutex) : plugin_(plugin), mutex_(mutex) { }
-};
-
-}
-
 /**
  * @private
  */
@@ -83,13 +72,20 @@ struct InterceptPlugin::State {
   TSMBuffer hdr_buf_;
   TSMLoc hdr_loc_;
   int num_bytes_written_;
-  PluginHandle *plugin_handle_;
-  bool shut_down_;
+  shared_ptr<Mutex> plugin_mutex_;
+  InterceptPlugin *plugin_;
   Headers request_headers_;
 
-  State(TSCont cont) : cont_(cont), net_vc_(NULL), expected_body_size_(0), num_body_bytes_read_(0),
-                       hdr_parsed_(false), hdr_buf_(NULL), hdr_loc_(NULL), num_bytes_written_(0), 
-                       plugin_handle_(NULL), shut_down_(false) {
+  /** these two fields to be used by the continuation callback only */
+  TSEvent saved_event_;
+  void *saved_edata_;
+
+  TSAction timeout_action_;
+
+  State(TSCont cont, InterceptPlugin *plugin)
+    : cont_(cont), net_vc_(NULL), expected_body_size_(0), num_body_bytes_read_(0), hdr_parsed_(false),
+      hdr_buf_(NULL), hdr_loc_(NULL), num_bytes_written_(0), plugin_(plugin), timeout_action_(NULL) {
+    plugin_mutex_ = plugin->getMutex();
     http_parser_ = TSHttpParserCreate();
   }
   
@@ -107,15 +103,15 @@ struct InterceptPlugin::State {
 namespace {
 
 int handleEvents(TSCont cont, TSEvent event, void *edata);
+void destroyCont(InterceptPlugin::State *state);
 
 }
 
 InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type type)
   : TransactionPlugin(transaction) {
   TSCont cont = TSContCreate(handleEvents, TSMutexCreate());
-  state_ = new State(cont);
-  state_->plugin_handle_ = new PluginHandle(this, TransactionPlugin::getMutex());
-  TSContDataSet(cont, state_->plugin_handle_);
+  state_ = new State(cont, this);
+  TSContDataSet(cont, state_);
   TSHttpTxn txn = static_cast<TSHttpTxn>(transaction.getAtsHandle());
   if (type == SERVER_INTERCEPT) {
     TSHttpTxnServerIntercept(cont, txn);
@@ -126,22 +122,18 @@ InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type
 }
 
 InterceptPlugin::~InterceptPlugin() {
-  if (!state_->shut_down_) {
-    // transaction is closing, but intercept hasn't finished. Indicate
-    // that plugin is dead (cleanup will be done by continuation
-    // callback)
-    state_->plugin_handle_->plugin_ = NULL;
+  if (state_->cont_) {
+    LOG_DEBUG("Relying on callback for cleanup");
+    state_->plugin_ = NULL; // prevent callback from invoking plugin
   }
-  else {
-    TSContDestroy(state_->cont_);
-    delete state_->plugin_handle_;
+  else { // safe to cleanup
+    LOG_DEBUG("Normal shutdown");
+    delete state_;
   }
-  delete state_;
 }
 
 bool InterceptPlugin::produce(const void *data, int data_size) {
-  ScopedSharedMutexLock scopedLock(getMutex());
-  if (!state_->net_vc_ || state_->shut_down_) {
+  if (!state_->net_vc_) {
     LOG_ERROR("Intercept not operational");
     return false;
   }
@@ -164,7 +156,7 @@ bool InterceptPlugin::produce(const void *data, int data_size) {
 
 bool InterceptPlugin::setOutputComplete() {
   ScopedSharedMutexLock scopedLock(getMutex());
-  if (!state_->net_vc_ || state_->shut_down_) {
+  if (!state_->net_vc_) {
     LOG_ERROR("Intercept not operational");
     return false;
   }
@@ -303,11 +295,9 @@ void InterceptPlugin::handleEvent(int abstract_event, void *edata) {
     else if (event == TS_EVENT_NET_ACCEPT_FAILED) {
       LOG_ERROR("Got net_accept_failed!");
     }
-    LOG_DEBUG("Shutting down");
-    if (state_->net_vc_) {
-      TSVConnClose(state_->net_vc_);
-    }
-    state_->shut_down_ = true;
+    LOG_DEBUG("Shutting down intercept");
+    destroyCont(state_);
+    state_->cont_ = NULL;
     break;
 
   default:
@@ -318,18 +308,42 @@ void InterceptPlugin::handleEvent(int abstract_event, void *edata) {
 namespace {
 
 int handleEvents(TSCont cont, TSEvent event, void *edata) {
-  PluginHandle *plugin_handle = static_cast<PluginHandle *>(TSContDataGet(cont));
-  ScopedSharedMutexLock scopedSharedMutexLock(plugin_handle->mutex_);
-  if (plugin_handle->plugin_) {
-    utils::internal::dispatchInterceptEvent(plugin_handle->plugin_, event, edata);
+  InterceptPlugin::State *state = static_cast<InterceptPlugin::State *>(TSContDataGet(cont));
+  ScopedSharedMutexTryLock scopedTryLock(state->plugin_mutex_);
+  if (!scopedTryLock.hasLock()) {
+    if (event != TS_EVENT_TIMEOUT) { // save only "non-retry" info
+      state->saved_event_ = event;
+      state->saved_edata_ = edata;
+    }
+    state->timeout_action_ = TSContSchedule(cont, 1, TS_THREAD_POOL_DEFAULT);
+    return 0;
   }
-  else {
-    // plugin is dead; cleanup
-    LOG_ERROR("Received event %d after plugin died!", event);
-    TSContDestroy(cont);
-    delete plugin_handle;
+  if (state->plugin_) {
+    if (event == TS_EVENT_TIMEOUT) { // restore original event
+      event = state->saved_event_;
+      edata = state->saved_edata_;
+    }
+    utils::internal::dispatchInterceptEvent(state->plugin_, event, edata);
+  }
+  else { // plugin was destroyed before intercept was completed; cleaning up here
+    LOG_DEBUG("Cleaning up as intercept plugin is already destroyed");
+    destroyCont(state);
+    delete state;
   }
   return 0;
 }
 
+void destroyCont(InterceptPlugin::State *state) {
+  if (state->net_vc_) {
+    TSVConnShutdown(state->net_vc_, 1, 1);
+    TSVConnClose(state->net_vc_);
+    state->net_vc_ = NULL;
+  }
+  if (state->timeout_action_) {
+    TSActionCancel(state->timeout_action_);
+    state->timeout_action_ = NULL;
+  }
+  TSContDestroy(state->cont_);
+}
+
 }

Reply via email to