TS-1062: Extends and optimizes FetchSM

* Optimize FetchSM to support stream IO.
* Reduce memory copy in FetchSM.
* Expose some plugin APIs in 'ts/experimental.h'.

This patch will borrow some ideas from @Quehan's fetcher
library, which has not been open-sourced yet.

Signed-off-by: Yunkai Zhang <qiushu....@taobao.com>


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/45f65532
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/45f65532
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/45f65532

Branch: refs/heads/master
Commit: 45f6553290a62aa9e77d42e32e7809b89c41ace2
Parents: 558345f
Author: Yunkai Zhang <qiushu....@taobao.com>
Authored: Mon Mar 3 18:34:54 2014 +0800
Committer: Yunkai Zhang <qiushu....@taobao.com>
Committed: Wed Mar 12 11:02:15 2014 +0800

----------------------------------------------------------------------
 proxy/FetchSM.cc            | 434 +++++++++++++++++++++++++++++++++++----
 proxy/FetchSM.h             |  80 +++++++-
 proxy/InkAPI.cc             |  99 +++++++++
 proxy/api/ts/experimental.h | 119 +++++++++++
 4 files changed, 684 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/45f65532/proxy/FetchSM.cc
----------------------------------------------------------------------
diff --git a/proxy/FetchSM.cc b/proxy/FetchSM.cc
index 89e2f68..16d4f4c 100644
--- a/proxy/FetchSM.cc
+++ b/proxy/FetchSM.cc
@@ -27,20 +27,33 @@
 #include "HTTP.h"
 #include "PluginVC.h"
 
+static const char *http_method[] = {
+  "NONE",
+  "GET",
+  "POST",
+  "CONNECT",
+  "DELETE",
+  "HEAD",
+  "PURGE",
+  "PUT",
+  "LAST",
+};
+
 #define DEBUG_TAG "FetchSM"
+#define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
 
 ClassAllocator < FetchSM > FetchSMAllocator("FetchSMAllocator");
 void
 FetchSM::cleanUp()
 {
   Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__);
-  free_MIOBuffer(response_buffer);
   free_MIOBuffer(req_buffer);
   free_MIOBuffer(resp_buffer);
   mutex.clear();
   http_parser_clear(&http_parser);
   client_response_hdr.destroy();
   ats_free(client_response);
+  cont_mutex.clear();
 
   PluginVC *vc = (PluginVC *) http_vc;
 
@@ -57,7 +70,7 @@ FetchSM::httpConnect()
   PluginVC *vc = (PluginVC *) http_vc;
 
   read_vio = vc->do_io_read(this, INT64_MAX, resp_buffer);
