TS-1062: Extend and optimize FetchSM: optimize FetchSM to support stream IO; reduce memory copying in FetchSM; expose some plugin APIs in 'ts/experimental.h'.
This patch will borrow some ideas from @Quehan's fetcher library, which has not been open-sourced yet. Signed-off-by: Yunkai Zhang <qiushu....@taobao.com> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/45f65532 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/45f65532 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/45f65532 Branch: refs/heads/master Commit: 45f6553290a62aa9e77d42e32e7809b89c41ace2 Parents: 558345f Author: Yunkai Zhang <qiushu....@taobao.com> Authored: Mon Mar 3 18:34:54 2014 +0800 Committer: Yunkai Zhang <qiushu....@taobao.com> Committed: Wed Mar 12 11:02:15 2014 +0800 ---------------------------------------------------------------------- proxy/FetchSM.cc | 434 +++++++++++++++++++++++++++++++++++---- proxy/FetchSM.h | 80 +++++++- proxy/InkAPI.cc | 99 +++++++++ proxy/api/ts/experimental.h | 119 +++++++++++ 4 files changed, 684 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/45f65532/proxy/FetchSM.cc ---------------------------------------------------------------------- diff --git a/proxy/FetchSM.cc b/proxy/FetchSM.cc index 89e2f68..16d4f4c 100644 --- a/proxy/FetchSM.cc +++ b/proxy/FetchSM.cc @@ -27,20 +27,33 @@ #include "HTTP.h" #include "PluginVC.h" +static const char *http_method[] = { + "NONE", + "GET", + "POST", + "CONNECT", + "DELETE", + "HEAD", + "PURGE", + "PUT", + "LAST", +}; + #define DEBUG_TAG "FetchSM" +#define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10) ClassAllocator < FetchSM > FetchSMAllocator("FetchSMAllocator"); void FetchSM::cleanUp() { Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__); - free_MIOBuffer(response_buffer); free_MIOBuffer(req_buffer); free_MIOBuffer(resp_buffer); mutex.clear(); http_parser_clear(&http_parser); client_response_hdr.destroy(); ats_free(client_response); + cont_mutex.clear(); 
PluginVC *vc = (PluginVC *) http_vc; @@ -57,7 +70,7 @@ FetchSM::httpConnect() PluginVC *vc = (PluginVC *) http_vc; read_vio = vc->do_io_read(this, INT64_MAX, resp_buffer); - write_vio = vc->do_io_write(this, getReqLen(), req_reader); + write_vio = vc->do_io_write(this, getReqLen() + req_content_length, req_reader); } char* FetchSM::resp_get(int *length) { @@ -65,7 +78,8 @@ char* FetchSM::resp_get(int *length) { return client_response; } -int FetchSM::InvokePlugin(int event, void *data) +int +FetchSM::InvokePlugin(int event, void *data) { EThread *mythread = this_ethread(); @@ -77,6 +91,174 @@ int FetchSM::InvokePlugin(int event, void *data) return ret; } + +bool +FetchSM::has_body() +{ + int status_code; + HTTPHdr *hdr; + + if (!header_done) + return false; + + // + // The following code comply with HTTP/1.1: + // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4 + // + + if (req_method == TS_FETCH_METHOD_HEAD) + return false; + + hdr = &client_response_hdr; + + status_code = hdr->status_get(); + if (status_code < 200 || status_code == 204 || status_code == 304) + return false; + + if (check_chunked()) + return true; + + resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH); + if (!resp_content_length) + return false; + + return true; +} + +bool +FetchSM::check_body_done() +{ + if (!check_chunked()) { + if (resp_content_length == resp_recived_body_len + resp_reader->read_avail()) + return true; + + return false; + } + + // + // TODO: check whether the chunked body is done + // + return true; +} + +bool +FetchSM::check_chunked() +{ + int ret; + StrList slist; + HTTPHdr *hdr = &client_response_hdr; + + if (resp_is_chunked >= 0) + return resp_is_chunked; + + ink_release_assert(header_done); + + resp_is_chunked = 0; + ret = hdr->value_get_comma_list(MIME_FIELD_TRANSFER_ENCODING, + MIME_LEN_TRANSFER_ENCODING, &slist); + if (ret) { + for (Str *f = slist.head; f != NULL; f = f->next) { + if (f->len == 0) + continue; + + 
size_t len = sizeof("chunked") - 1; + len = len > f->len ? f->len : len; + if (!strncasecmp(f->str, "chunked", len)) { + resp_is_chunked = 1; + return true; + } + } + } + + return resp_is_chunked; +} + +int +FetchSM::dechunk_body() +{ + ink_assert(resp_is_chunked > 0); + // + // TODO: dechunk the body content. + // return: + // - 0: need to read more data. + // - TS_FETCH_EVENT_EXT_BODY_READY. + // - TS_FETCH_EVENT_EXT_BODY_DONE. + // + return TS_FETCH_EVENT_EXT_BODY_DONE; +} + +void +FetchSM::InvokePluginExt(int error_event) +{ + int event; + EThread *mythread = this_ethread(); + + // + // Increasing *recursion* to prevent + // FetchSM being deleted by callback. + // + recursion++; + + if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { + MUTEX_TAKE_LOCK(cont_mutex, mythread); + } + + if (!contp) + goto out; + + if (error_event) { + contp->handleEvent(error_event, this); + goto out; + } + + if (!has_sent_header) { + contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this); + has_sent_header = true; + } + + if (!has_body()) { + contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this); + goto out; + } + + if (!resp_reader->read_avail()) + goto out; + + Debug(DEBUG_TAG, "[%s] chunked:%d, content_len:%ld, recived_len:%ld, avail:%ld\n", + __FUNCTION__, resp_is_chunked, resp_content_length, resp_recived_body_len, resp_reader->read_avail()); + + if (!check_chunked()) { + if (!check_body_done()) + contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this); + else + contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this); + } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK){ + event = dechunk_body(); + + if (!event) { + read_vio->reenable(); + goto out; + } + + contp->handleEvent(event, this); + } else if (check_body_done()){ + contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this); + } else { + contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this); + } + +out: + if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { + MUTEX_UNTAKE_LOCK(cont_mutex, mythread); + } + recursion--; + + if (!contp && 
!recursion) + cleanUp(); + + return; +} + void FetchSM::get_info_from_buffer(IOBufferReader *the_reader) { @@ -125,57 +307,51 @@ FetchSM::process_fetch_read(int event) Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__); int64_t bytes; int bytes_used; - int64_t total_bytes_copied = 0; switch (event) { case TS_EVENT_VCONN_READ_READY: bytes = resp_reader->read_avail(); - Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64"", __FUNCTION__, bytes); - while (total_bytes_copied < bytes) { - int64_t actual_bytes_copied; - actual_bytes_copied = response_buffer->write(resp_reader, bytes, 0); - if (actual_bytes_copied <= 0) { - break; - } - total_bytes_copied += actual_bytes_copied; - } - resp_reader->consume(total_bytes_copied); - if (header_done == 0 && callback_options == AFTER_HEADER) { - if (client_response_hdr.parse_resp(&http_parser, response_reader, &bytes_used, 0) == PARSE_DONE) { - //InvokePlugin( TS_EVENT_INTERNAL_60201, (void *) &client_response_hdr); - InvokePlugin( callback_events.success_event_id, (void *) &client_response_hdr); + Debug(DEBUG_TAG, "[%s] number of bytes in read ready %"PRId64"", __FUNCTION__, bytes); + if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || callback_options == AFTER_HEADER)) { + if (client_response_hdr.parse_resp(&http_parser, resp_reader, &bytes_used, 0) == PARSE_DONE) { header_done = 1; + if (fetch_flags & TS_FETCH_FLAGS_STREAM) + return InvokePluginExt(); + else + InvokePlugin( callback_events.success_event_id, (void *) &client_response_hdr); } + } else { + if (fetch_flags & TS_FETCH_FLAGS_STREAM) + return InvokePluginExt(); + else + InvokePlugin(TS_FETCH_EVENT_EXT_BODY_READY, this); } read_vio->reenable(); break; case TS_EVENT_VCONN_READ_COMPLETE: case TS_EVENT_VCONN_EOS: + if (fetch_flags & TS_FETCH_FLAGS_STREAM) + return InvokePluginExt(); if(callback_options == AFTER_HEADER || callback_options == AFTER_BODY) { - bytes = response_reader->read_avail(); - - get_info_from_buffer(response_reader); - 
Debug(DEBUG_TAG, "[%s] number of bytes %" PRId64"", __FUNCTION__, bytes); - if(client_response!=NULL) - client_response[bytes] = '\0'; - //client_response[bytes + _headers.size()] = '\0'; - Debug(DEBUG_TAG, "[%s] Completed data fetch of size %" PRId64", notifying caller", __FUNCTION__, bytes); - //InvokePlugin( TS_EVENT_INTERNAL_60200, (void *) client_response); - client_bytes = bytes; - //InvokePlugin( TS_EVENT_INTERNAL_60200, (void *) this); + bytes = resp_reader->read_avail(); + get_info_from_buffer(resp_reader); + Debug(DEBUG_TAG, "[%s] number of bytes %"PRId64"", __FUNCTION__, bytes); + if(client_response!=NULL) + client_response[bytes] = '\0'; + Debug(DEBUG_TAG, "[%s] Completed data fetch of size %"PRId64", notifying caller", __FUNCTION__, bytes); + client_bytes = bytes; InvokePlugin( callback_events.success_event_id, (void *) this); } - Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__); cleanUp(); break; case TS_EVENT_ERROR: default: - //InvokePlugin(TS_EVENT_ERROR, NULL); - InvokePlugin( callback_events.failure_event_id, NULL); + if (fetch_flags & TS_FETCH_FLAGS_STREAM) + return InvokePluginExt(event); + InvokePlugin( callback_events.failure_event_id, NULL); cleanUp(); break; - } } @@ -185,18 +361,19 @@ FetchSM::process_fetch_write(int event) Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__); switch (event) { case TS_EVENT_VCONN_WRITE_COMPLETE: - //INKVConnShutdown(http_vc, 0, 1) ; why does not this work??? 
req_finished = true; break; case TS_EVENT_VCONN_WRITE_READY: // data is processed in chunks of 32k; if there is more than 32k // of input data, we have to continue reenabling until all data is // read (we have already written all the data to the buffer) - ((PluginVC *) http_vc)->reenable(write_vio); + if (req_reader->read_avail() > 0) + ((PluginVC *) http_vc)->reenable(write_vio); break; case TS_EVENT_ERROR: - //InvokePlugin( TS_EVENT_ERROR, NULL); - InvokePlugin( callback_events.failure_event_id, NULL); + if (fetch_flags & TS_FETCH_FLAGS_STREAM) + return InvokePluginExt(event); + InvokePlugin( callback_events.failure_event_id, NULL); cleanUp(); default: break; @@ -213,8 +390,189 @@ FetchSM::fetch_handler(int event, void *edata) } else if (edata == write_vio) { process_fetch_write(event); } else { - InvokePlugin( callback_events.failure_event_id, NULL); + if (fetch_flags & TS_FETCH_FLAGS_STREAM) { + InvokePluginExt(event); + return 1; + } + InvokePlugin( callback_events.failure_event_id, NULL); cleanUp(); } return 1; } + +void +FetchSM::ext_init(Continuation *cont, TSFetchMethod method, + const char *url, const char *version, + const sockaddr *client_addr, int flags) +{ + init_comm(); + + if (flags & TS_FETCH_FLAGS_NEWLOCK) { + mutex = new_ProxyMutex(); + cont_mutex = cont->mutex; + } else { + mutex = cont->mutex; + } + + contp = cont; + _addr.assign(client_addr); + + // + // Enable stream IO automatically. + // + fetch_flags = (TS_FETCH_FLAGS_STREAM | flags); + + // + // These options are not used when enable + // stream IO. 
+ // + memset(&callback_options, 0, sizeof(callback_options)); + memset(&callback_events, 0, sizeof(callback_events)); + + req_method = method; + req_buffer->write(http_method[method], strlen(http_method[method])); + req_buffer->write(" ", 1); + req_buffer->write(url, strlen(url)); + req_buffer->write(" ", 1); + req_buffer->write(version, strlen(version)); + req_buffer->write("\r\n", 2); +} + +void +FetchSM::ext_add_header(const char *name, int name_len, + const char *value, int value_len) +{ + if (TS_MIME_LEN_CONTENT_LENGTH == name_len && + !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) { + req_content_length = atoll(value); + } + + req_buffer->write(name, name_len); + req_buffer->write(": ", 2); + req_buffer->write(value, value_len); + req_buffer->write("\r\n", 2); +} + +void +FetchSM::ext_lanuch() +{ + req_buffer->write("\r\n", 2); + httpConnect(); +} + +void +FetchSM::ext_write_data(const void *data, size_t len) +{ + if (header_done && (fetch_flags & TS_FETCH_FLAGS_NEWLOCK)) { + MUTEX_TAKE_LOCK(mutex, this_ethread()); + } + + req_buffer->write(data, len); + + // + // Before header_done, FetchSM may not + // be initialized. 
+ // + if (header_done) + write_vio->reenable(); + + if (header_done && (fetch_flags & TS_FETCH_FLAGS_NEWLOCK)) { + MUTEX_UNTAKE_LOCK(mutex, this_ethread()); + } +} + +ssize_t +FetchSM::ext_read_data(char *buf, size_t len) +{ + const char *start; + TSIOBufferReader reader; + TSIOBufferBlock blk, next_blk; + int64_t already, blk_len, need, wavail; + + if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { + MUTEX_TRY_LOCK(lock, mutex, this_ethread()); + if (!lock) + return 0; + } + + if (!header_done) + return 0; + + if (check_chunked()) + reader = NULL; // TODO: asign dechunking reader + else + reader = (TSIOBufferReader)resp_reader; + + already = 0; + blk = TSIOBufferReaderStart(reader); + + while (blk) { + + wavail = len - already; + + next_blk = TSIOBufferBlockNext(blk); + start = TSIOBufferBlockReadStart(blk, reader, &blk_len); + + need = blk_len > wavail ? wavail : blk_len; + + memcpy(&buf[already], start, need); + already += need; + + if (already >= (int64_t)len) + break; + + blk = next_blk; + } + + resp_recived_body_len += already; + resp_reader->consume(already); + + read_vio->reenable(); + return already; +} + +void +FetchSM::ext_destroy() +{ + contp = NULL; + + if (recursion) + return; + + if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) { + MUTEX_TRY_LOCK(lock, mutex, this_ethread()); + if (!lock) { + eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME); + return; + } + } + + cleanUp(); +} + +void +FetchSM::ext_set_user_data(void *data) +{ + user_data = data; +} + +void* +FetchSM::ext_get_user_data() +{ + return user_data; +} + +TSMBuffer +FetchSM::resp_hdr_bufp() +{ + HdrHeapSDKHandle *heap; + heap = (HdrHeapSDKHandle *)&client_response_hdr; + + return (TSMBuffer)heap; +} + +TSMLoc +FetchSM::resp_hdr_mloc() +{ + return (TSMLoc)client_response_hdr.m_http; +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/45f65532/proxy/FetchSM.h ---------------------------------------------------------------------- diff --git a/proxy/FetchSM.h b/proxy/FetchSM.h index 
3057fc4..1e312ef 100644 --- a/proxy/FetchSM.h +++ b/proxy/FetchSM.h @@ -40,30 +40,56 @@ public: FetchSM() { } - void init(Continuation* cont, TSFetchWakeUpOptions options, TSFetchEvent events, const char* headers, int length, sockaddr const * addr) + void init_comm() { - //_headers.assign(headers); - Debug("FetchSM", "[%s] FetchSM initialized for request with headers\n--\n%.*s\n--", __FUNCTION__, length, headers); + recursion = 0; req_finished = 0; resp_finished = 0; header_done = 0; + user_data = NULL; + has_sent_header = false; + req_method = TS_FETCH_METHOD_NONE; + req_content_length = 0; + resp_is_chunked = -1; + resp_content_length = -1; + resp_recived_body_len = 0; + cont_mutex.clear(); req_buffer = new_MIOBuffer(HTTP_HEADER_BUFFER_SIZE_INDEX); req_reader = req_buffer->alloc_reader(); resp_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K); resp_reader = resp_buffer->alloc_reader(); - response_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K); - response_reader = response_buffer->alloc_reader(); - contp = cont; http_parser_init(&http_parser); client_response_hdr.create(HTTP_TYPE_RESPONSE); client_response = NULL; - mutex = new_ProxyMutex(); + SET_HANDLER(&FetchSM::fetch_handler); + } + + void init(Continuation* cont, TSFetchWakeUpOptions options, + TSFetchEvent events, const char* headers, int length, + sockaddr const *addr) + { + Debug("FetchSM", "[%s] FetchSM initialized for request with headers\n--\n%.*s\n--", + __FUNCTION__, length, headers); + init_comm(); + contp = cont; callback_events = events; callback_options = options; _addr.assign(addr); + fetch_flags = TS_FETCH_FLAGS_NONE; writeRequest(headers,length); - SET_HANDLER(&FetchSM::fetch_handler); + mutex = new_ProxyMutex(); + + // + // We had dropped response_buffer/respone_reader to avoid unnecessary + // memory copying. But for the original TSFetchURL() API, PluginVC may + // stop adding data to resp_buffer when the pending data in resp_buffer + // reach its water_mark. 
+ // + // So we should set the water_mark of resp_buffer with a large value, + // INT64_MAX would be reasonable. + resp_buffer->water_mark = INT64_MAX; } + int fetch_handler(int event, void *data); void process_fetch_read(int event); void process_fetch_write(int event); @@ -72,8 +98,29 @@ public: void get_info_from_buffer(IOBufferReader *reader); char* resp_get(int* length); + TSMBuffer resp_hdr_bufp(); + TSMLoc resp_hdr_mloc(); + + // + // Extended APIs for FetchSM + // + // *flags* can be bitwise OR of several TSFetchFlags + // + void ext_init(Continuation *cont, TSFetchMethod method, + const char *url, const char *version, + const sockaddr *client_addr, int flags); + void ext_add_header(const char *name, int name_len, + const char *value, int value_len); + void ext_lanuch(); + void ext_destroy(); + ssize_t ext_read_data(char *buf, size_t len); + void ext_write_data(const void *data, size_t len); + void ext_set_user_data(void *data); + void* ext_get_user_data(); + private: int InvokePlugin(int event, void*data); + void InvokePluginExt(int error_event = 0); void writeRequest(const char *headers,int length) { @@ -85,11 +132,15 @@ private: int64_t getReqLen() const { return req_reader->read_avail(); } + bool has_body(); + bool check_body_done(); + bool check_chunked(); + int dechunk_body(); + + int recursion; TSVConn http_vc; VIO *read_vio; VIO *write_vio; - MIOBuffer *response_buffer; // response to FetchSM call - IOBufferReader *response_reader; // response to FetchSM call MIOBuffer *req_buffer; IOBufferReader *req_reader; char *client_response; @@ -97,6 +148,7 @@ private: MIOBuffer *resp_buffer; // response to HttpConnect Call IOBufferReader *resp_reader; Continuation *contp; + Ptr<ProxyMutex> cont_mutex; HTTPParser http_parser; HTTPHdr client_response_hdr; TSFetchEvent callback_events; @@ -105,6 +157,14 @@ private: bool header_done; bool resp_finished; IpEndpoint _addr; + int resp_is_chunked; + int fetch_flags; + void *user_data; + bool has_sent_header; + 
TSFetchMethod req_method; + int64_t req_content_length; + int64_t resp_content_length; + int64_t resp_recived_body_len; }; #endif http://git-wip-us.apache.org/repos/asf/trafficserver/blob/45f65532/proxy/InkAPI.cc ---------------------------------------------------------------------- diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc index 9719990..a0f1c24 100644 --- a/proxy/InkAPI.cc +++ b/proxy/InkAPI.cc @@ -585,6 +585,15 @@ sdk_sanity_check_continuation(TSCont cont) } TSReturnCode +sdk_sanity_check_fetch_sm(TSFetchSM fetch_sm) +{ + if (fetch_sm == NULL) + return TS_ERROR; + + return TS_SUCCESS; +} + +TSReturnCode sdk_sanity_check_http_ssn(TSHttpSsn ssnp) { if (ssnp == NULL) @@ -7216,6 +7225,96 @@ TSFetchUrl(const char* headers, int request_len, sockaddr const* ip , TSCont con fetch_sm->httpConnect(); } +TSFetchSM +TSFetchCreate(TSCont contp, TSFetchMethod method, + const char *url, const char *version, + struct sockaddr const* client_addr, int flags) +{ + sdk_assert(sdk_sanity_check_continuation(contp) == TS_SUCCESS); + sdk_assert(ats_is_ip4(client_addr)); + + FetchSM *fetch_sm = FetchSMAllocator.alloc(); + + fetch_sm->ext_init((Continuation*)contp, method, url, version, + client_addr, flags); + + return (TSFetchSM)fetch_sm; +} + +void +TSFetchHeaderAdd(TSFetchSM fetch_sm, + const char *name, int name_len, + const char *value, int value_len) +{ + sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + ((FetchSM*)fetch_sm)->ext_add_header(name, name_len, value, value_len); +} + +void +TSFetchWriteData(TSFetchSM fetch_sm, const void *data, size_t len) +{ + sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + ((FetchSM*)fetch_sm)->ext_write_data(data, len); +} + +ssize_t +TSFetchReadData(TSFetchSM fetch_sm, void *buf, size_t len) +{ + sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + return ((FetchSM*)fetch_sm)->ext_read_data((char *)buf, len); +} + +void +TSFetchLaunch(TSFetchSM fetch_sm) +{ + 
sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + ((FetchSM*)fetch_sm)->ext_lanuch(); +} + +void +TSFetchDestroy(TSFetchSM fetch_sm) +{ + sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + ((FetchSM*)fetch_sm)->ext_destroy(); +} + +void +TSFetchUserDataSet(TSFetchSM fetch_sm, void *data) +{ + sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + ((FetchSM*)fetch_sm)->ext_set_user_data(data); +} + +void* +TSFetchUserDataGet(TSFetchSM fetch_sm) +{ + sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + return ((FetchSM*)fetch_sm)->ext_get_user_data(); +} + +TSMBuffer +TSFetchRespHdrMBufGet(TSFetchSM fetch_sm) +{ + sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + return ((FetchSM*)fetch_sm)->resp_hdr_bufp(); +} + +TSMLoc +TSFetchRespHdrMLocGet(TSFetchSM fetch_sm) +{ + sdk_assert(sdk_sanity_check_fetch_sm(fetch_sm) == TS_SUCCESS); + + return ((FetchSM*)fetch_sm)->resp_hdr_mloc(); +} + TSReturnCode TSHttpIsInternalRequest(TSHttpTxn txnp) { http://git-wip-us.apache.org/repos/asf/trafficserver/blob/45f65532/proxy/api/ts/experimental.h ---------------------------------------------------------------------- diff --git a/proxy/api/ts/experimental.h b/proxy/api/ts/experimental.h index d926fd8..8621158 100644 --- a/proxy/api/ts/experimental.h +++ b/proxy/api/ts/experimental.h @@ -37,6 +37,37 @@ extern "C" { #endif /* __cplusplus */ + /* For Extended FetchSM APIs */ + typedef enum { + TS_FETCH_METHOD_NONE, + TS_FETCH_METHOD_GET, + TS_FETCH_METHOD_POST, + TS_FETCH_METHOD_CONNECT, + TS_FETCH_METHOD_DELETE, + TS_FETCH_METHOD_HEAD, + TS_FETCH_METHOD_PURGE, + TS_FETCH_METHOD_PUT, + TS_FETCH_METHOD_LAST + } TSFetchMethod; + + typedef enum + { + TS_FETCH_EVENT_EXT_HEAD_READY = -1, + TS_FETCH_EVENT_EXT_HEAD_DONE = -2, + TS_FETCH_EVENT_EXT_BODY_READY = -3, + TS_FETCH_EVENT_EXT_BODY_DONE = -4, + } TSFetchEventExt; + + typedef enum + { + TS_FETCH_FLAGS_NONE = 0, // do nothing + TS_FETCH_FLAGS_STREAM = 1 << 1, // 
enable stream IO + TS_FETCH_FLAGS_DECHUNK = 1 << 2, // dechunk body content + TS_FETCH_FLAGS_NEWLOCK = 1 << 3, // allocate new lock for fetch sm + } TSFetchFlags; + + typedef struct tsapi_fetchsm* TSFetchSM; + /* Forward declaration of in_addr, any user of these APIs should probably include net/netinet.h or whatever is appropriate on the platform. */ struct in_addr; @@ -600,6 +631,94 @@ extern "C" return value 0 indicates success */ tsapi int TSPrefetchHookSet(int hook_no, TSPrefetchHook hook_fn); + + /** + * Extended FetchSM's APIs + */ + + /* + * Create FetchSM, this API will enable stream IO automatically. + * + * @param contp: continuation to be callbacked. + * @param method: request method. + * @param url: scheme://host[:port]/path. + * @param version: client http version, eg: "HTTP/1.1". + * @param client_addr: client addr sent to log. + * @param flags: can be bitwise OR of several TSFetchFlags. + * + * return TSFetchSM which should be destroyed by TSFetchDestroy(). + */ + tsapi TSFetchSM TSFetchCreate(TSCont contp, TSFetchMethod method, + const char *url, const char *version, + struct sockaddr const* client_addr, int flags); + + /* + * Add a header to the request of FetchSM. + * + * @param fetch_sm: returned value of TSFetchCreate(). + * @param name: name of header. + * @param name_len: len of name. + * @param value: value of header. + * @param value_len: len of value. + * + * no return value. + */ + tsapi void TSFetchHeaderAdd(TSFetchSM fetch_sm, + const char *name, int name_len, + const char *value, int value_len); + + /* + * Write data to FetchSM + * + * @param fetch_sm: returned value of TSFetchCreate(). + * @param data/len: data to be written to fetch sm. + */ + tsapi void TSFetchWriteData(TSFetchSM fetch_sm, const void *data, size_t len); + + /* + * Read up to *len* bytes from FetchSM into *buf*. + * + * @param fetch_sm: returned value of TSFetchCreate().
+ * @param buf/len: buffer to contain data from fetch sm. + */ + tsapi ssize_t TSFetchReadData(TSFetchSM fetch_sm, void *buf, size_t len); + + /* + * Launch FetchSM to do http request, before calling this API, + * you should append http request header into fetch sm through + * TSFetchWriteData() API + * + * @param fetch_sm: comes from returned value of TSFetchCreate(). + */ + tsapi void TSFetchLaunch(TSFetchSM fetch_sm); + + /* + * Destroy FetchSM + * + * @param fetch_sm: returned value of TSFetchCreate(). + */ + tsapi void TSFetchDestroy(TSFetchSM fetch_sm); + + /* + * Set user-defined data in FetchSM + */ + tsapi void TSFetchUserDataSet(TSFetchSM fetch_sm, void *data); + + /* + * Get user-defined data in FetchSM + */ + tsapi void* TSFetchUserDataGet(TSFetchSM fetch_sm); + + /* + * Get client response hdr mbuffer + */ + tsapi TSMBuffer TSFetchRespHdrMBufGet(TSFetchSM fetch_sm); + + /* + * Get client response hdr mloc + */ + tsapi TSMLoc TSFetchRespHdrMLocGet(TSFetchSM fetch_sm); + #ifdef __cplusplus } #endif /* __cplusplus */