Repository: trafficserver Updated Branches: refs/heads/master 8ed6e6f52 -> c58648bab
atscppapi: fixing deadlock and shut down race conditions in intercept Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c58648ba Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c58648ba Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c58648ba Branch: refs/heads/master Commit: c58648baba56bc972e47311e35a7656f2dcb275d Parents: 8ed6e6f Author: Manjesh Nilange <manjeshnila...@yahoo.com> Authored: Tue Aug 19 13:29:11 2014 -0700 Committer: Manjesh Nilange <manjeshnila...@yahoo.com> Committed: Tue Aug 19 13:29:11 2014 -0700 ---------------------------------------------------------------------- lib/atscppapi/src/InterceptPlugin.cc | 104 +++++++++++++++++------------- 1 file changed, 59 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c58648ba/lib/atscppapi/src/InterceptPlugin.cc ---------------------------------------------------------------------- diff --git a/lib/atscppapi/src/InterceptPlugin.cc b/lib/atscppapi/src/InterceptPlugin.cc index 0e7ca13..0ae841e 100644 --- a/lib/atscppapi/src/InterceptPlugin.cc +++ b/lib/atscppapi/src/InterceptPlugin.cc @@ -37,17 +37,6 @@ using namespace atscppapi; using std::string; -namespace { - -/** This contains info for the continuation callback to invoke the plugin */ -struct PluginHandle { - InterceptPlugin *plugin_; - shared_ptr<Mutex> mutex_; - PluginHandle(InterceptPlugin *plugin, shared_ptr<Mutex> mutex) : plugin_(plugin), mutex_(mutex) { } -}; - -} - /** * @private */ @@ -83,13 +72,20 @@ struct InterceptPlugin::State { TSMBuffer hdr_buf_; TSMLoc hdr_loc_; int num_bytes_written_; - PluginHandle *plugin_handle_; - bool shut_down_; + shared_ptr<Mutex> plugin_mutex_; + InterceptPlugin *plugin_; Headers request_headers_; - State(TSCont cont) : cont_(cont), net_vc_(NULL), expected_body_size_(0), num_body_bytes_read_(0), - hdr_parsed_(false), hdr_buf_(NULL), hdr_loc_(NULL), num_bytes_written_(0), - plugin_handle_(NULL), shut_down_(false) { + /** these two fields to be used by the continuation callback only */ + TSEvent saved_event_; + void *saved_edata_; + + TSAction timeout_action_; + + State(TSCont cont, InterceptPlugin *plugin) + : cont_(cont), net_vc_(NULL), expected_body_size_(0), num_body_bytes_read_(0), hdr_parsed_(false), + hdr_buf_(NULL), hdr_loc_(NULL), num_bytes_written_(0), plugin_(plugin), timeout_action_(NULL) { + plugin_mutex_ = plugin->getMutex(); http_parser_ = TSHttpParserCreate(); } @@ -107,15 +103,15 @@ struct InterceptPlugin::State { namespace { int handleEvents(TSCont cont, TSEvent event, void *edata); +void destroyCont(InterceptPlugin::State *state); } InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type type) : TransactionPlugin(transaction) { TSCont cont = TSContCreate(handleEvents, TSMutexCreate()); - state_ = new State(cont); - state_->plugin_handle_ = new PluginHandle(this, TransactionPlugin::getMutex()); - TSContDataSet(cont, state_->plugin_handle_); + state_ = new State(cont, this); + TSContDataSet(cont, state_); TSHttpTxn txn = static_cast<TSHttpTxn>(transaction.getAtsHandle()); if (type == SERVER_INTERCEPT) { TSHttpTxnServerIntercept(cont, txn); @@ -126,22 +122,18 @@ InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type } InterceptPlugin::~InterceptPlugin() { - if (!state_->shut_down_) { - // transaction is closing, but intercept hasn't finished. Indicate - // that plugin is dead (cleanup will be done by continuation - // callback) - state_->plugin_handle_->plugin_ = NULL; + if (state_->cont_) { + LOG_DEBUG("Relying on callback for cleanup"); + state_->plugin_ = NULL; // prevent callback from invoking plugin } - else { - TSContDestroy(state_->cont_); - delete state_->plugin_handle_; + else { // safe to cleanup + LOG_DEBUG("Normal shutdown"); + delete state_; } - delete state_; } bool InterceptPlugin::produce(const void *data, int data_size) { - ScopedSharedMutexLock scopedLock(getMutex()); - if (!state_->net_vc_ || state_->shut_down_) { + if (!state_->net_vc_) { LOG_ERROR("Intercept not operational"); return false; } @@ -164,7 +156,7 @@ bool InterceptPlugin::produce(const void *data, int data_size) { bool InterceptPlugin::setOutputComplete() { ScopedSharedMutexLock scopedLock(getMutex()); - if (!state_->net_vc_ || state_->shut_down_) { + if (!state_->net_vc_) { LOG_ERROR("Intercept not operational"); return false; } @@ -303,11 +295,9 @@ void InterceptPlugin::handleEvent(int abstract_event, void *edata) { else if (event == TS_EVENT_NET_ACCEPT_FAILED) { LOG_ERROR("Got net_accept_failed!"); } - LOG_DEBUG("Shutting down"); - if (state_->net_vc_) { - TSVConnClose(state_->net_vc_); - } - state_->shut_down_ = true; + LOG_DEBUG("Shutting down intercept"); + destroyCont(state_); + state_->cont_ = NULL; break; default: @@ -318,18 +308,42 @@ void InterceptPlugin::handleEvent(int abstract_event, void *edata) { namespace { int handleEvents(TSCont cont, TSEvent event, void *edata) { - PluginHandle *plugin_handle = static_cast<PluginHandle *>(TSContDataGet(cont)); - ScopedSharedMutexLock scopedSharedMutexLock(plugin_handle->mutex_); - if (plugin_handle->plugin_) { - utils::internal::dispatchInterceptEvent(plugin_handle->plugin_, event, edata); + InterceptPlugin::State *state = static_cast<InterceptPlugin::State *>(TSContDataGet(cont)); + ScopedSharedMutexTryLock scopedTryLock(state->plugin_mutex_); + if (!scopedTryLock.hasLock()) { + if (event != TS_EVENT_TIMEOUT) { // save only "non-retry" info + state->saved_event_ = event; + state->saved_edata_ = edata; + } + state->timeout_action_ = TSContSchedule(cont, 1, TS_THREAD_POOL_DEFAULT); + return 0; } - else { - // plugin is dead; cleanup - LOG_ERROR("Received event %d after plugin died!", event); - TSContDestroy(cont); - delete plugin_handle; + if (state->plugin_) { + if (event == TS_EVENT_TIMEOUT) { // restore original event + event = state->saved_event_; + edata = state->saved_edata_; + } + utils::internal::dispatchInterceptEvent(state->plugin_, event, edata); + } + else { // plugin was destroyed before intercept was completed; cleaning up here + LOG_DEBUG("Cleaning up as intercept plugin is already destroyed"); + destroyCont(state); + delete state; } return 0; } +void destroyCont(InterceptPlugin::State *state) { + if (state->net_vc_) { + TSVConnShutdown(state->net_vc_, 1, 1); + TSVConnClose(state->net_vc_); + state->net_vc_ = NULL; + } + if (state->timeout_action_) { + TSActionCancel(state->timeout_action_); + state->timeout_action_ = NULL; + } + TSContDestroy(state->cont_); +} + }