-  write_vio = vc->do_io_write(this, getReqLen(), req_reader);
+  write_vio = vc->do_io_write(this, getReqLen() + req_content_length, 
req_reader);
 }
 
 char* FetchSM::resp_get(int *length) {
@@ -65,7 +78,8 @@ char* FetchSM::resp_get(int *length) {
   return client_response;
 }
 
-int FetchSM::InvokePlugin(int event, void *data)
+int
+FetchSM::InvokePlugin(int event, void *data)
 {
   EThread *mythread = this_ethread();
 
@@ -77,6 +91,174 @@ int FetchSM::InvokePlugin(int event, void *data)
 
   return ret;
 }
+
+bool
+FetchSM::has_body()
+{
+  int status_code;
+  HTTPHdr *hdr;
+
+  if (!header_done)
+    return false;
+
+  //
+  // The following code comply with HTTP/1.1:
+  // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4
+  //
+
+  if (req_method == TS_FETCH_METHOD_HEAD)
+    return false;
+
+  hdr = &client_response_hdr;
+
+  status_code = hdr->status_get();
+  if (status_code < 200 || status_code == 204 || status_code == 304)
+    return false;
+
+  if (check_chunked())
+    return true;
+
+  resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, 
MIME_LEN_CONTENT_LENGTH);
+  if (!resp_content_length)
+    return false;
+
+  return true;
+}
+
+bool
+FetchSM::check_body_done()
+{
+  if (!check_chunked()) {
+    if (resp_content_length == resp_recived_body_len + 
resp_reader->read_avail())
+      return true;
+
+    return false;
+  }
+
+  //
+  // TODO: check whether the chunked body is done
+  //
+  return true;
+}
+
+bool
+FetchSM::check_chunked()
+{
+  int ret;
+  StrList slist;
+  HTTPHdr *hdr = &client_response_hdr;
+
+  if (resp_is_chunked >= 0)
+    return resp_is_chunked;
+
+  ink_release_assert(header_done);
+
+  resp_is_chunked = 0;
+  ret = hdr->value_get_comma_list(MIME_FIELD_TRANSFER_ENCODING,
+                                  MIME_LEN_TRANSFER_ENCODING, &slist);
+  if (ret) {
+    for (Str *f = slist.head; f != NULL; f = f->next) {
+      if (f->len == 0)
+        continue;
+
+      size_t len = sizeof("chunked") - 1;
+      len = len > f->len ? f->len : len;
+      if (!strncasecmp(f->str, "chunked", len)) {
+        resp_is_chunked = 1;
+        return true;
+      }
+    }
+  }
+
+  return resp_is_chunked;
+}
+
+int
+FetchSM::dechunk_body()
+{
+  ink_assert(resp_is_chunked > 0);
+  //
+  // TODO: dechunk the body content.
+  // return:
+  //  - 0: need to read more data.
+  //  - TS_FETCH_EVENT_EXT_BODY_READY.
+  //  - TS_FETCH_EVENT_EXT_BODY_DONE.
+  //
+  return TS_FETCH_EVENT_EXT_BODY_DONE;
+}
+
+void
+FetchSM::InvokePluginExt(int error_event)
+{
+  int event;
+  EThread *mythread = this_ethread();
+
+  //
+  // Increasing *recursion* to prevent
+  // FetchSM being deleted by callback.
+  //
+  recursion++;
+
+  if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
+    MUTEX_TAKE_LOCK(cont_mutex, mythread);
+  }
+
+  if (!contp)
+    goto out;
+
+  if (error_event) {
+    contp->handleEvent(error_event, this);
+    goto out;
+  }
+
+  if (!has_sent_header) {
+    contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this);
+    has_sent_header = true;
+  }
+
+  if (!has_body()) {
+    contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
+    goto out;
+  }
+
+  if (!resp_reader->read_avail())
+    goto out;
+
+  Debug(DEBUG_TAG, "[%s] chunked:%d, content_len:%ld, recived_len:%ld, 
avail:%ld\n",
+        __FUNCTION__, resp_is_chunked, resp_content_length, 
resp_recived_body_len, resp_reader->read_avail());
+
+  if (!check_chunked()) {
+    if (!check_body_done())
+      contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
+    else
+      contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
+  } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK){
+    event = dechunk_body();
+
+    if (!event) {
+      read_vio->reenable();
+      goto out;
+    }
+
+    contp->handleEvent(event, this);
+  } else if (check_body_done()){
+    contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
+  } else {
+    contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
+  }
+
+out:
+  if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
+    MUTEX_UNTAKE_LOCK(cont_mutex, mythread);
+  }
+  recursion--;
+
+  if (!contp && !recursion)
+    cleanUp();
+
+  return;
+}
+
 void
 FetchSM::get_info_from_buffer(IOBufferReader *the_reader)
 {
@@ -125,57 +307,51 @@ FetchSM::process_fetch_read(int event)
   Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__);
   int64_t bytes;
   int bytes_used;
-  int64_t total_bytes_copied = 0;
 
   switch (event) {
   case TS_EVENT_VCONN_READ_READY:
     bytes = resp_reader->read_avail();
-    Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64"", 
__FUNCTION__, bytes);
-    while (total_bytes_copied < bytes) {
-       int64_t actual_bytes_copied;
-       actual_bytes_copied = response_buffer->write(resp_reader, bytes, 0);
-       if (actual_bytes_copied <= 0) {
-           break;
-       }
-       total_bytes_copied += actual_bytes_copied;
-    }
-    resp_reader->consume(total_bytes_copied);
-    if (header_done == 0 && callback_options == AFTER_HEADER) {
-      if (client_response_hdr.parse_resp(&http_parser, response_reader, 
&bytes_used, 0) == PARSE_DONE) {
-        //InvokePlugin( TS_EVENT_INTERNAL_60201, (void *) 
&client_response_hdr);
-        InvokePlugin( callback_events.success_event_id, (void *) 
&client_response_hdr);
+    Debug(DEBUG_TAG, "[%s] number of bytes in read ready %"PRId64"", 
__FUNCTION__, bytes);
+    if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || 
callback_options == AFTER_HEADER)) {
+      if (client_response_hdr.parse_resp(&http_parser, resp_reader, 
&bytes_used, 0) == PARSE_DONE) {
         header_done = 1;
+        if (fetch_flags & TS_FETCH_FLAGS_STREAM)
+          return InvokePluginExt();
+        else
+          InvokePlugin( callback_events.success_event_id, (void *) 
&client_response_hdr);
       }
+    } else {
+      if (fetch_flags & TS_FETCH_FLAGS_STREAM)
+        return InvokePluginExt();
+      else
+        InvokePlugin(TS_FETCH_EVENT_EXT_BODY_READY, this);
     }
     read_vio->reenable();
     break;
   case TS_EVENT_VCONN_READ_COMPLETE:
   case TS_EVENT_VCONN_EOS:
+    if (fetch_flags & TS_FETCH_FLAGS_STREAM)
+      return InvokePluginExt();
     if(callback_options == AFTER_HEADER || callback_options == AFTER_BODY) {
-    bytes = response_reader->read_avail();
-
-    get_info_from_buffer(response_reader);
-    Debug(DEBUG_TAG, "[%s] number of bytes %" PRId64"", __FUNCTION__, bytes);
-    if(client_response!=NULL)
-      client_response[bytes] = '\0';
-      //client_response[bytes + _headers.size()] = '\0';
-    Debug(DEBUG_TAG, "[%s] Completed data fetch of size %" PRId64", notifying 
caller", __FUNCTION__, bytes);
-    //InvokePlugin( TS_EVENT_INTERNAL_60200, (void *) client_response);
-   client_bytes = bytes;
-    //InvokePlugin( TS_EVENT_INTERNAL_60200, (void *) this);
+      bytes = resp_reader->read_avail();
+      get_info_from_buffer(resp_reader);
+      Debug(DEBUG_TAG, "[%s] number of bytes %"PRId64"", __FUNCTION__, bytes);
+      if(client_response!=NULL)
+        client_response[bytes] = '\0';
+      Debug(DEBUG_TAG, "[%s] Completed data fetch of size %"PRId64", notifying 
caller", __FUNCTION__, bytes);
+      client_bytes = bytes;
       InvokePlugin( callback_events.success_event_id, (void *) this);
     }
-
     Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__);
     cleanUp();
     break;
   case TS_EVENT_ERROR:
   default:
-    //InvokePlugin(TS_EVENT_ERROR, NULL);
-      InvokePlugin( callback_events.failure_event_id, NULL);
+    if (fetch_flags & TS_FETCH_FLAGS_STREAM)
+      return InvokePluginExt(event);
+    InvokePlugin( callback_events.failure_event_id, NULL);
     cleanUp();
     break;
-
   }
 }
 
@@ -185,18 +361,19 @@ FetchSM::process_fetch_write(int event)
   Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__);
   switch (event) {
   case TS_EVENT_VCONN_WRITE_COMPLETE:
-    //INKVConnShutdown(http_vc, 0, 1) ; why does not this work???
     req_finished = true;
     break;
   case TS_EVENT_VCONN_WRITE_READY:
     // data is processed in chunks of 32k; if there is more than 32k
     // of input data, we have to continue reenabling until all data is
     // read (we have already written all the data to the buffer)
-    ((PluginVC *) http_vc)->reenable(write_vio);
+    if (req_reader->read_avail() > 0)
+      ((PluginVC *) http_vc)->reenable(write_vio);
     break;
   case TS_EVENT_ERROR:
-    //InvokePlugin( TS_EVENT_ERROR, NULL);
-      InvokePlugin( callback_events.failure_event_id, NULL);
+    if (fetch_flags & TS_FETCH_FLAGS_STREAM)
+      return InvokePluginExt(event);
+    InvokePlugin( callback_events.failure_event_id, NULL);
     cleanUp();
   default:
     break;
@@ -213,8 +390,189 @@ FetchSM::fetch_handler(int event, void *edata)
   } else if (edata == write_vio) {
     process_fetch_write(event);
   } else {
-      InvokePlugin( callback_events.failure_event_id, NULL);
+    if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
+      InvokePluginExt(event);
+      return 1;
+    }
+    InvokePlugin( callback_events.failure_event_id, NULL);
     cleanUp();
   }
   return 1;
 }
+
+void
+FetchSM::ext_init(Continuation *cont, TSFetchMethod method,
+                  const char *url, const char *version,
+                  const sockaddr *client_addr, int flags)
+{
+  init_comm();
+
+  if (flags & TS_FETCH_FLAGS_NEWLOCK) {
+    mutex = new_ProxyMutex();
+    cont_mutex = cont->mutex;
+  } else {
+    mutex = cont->mutex;
+  }
+
+  contp = cont;
+  _addr.assign(client_addr);
+
+  //
+  // Enable stream IO automatically.
+  //
+  fetch_flags = (TS_FETCH_FLAGS_STREAM | flags);
+
+  //
+  // These options are not used when enable
+  // stream IO.
+  //
+  memset(&callback_options, 0, sizeof(callback_options));
+  memset(&callback_events, 0, sizeof(callback_events));
+
+  req_method = method;
+  req_buffer->write(http_method[method], strlen(http_method[method]));
+  req_buffer->write(" ", 1);
+  req_buffer->write(url, strlen(url));
+  req_buffer->write(" ", 1);
+  req_buffer->write(version, strlen(version));
+  req_buffer->write("\r\n", 2);
+}
+
+void
+FetchSM::ext_add_header(const char *name, int name_len,
+                        const char *value, int value_len)
+{
+  if (TS_MIME_LEN_CONTENT_LENGTH == name_len &&
+      !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) {
+    req_content_length = atoll(value);
+  }
+
+  req_buffer->write(name, name_len);
+  req_buffer->write(": ", 2);
+  req_buffer->write(value, value_len);
+  req_buffer->write("\r\n", 2);
+}
+
+void
+FetchSM::ext_lanuch()
+{
+  req_buffer->write("\r\n", 2);
+  httpConnect();
+}
+
+void
+FetchSM::ext_write_data(const void *data, size_t len)
+{
+  if (header_done && (fetch_flags & TS_FETCH_FLAGS_NEWLOCK)) {
+    MUTEX_TAKE_LOCK(mutex, this_ethread());
+  }
+
+  req_buffer->write(data, len);
+
+  //
+  // Before header_done, FetchSM may not
+  // be initialized.
+  //
+  if (header_done)
+    write_vio->reenable();
+
+  if (header_done && (fetch_flags & TS_FETCH_FLAGS_NEWLOCK)) {
+    MUTEX_UNTAKE_LOCK(mutex, this_ethread());
+  }
+}
+
+ssize_t
+FetchSM::ext_read_data(char *buf, size_t len)
+{
+  const char *start;
+  TSIOBufferReader reader;
+  TSIOBufferBlock blk, next_blk;
+  int64_t already, blk_len, need, wavail;
+
+  if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
+    MUTEX_TRY_LOCK(lock, mutex, this_ethread());
+    if (!lock)
+      return 0;
+  }
+
+  if (!header_done)
+    return 0;
+
+  if (check_chunked())
+    reader = NULL; // TODO: asign dechunking reader
+  else
+    reader = (TSIOBufferReader)resp_reader;
+
+  already = 0;
+  blk = TSIOBufferReaderStart(reader);
+
+  while (blk) {
+
+    wavail = len - already;
+
+    next_blk = TSIOBufferBlockNext(blk);
+    start = TSIOBufferBlockReadStart(blk, reader, &blk_len);
+
+    need = blk_len > wavail ? wavail : blk_len;
+
+    memcpy(&buf[already], start, need);
+    already += need;
+
+    if (already >= (int64_t)len)
+      break;
+
+    blk = next_blk;
+  }
+
+  resp_recived_body_len += already;
+  resp_reader->consume(already);
+
+  read_vio->reenable();
+  return already;
+}
+
+void
+FetchSM::ext_destroy()
+{
+  contp = NULL;
+
+  if (recursion)
+    return;
+
+  if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
+    MUTEX_TRY_LOCK(lock, mutex, this_ethread());
+    if (!lock) {
+      eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME);
+      return;
+    }
+  }
+
+  cleanUp();
+}
+
+void
+FetchSM::ext_set_user_data(void *data)
+{
+  user_data = data;
+}
+
+void*
+FetchSM::ext_get_user_data()
+{
+  return user_data;
+}
+
+TSMBuffer
+FetchSM::resp_hdr_bufp()
+{
+  HdrHeapSDKHandle *heap;
+  heap = (HdrHeapSDKHandle *)&client_response_hdr;
+
+  return (TSMBuffer)heap;
+}
+
+TSMLoc
+FetchSM::resp_hdr_mloc()
+{
+  return (TSMLoc)client_response_hdr.m_http;
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/45f65532/proxy/FetchSM.h
----------------------------------------------------------------------
diff --git a/proxy/FetchSM.h b/proxy/FetchSM.h
index 3057fc4..1e312ef 100644
--- a/proxy/FetchSM.h
+++ b/proxy/FetchSM.h
@@ -40,30 +40,56 @@ public:
   FetchSM()
   { }
 
-  void init(Continuation* cont, TSFetchWakeUpOptions options, TSFetchEvent 
events, const char* headers, int length, sockaddr const * addr)
+  void init_comm()
   {
-    //_headers.assign(headers);
-    Debug("FetchSM", "[%s] FetchSM initialized for request with 
headers\n--\n%.*s\n--", __FUNCTION__, length, headers);
+    recursion = 0;
     req_finished = 0;
     resp_finished = 0;
     header_done = 0;
+    user_data = NULL;
+    has_sent_header = false;
+    req_method = TS_FETCH_METHOD_NONE;
+    req_content_length = 0;
+    resp_is_chunked = -1;
+    resp_content_length = -1;
+    resp_recived_body_len = 0;
+    cont_mutex.clear();
     req_buffer = new_MIOBuffer(HTTP_HEADER_BUFFER_SIZE_INDEX);
     req_reader = req_buffer->alloc_reader();
     resp_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
     resp_reader = resp_buffer->alloc_reader();
-    response_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
-    response_reader = response_buffer->alloc_reader();
-    contp = cont;
     http_parser_init(&http_parser);
     client_response_hdr.create(HTTP_TYPE_RESPONSE);
     client_response  = NULL;
-    mutex = new_ProxyMutex();
+    SET_HANDLER(&FetchSM::fetch_handler);
+  }
+
+  void init(Continuation* cont, TSFetchWakeUpOptions options,
+            TSFetchEvent events, const char* headers, int length,
+            sockaddr const *addr)
+  {
+    Debug("FetchSM", "[%s] FetchSM initialized for request with 
headers\n--\n%.*s\n--",
+          __FUNCTION__, length, headers);
+    init_comm();
+    contp = cont;
     callback_events = events;
     callback_options = options;
     _addr.assign(addr);
+    fetch_flags = TS_FETCH_FLAGS_NONE;
     writeRequest(headers,length);
-    SET_HANDLER(&FetchSM::fetch_handler);
+    mutex = new_ProxyMutex();
+
+    //
+    // We had dropped response_buffer/respone_reader to avoid unnecessary
+    // memory copying. But for the original TSFetchURL() API, PluginVC may
+    // stop adding data to resp_buffer when the pending data in resp_buffer
+    // reach its water_mark.
+    //
+    // So we should set the water_mark of resp_buffer with a large value,
+    // INT64_MAX would be reasonable.
+    resp_buffer->water_mark = INT64_MAX;
   }
+
   int fetch_handler(int event, void *data);
   void process_fetch_read(int event);
   void process_fetch_write(int event);
@@ -72,8 +98,29 @@ public:
   void get_info_from_buffer(IOBufferReader *reader);
   char* resp_get(int* length);
 
+  TSMBuffer resp_hdr_bufp();
+  TSMLoc resp_hdr_mloc();
+
+  //
+  // Extended APIs for FetchSM
+  //
+  // *flags* can be bitwise OR of several TSFetchFlags
+  //
+  void ext_init(Continuation *cont, TSFetchMethod method,
+                const char *url, const char *version,
+                const sockaddr *client_addr, int flags);
+  void ext_add_header(const char *name, int name_len,
+                      const char *value, int value_len);
+  void ext_lanuch();
+  void ext_destroy();
+  ssize_t ext_read_data(char *buf, size_t len);
+  void ext_write_data(const void *data, size_t len);
+  void ext_set_user_data(void *data);
+  void* ext_get_user_data();
+
 private:
   int InvokePlugin(int event, void*data);
+  void InvokePluginExt(int error_event = 0);
 
   void writeRequest(const char *headers,int length)
   {
@@ -85,11 +132,15 @@ private:
 
   int64_t getReqLen() const { return req_reader->read_avail(); }
 
+  bool has_body();
+  bool check_body_done();
+  bool check_chunked();
+  int dechunk_body();
+
+  int recursion;
   TSVConn http_vc;
   VIO *read_vio;
   VIO *write_vio;
-  MIOBuffer *response_buffer;   // response to FetchSM call
-  IOBufferReader *response_reader;      // response to FetchSM call
   MIOBuffer *req_buffer;
   IOBufferReader *req_reader;
   char *client_response;
@@ -97,6 +148,7 @@ private:
   MIOBuffer *resp_buffer;       // response to HttpConnect Call
   IOBufferReader *resp_reader;
   Continuation *contp;
+  Ptr<ProxyMutex> cont_mutex;
   HTTPParser http_parser;
   HTTPHdr client_response_hdr;
   TSFetchEvent callback_events;
@@ -105,6 +157,14 @@ private:
   bool header_done;
   bool resp_finished;
   IpEndpoint _addr;
+  int resp_is_chunked;
+  int fetch_flags;
+  void *user_data;
+  bool has_sent_header;
+  TSFetchMethod req_method;
+  int64_t req_content_length;
+  int64_t resp_content_length;
+  int64_t resp_recived_body_len;
 };
 
 #endif

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/45f65532/proxy/InkAPI.cc
----------------------------------------------------------------------
diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
index 9719990..a0f1c24 100644
--- a/proxy/InkAPI.cc
+++ b/proxy/InkAPI.cc
@@ -585,6 +585,15 @@ sdk_sanity_check_continuation(TSCont cont)
 }
 
 TSReturnCode
+sdk_sanity_check_fetch_sm(TSFetchSM fetch_sm)
+{
+  if (fetch_sm == NULL)
+    return TS_ERROR;
+
+  return TS_SUCCESS;
+}
+
+TSReturnCode
 sdk_sanity_check_http_ssn(TSHttpSsn ssnp)
 {
   if (ssnp == NULL)
@@ -7216,6 +7225,96 @@ TSFetchUrl(const char* headers, int request_len, 
sockaddr const* ip , TSCont con
   fetch_sm->httpConnect();
 }
 
+TSFetchSM
+TSFetchCreate(TSCont contp, TSFetchMethod method,
+              const char *url, const char *version,
+              struct sockaddr const* client_addr, int flags)
+{
+  sdk_assert(sdk_sanity_check_continuation(contp) == TS_SUCCESS);
+  sdk_assert(ats_is_ip4(client_addr));
+
+  FetchSM *fetch_sm = FetchSMAllocator.alloc();
+
+  fetch_sm->ext_init((Continuation*)contp, method, url, version,
+                     client_addr, flags);
+
+  return (TSFetchSM)fetch_sm;
+}
+
+void
+TSFetchHeaderAdd(TSFetchSM fetch_sm,
+                 const char *name, int name_len,
+                 const char *value, int value_len)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  ((FetchSM*)fetch_sm)->ext_add_header(name, name_len, value, value_len);
+}
+
+void
+TSFetchWriteData(TSFetchSM fetch_sm, const void *data, size_t len)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  ((FetchSM*)fetch_sm)->ext_write_data(data, len);
+}
+
+ssize_t
+TSFetchReadData(TSFetchSM fetch_sm, void *buf, size_t len)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  return ((FetchSM*)fetch_sm)->ext_read_data((char *)buf, len);
+}
+
+void
+TSFetchLaunch(TSFetchSM fetch_sm)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  ((FetchSM*)fetch_sm)->ext_lanuch();
+}
+
+void
+TSFetchDestroy(TSFetchSM fetch_sm)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  ((FetchSM*)fetch_sm)->ext_destroy();
+}
+
+void
+TSFetchUserDataSet(TSFetchSM fetch_sm, void *data)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  ((FetchSM*)fetch_sm)->ext_set_user_data(data);
+}
+
+void*
+TSFetchUserDataGet(TSFetchSM fetch_sm)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  return ((FetchSM*)fetch_sm)->ext_get_user_data();
+}
+
+TSMBuffer
+TSFetchRespHdrMBufGet(TSFetchSM fetch_sm)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  return ((FetchSM*)fetch_sm)->resp_hdr_bufp();
+}
+
+TSMLoc
+TSFetchRespHdrMLocGet(TSFetchSM fetch_sm)
+{
+  sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS);
+
+  return ((FetchSM*)fetch_sm)->resp_hdr_mloc();
+}
+
 TSReturnCode
 TSHttpIsInternalRequest(TSHttpTxn txnp)
 {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/45f65532/proxy/api/ts/experimental.h
----------------------------------------------------------------------
diff --git a/proxy/api/ts/experimental.h b/proxy/api/ts/experimental.h
index d926fd8..8621158 100644
--- a/proxy/api/ts/experimental.h
+++ b/proxy/api/ts/experimental.h
@@ -37,6 +37,37 @@ extern "C"
 {
 #endif                          /* __cplusplus */
 
+  /* For Extended FetchSM APIs */
+  typedef enum {
+    TS_FETCH_METHOD_NONE,
+    TS_FETCH_METHOD_GET,
+    TS_FETCH_METHOD_POST,
+    TS_FETCH_METHOD_CONNECT,
+    TS_FETCH_METHOD_DELETE,
+    TS_FETCH_METHOD_HEAD,
+    TS_FETCH_METHOD_PURGE,
+    TS_FETCH_METHOD_PUT,
+    TS_FETCH_METHOD_LAST
+  } TSFetchMethod;
+
+  typedef enum
+  {
+    TS_FETCH_EVENT_EXT_HEAD_READY = -1,
+    TS_FETCH_EVENT_EXT_HEAD_DONE = -2,
+    TS_FETCH_EVENT_EXT_BODY_READY = -3,
+    TS_FETCH_EVENT_EXT_BODY_DONE = -4,
+  } TSFetchEventExt;
+
+  typedef enum
+  {
+    TS_FETCH_FLAGS_NONE = 0,           // do nothing
+    TS_FETCH_FLAGS_STREAM = 1 << 1,    // enable stream IO
+    TS_FETCH_FLAGS_DECHUNK = 1 << 2,   // dechunk body content
+    TS_FETCH_FLAGS_NEWLOCK = 1 << 3,   // allocate new lock for fetch sm
+  } TSFetchFlags;
+
+  typedef struct tsapi_fetchsm* TSFetchSM;
+
   /* Forward declaration of in_addr, any user of these APIs should probably 
      include net/netinet.h or whatever is appropriate on the platform. */
   struct in_addr;
@@ -600,6 +631,94 @@ extern "C"
      return value 0 indicates success */
   tsapi int TSPrefetchHookSet(int hook_no, TSPrefetchHook hook_fn);
 
+
+  /**
+   * Extended FetchSM's AIPs
+   */
+
+  /*
+   * Create FetchSM, this API will enable stream IO automatically.
+   *
+   * @param contp: continuation to be callbacked.
+   * @param method: request method.
+   * @param url: scheme://host[:port]/path.
+   * @param version: client http version, eg: "HTTP/1.1".
+   * @param client_addr: client addr sent to log.
+   * @param flags: can be bitwise OR of several TSFetchFlags.
+   *
+   * return TSFetchSM which should be destroyed by TSFetchDestroy().
+   */
+  tsapi TSFetchSM TSFetchCreate(TSCont contp, TSFetchMethod method,
+                                const char *url, const char *version,
+                                struct sockaddr const* client_addr, int flags);
+
+  /*
+   * Create FetchSM, this API will enable stream IO automatically.
+   *
+   * @param fetch_sm: returned value of TSFetchCreate().
+   * @param name: name of header.
+   * @param name_len: len of name.
+   * @param value: value of header.
+   * @param name_len: len of value.
+   *
+   * return TSFetchSM which should be destroyed by TSFetchDestroy().
+   */
+  tsapi void TSFetchHeaderAdd(TSFetchSM fetch_sm,
+                              const char *name, int name_len,
+                              const char *value, int value_len);
+
+  /*
+   * Write data to FetchSM
+   *
+   * @param fetch_sm: returned value of TSFetchCreate().
+   * @param data/len: data to be written to fetch sm.
+   */
+  tsapi void TSFetchWriteData(TSFetchSM fetch_sm, const void *data, size_t 
len);
+
+  /*
+   * Read up to *len* bytes from FetchSM into *buf*.
+   *
+   * @param fetch_sm: returned value of TSFetchCreate().
+   * @param buf/len: buffer to contain data from fetch sm.
+   */
+  tsapi ssize_t TSFetchReadData(TSFetchSM fetch_sm, void *buf, size_t len);
+
+  /*
+   * Lanuch FetchSM to do http request, before calling this API,
+   * you should append http request header into fetch sm through
+   * TSFetchWriteData() API
+   *
+   * @param fetch_sm: comes from returned value of TSFetchCreate().
+   */
+  tsapi void TSFetchLaunch(TSFetchSM fetch_sm);
+
+  /*
+   * Destroy FetchSM
+   *
+   * @param fetch_sm: returned value of TSFetchCreate().
+   */
+  tsapi void TSFetchDestroy(TSFetchSM fetch_sm);
+
+  /*
+   * Set user-defined data in FetchSM
+   */
+  tsapi void TSFetchUserDataSet(TSFetchSM fetch_sm, void *data);
+
+  /*
+   * Get user-defined data in FetchSM
+   */
+  tsapi void* TSFetchUserDataGet(TSFetchSM fetch_sm);
+
+  /*
+   * Get client response hdr mbuffer
+   */
+  tsapi TSMBuffer TSFetchRespHdrMBufGet(TSFetchSM fetch_sm);
+
+  /*
+   * Get client response hdr mloc
+   */
+  tsapi TSMLoc TSFetchRespHdrMLocGet(TSFetchSM fetch_sm);
+
 #ifdef __cplusplus
 }
 #endif                          /* __cplusplus */

Reply via email to