This is an automated email from the ASF dual-hosted git repository. gancho pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push: new 18e67bd Prefetch plugin 18e67bd is described below commit 18e67bd9e9790543e2872741d48b61571ef5408c Author: Gancho Tenev <gan...@apache.org> AuthorDate: Sat Jun 30 00:48:24 2018 -0700 Prefetch plugin The purpose of the plugin is to increase the cache-hit ratio for a sequence of objects which URL paths follow a common pattern. --- doc/admin-guide/plugins/index.en.rst | 1 + doc/admin-guide/plugins/prefetch.en.rst | 278 ++++++++ .../images/admin/prefetch_plugin_deployment.png | Bin 0 -> 222757 bytes plugins/Makefile.am | 1 + plugins/experimental/prefetch/Makefile.inc | 28 + plugins/experimental/prefetch/README.md | 8 + plugins/experimental/prefetch/common.cc | 59 ++ plugins/experimental/prefetch/common.h | 68 ++ plugins/experimental/prefetch/configs.cc | 172 +++++ plugins/experimental/prefetch/configs.h | 202 ++++++ plugins/experimental/prefetch/fetch.cc | 739 ++++++++++++++++++++ plugins/experimental/prefetch/fetch.h | 202 ++++++ plugins/experimental/prefetch/fetch_policy.cc | 57 ++ plugins/experimental/prefetch/fetch_policy.h | 66 ++ plugins/experimental/prefetch/fetch_policy_lru.cc | 141 ++++ plugins/experimental/prefetch/fetch_policy_lru.h | 105 +++ .../experimental/prefetch/fetch_policy_simple.cc | 80 +++ .../experimental/prefetch/fetch_policy_simple.h | 46 ++ plugins/experimental/prefetch/headers.cc | 213 ++++++ plugins/experimental/prefetch/headers.h | 31 + plugins/experimental/prefetch/pattern.cc | 463 +++++++++++++ plugins/experimental/prefetch/pattern.h | 92 +++ plugins/experimental/prefetch/plugin.cc | 751 +++++++++++++++++++++ 23 files changed, 3803 insertions(+) diff --git a/doc/admin-guide/plugins/index.en.rst b/doc/admin-guide/plugins/index.en.rst index 566eecf..2a37ea6 100644 --- a/doc/admin-guide/plugins/index.en.rst +++ b/doc/admin-guide/plugins/index.en.rst @@ -161,6 +161,7 @@ directory of the |TS| source tree. 
Experimental plugins can be compiled by passi Stale While Revalidate <stale_while_revalidate.en> System Statistics <system_stats.en> WebP Transform <webp_transform.en> + Prefetch <prefetch.en> :doc:`Access Control <access_control.en>` Access control plugin that handles various access control use-cases. diff --git a/doc/admin-guide/plugins/prefetch.en.rst b/doc/admin-guide/plugins/prefetch.en.rst new file mode 100644 index 0000000..93a7e3c --- /dev/null +++ b/doc/admin-guide/plugins/prefetch.en.rst @@ -0,0 +1,278 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. include:: ../../common.defs + +.. _admin-plugins-prefetch: + + +Prefetch Plugin +*************** + +Description +=========== + +The purpose of the plugin is to increase the cache-hit ratio for a sequence of +objects which URL paths follow a common pattern. + +On every **incoming** URL request, the plugin can decide to pre-fetch the +**next object** or more objects based on the common URL path pattern and a +pre-defined pre-fetch policy. + +Currently, most HLS video urls follow a predictable pattern, with most URLs +containing a segment number. Since the segments are ~10s of content, the normal +usage pattern is to fetch the incremental segment every few seconds. 
The CDN +has its best chance of delivering a good user experience if the requests are +served from cache. Since we can predict the **next object** fetched, we should be +able to dramatically increase the chance of it being a cache hit. + +This is primarily useful for: + +* less popular content. Popular movies' segments are constantly being refreshed + in cache by user requests. Less popular content is less likely to be in cache. +* device failures. There can be a significant time gap between a seeding request + and the user request. During this time, devices can fail, which causes cache + misses. The time gap between the plugin's request and the user's request can be + used to smooth over the failures. + + +Why do this? Isn't seeding sufficient? +-------------------------------------- + +In practice the cache hit rate for the user-facing HLS video content is never a perfect 100%. +This plugin should increase the cache hit rate. + +* The caches will eventually wrap. The unpopular content/tiers will be evicted, + and not repopulated. +* Disks fail. Content on these disks will become cache misses at least at that layer. +* Machines fail. + * Content seeded on those machines will become misses + * Content seeded while a machine is down will be seeded to the "wrong" machine +* Bugs. + * The content is usually managed by another organization which could have + * issues determining all the content (especially international) + * issues getting every asset on the storage service (e.g. AWS S3) + * issues sending us the assets in time + * ATS has had 5xx errors preventing seeding of assets +* The process of seeding sometimes wastes significantly more resources than the normal usage.
+ +The initial Prefetch plugin deployment graph below shows the per-POP cache-hit-ratio before and after its full deployment. +It is worth mentioning that a small percentage of the requests did not follow a predictable pattern and were not handled by the plugin. + +.. figure:: ../../static/images/admin/prefetch_plugin_deployment.png + :align: center + :alt: prefetch plugin initial deployment + + Prefetch plugin initial deployment. + +* All POPs were seeded periodically except for POP #1 and the plugin was deployed in the following order: POP #0, #1, #2, #3 and then to the rest at once. +* POP #0 was the first plugin deployment and was used to tune its configuration for better results. +* POP #1 was a "testing ground" for the “worst case” (no seeding at all, imperfect conditions like low traffic and poorer connectivity to origin) and relying only on the Prefetch plugin. +* POP #2 and POP #3 experienced seeding problems (at times it reached ~60%, not shown here). + + +How does it work? +----------------- + +The primary use-case for the plugin is to work in a multi-tier (child-parent) +environment where a consistent hashing of the URI is used to choose the next +tier parent but a single-tier use case is also supported (should work w/o +any code changes). + +When a request comes to the child (only), the url is checked in an LRU. If the +object exists in the LRU, we assume that we've pre-fetched the following object +recently, and thus do not need to take any further action. If, however, the +object is **not** found, we proceed with prefetching. + +The plugin calculates the URI of the next segment, ATS performs the consistent +hash calculation on it to find the appropriate parent, and sends that parent +a request for it, including a special header. When the parent receives the +request, it will either find it in cache or begin the fetch from its next tier. 
+Since the request from the child has the special header, the parent will only +send the headers of the object back to the client, saving network bandwidth and +processing. The child thus does not cache the pre-fetched object, which is OK since +the user may not hit that same child for the subsequent object. + +Then, when the user makes their next request for the pre-fetched object, the +child that handles the request will perform the consistent-hash, find the +same parent that got the pre-fetch request, and be served from its cache. + +Usage +----- + +* Dual-tiered usage - the plugin runs in 2 modes (2 instances) + * the **front-tier** instance decides if the "next" object needs prefetching + based on the pre-fetch policy and only sends a signal to the **back-tier** + * the **back-tier** instance responds quickly w/o returning any objects to + the **front-tier** and actually performs the background fetch. +* Single-tier usage - the plugin runs on the first user-facing tier. + + +How the "next" object path is calculated +---------------------------------------- + +* The cache key of an incoming URL is checked against the fetching policy defined by ``--fetch-policy``.
+* If the **next object** is to be pre-fetched, the ``--fetch-path-pattern=/regex/capture/`` is used to transform the **incoming** URL path into the **next** object path +* The number of prefetched objects is specified by ``--fetch-count`` +* The hostname of the prefetch request can be replaced by using ``--replace-host`` + +Let's say we have the following setup :: + + map http://example.com http://origin.com \ + @plugin=cachekey.so @pparam=--remove-all-params=true \ + @plugin=prefetch.so \ + @pparam=--fetch-policy=simple \ + @pparam=--fetch-path-pattern=/(.*-)(\d+)(.*)/$1{$2+2}$3/ \ + @pparam=--fetch-count=3 \ + @pparam=--replace-host=example-seed.com + + +If the "incoming" URL is :: + + http://example.com/path/file-104.mov?a=a&b=b + + +the following URLs will be requested to be prefetched :: + + http://example-seed.com/path/file-106.mov?a=a&b=b + http://example-seed.com/path/file-108.mov?a=a&b=b + http://example-seed.com/path/file-110.mov?a=a&b=b + + +Note ``--fetch-path-pattern`` is a PCRE regex/capture pattern and +``{$2+2}`` is a mechanism to calculate the next path by adding or +subtracting integer numbers. + + +Overhead from **next object** prefetch +-------------------------------------- + +Consuming extra resources +^^^^^^^^^^^^^^^^^^^^^^^^^ + +The plugin uses more CDN resources to improve the user experience. The plugin +attempts to minimize the extraneous resources used. + +* The prefetch policy (LRU) attempts to minimize the URLs fetched. The popular video segments + (which represent the majority of the requests) will quickly populate the LRU, + preventing their pre-fetching. +* If the original request is for the last segment in the video, the plugin will + make our system have a frivolous request to origin for the next non-existent + segment. +* If the user stops watching the video, the plugin may (if not popular) make a + request for a single segment that goes un-requested.
+ +Minimizing **next object** prefetch overhead +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The current implementation relies on the following assumptions and engineering +compromises: + +* **First match the next object pattern** defined by ``--fetch-path-pattern`` + plugin parameter; non-matching requests are ignored (prefetch is never triggered) +* **Define a prefetch policy** which tries to suppress unnecessary **next object** + prefetches for the most recently used requests, which are assumed to be already + in cache. Currently only ``lru:n`` policy is supported; it uses a URI-hash LRU + cache which evicts the least recently used elements first. Every request's **cache key** + is checked against it and if found the **next object** prefetch is skipped/cancelled. + (**side note**: the ``lru:n`` **is not** the same as cache_promote plugin `lru`, + the latter is rather a frequency divider for the most recently used URIs). +* **Always use request's cache key** instead of request's URI and also **remove + the query parameters** from the key which guarantees that multiple different + requests which would result in using the same **cache key** are not considered as + separate requests (which could bloat/dilute the LRU cache + if not normalized) +* **Check if the fetch request is unique**. A ``simple`` prefetching policy is + always used to make sure prefetches for the same object (same cache key) are + never triggered simultaneously. +* **Check if already cached**. Before triggering the prefetch request to the + next tier it is always checked if the **incoming** object is already cached, + assuming that if already cached there is a good chance the **next + object** would be cached as well. +* **Don't fetch the response body** and **never cache** at the **front-tier**. + The **front-tier** marks the prefetch request with a special API header defined + by ``--api-header`` plugin parameter.
When received, the **back-tier** responds + right away (without a body) before actually fetching the object; it just + schedules the real prefetch at the **back-tier**. ``Cache-Control: no-store`` + is used to make sure the prefetch request response is never cached at the **front-tier**. + In such a way resources are saved (time, memory, CPU, bandwidth, etc.) and also + unnecessary caching at the **front-tier** is avoided (where currently cache_promote + plugin is already being used to alleviate the load on the disks). +* **Throttle the prefetch activity** - if necessary a limit can be imposed on the + number of concurrent prefetch requests by using ``--fetch-max`` plugin parameter. + +Plugin parameters +================= + +* ``--front`` + - ``true`` - configures the plugin to run on the **front-tier**, + - ``false`` - configures the plugin to run on the **back-tier**. +* ``--api-header`` - the header used by the plugin internally, also used to mark a prefetch request to the next tier in dual-tier usage. +* ``--fetch-policy`` - fetch policy + - ``simple`` - this policy just makes sure no concurrent prefetches of the same object are triggered (default and always used in combination with any other policy) + - ``lru:n`` - this policy uses LRU to identify “hot” objects and triggers prefetch if the object is not found. `n` is the size of the LRU +* ``--fetch-count`` - how many objects to prefetch. +* ``--fetch-path-pattern`` - regex/capture pattern that would transform the **incoming** path into the **next object** path. +* ``--fetch-max`` - maximum concurrent fetches allowed, which allows throttling the prefetch activity if necessary +* ``--replace-host`` - allows the prefetch requests to be forwarded to a different host or remap rule (replaces the host in the prefetch request) +* ``--name-space`` - by default all plugin instances across all remap rules share a single background fetch state; this parameter allows specifying a separate state per remap rule or per group of remap rules.
+* ``--metrics-prefix`` - prefix for the metrics generated by the plugin. +* ``--exact-match`` + * if ``false`` (default) the fetch policy would use the **incoming** URL's cache key to find out if the **next object** should be prefetched or not, + * if ``true`` the fetch policy would use the **next** URL's cache key to find out if the **next object** should be prefetched or not +* ``--log-name`` - specifies a custom log name (if not specified a log is not created) + +Metrics +======= + +The plugin maintains the following metrics: + +* Prefetch request status related + * ``fetch.active`` - number of currently active prefetch requests (counter) + * ``fetch.completed`` - number of successfully completed prefetch requests (counter) + * ``fetch.errors`` - number of failed prefetch requests (counter) + * ``fetch.timeouts`` - number of timed-out prefetch requests (counter) + * ``fetch.throttled`` - number of throttled prefetch requests (counter), throttle limit defined by ``--fetch-max`` + * ``fetch.total`` - total number of prefetch requests (counter). +* Fetch policy related: + * all **incoming** request URIs are first matched against the next object pattern defined in ``--fetch-path-pattern`` + * ``fetch.match.yes`` - number of requests matching the pattern (counter), eligible for triggering prefetch request + * ``fetch.match.no`` - number of requests not matching the pattern (counter), ignored by the plugin, will never trigger prefetch request + * prefetch policy related (e.g.
``--fetch-policy=lru:n``) + * ``fetch.policy.yes`` - number of times (counter) the policy allowed scheduling of the prefetch request (for ``lru:n`` policy the cache key **was not** found in the LRU) + * ``fetch.policy.no`` - number of times (counter) the policy disallowed scheduling of the prefetch request (for ``lru:n`` policy the cache key **was** found in the LRU) + * ``fetch.policy.maxsize`` - maximum size of the prefetch policy (gauge, e.g. for ``lru:n`` policy the max size is ``n``) + * ``fetch.policy.size`` - current size of the prefetch policy (gauge, e.g. for ``lru:n`` policy the current size is a number <= ``n``) + * before sending any new prefetch request the plugin makes sure the object is not currently being prefetched (unique). + * ``fetch.unique.yes`` - number of unique requests (counter), for which there are no current prefetch requests for the same object (cache key is used for this check). + * ``fetch.unique.no`` - number of non-unique requests (counter), for which there is currently a prefetch running for the same object (cache key is used for this check). + * before sending any new prefetch request the plugin makes sure the object is not already cached.
+ * ``fetch.already_cached`` - number of prefetch requests not sent (cancelled) because the object was already in cache (likely no prefetch needed) + +The exact metric name is defined by the following plugin parameters: + +* ``--metrics-prefix=<sample-prefix>`` +* ``--name-space=<sample-name-space>`` + +For instance, the final ``fetch.active`` metric will be called ``<sample-prefix>.<sample-name-space>.fetch.active`` diff --git a/doc/static/images/admin/prefetch_plugin_deployment.png b/doc/static/images/admin/prefetch_plugin_deployment.png new file mode 100644 index 0000000..4b218de Binary files /dev/null and b/doc/static/images/admin/prefetch_plugin_deployment.png differ diff --git a/plugins/Makefile.am b/plugins/Makefile.am index b955a71..6115b21 100644 --- a/plugins/Makefile.am +++ b/plugins/Makefile.am @@ -83,6 +83,7 @@ include experimental/system_stats/Makefile.inc include experimental/traffic_dump/Makefile.inc include experimental/tls_bridge/Makefile.inc include experimental/url_sig/Makefile.inc +include experimental/prefetch/Makefile.inc if BUILD_URI_SIGNING_PLUGIN include experimental/uri_signing/Makefile.inc diff --git a/plugins/experimental/prefetch/Makefile.inc b/plugins/experimental/prefetch/Makefile.inc new file mode 100644 index 0000000..566ba82 --- /dev/null +++ b/plugins/experimental/prefetch/Makefile.inc @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pkglib_LTLIBRARIES += experimental/prefetch/prefetch.la +experimental_prefetch_prefetch_la_SOURCES = \ + experimental/prefetch/plugin.cc \ + experimental/prefetch/common.cc \ + experimental/prefetch/configs.cc \ + experimental/prefetch/fetch.cc \ + experimental/prefetch/headers.cc \ + experimental/prefetch/pattern.cc \ + experimental/prefetch/fetch_policy.cc \ + experimental/prefetch/fetch_policy_simple.cc \ + experimental/prefetch/fetch_policy_lru.cc + diff --git a/plugins/experimental/prefetch/README.md b/plugins/experimental/prefetch/README.md new file mode 100644 index 0000000..86e9b6a --- /dev/null +++ b/plugins/experimental/prefetch/README.md @@ -0,0 +1,8 @@ +# Description + +The purpose of the plugin is to increase the cache-hit ratio for a sequence of +objects whose URL paths follow a common pattern. + +# Documentation +Details and examples can be found in [prefetch plugin documentation](../../../doc/admin-guide/plugins/prefetch.en.rst). + diff --git a/plugins/experimental/prefetch/common.cc b/plugins/experimental/prefetch/common.cc new file mode 100644 index 0000000..fa2c9cf --- /dev/null +++ b/plugins/experimental/prefetch/common.cc @@ -0,0 +1,59 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file common.cc + * @brief Common declarations and definitions. + * @see common.h + */ + +#include <string.h> +#include <stdlib.h> + +#include "common.h" + +#ifdef PREFETCH_UNIT_TEST + +void +PrintToStdErr(const char *fmt, ...) +{ + va_list args; + va_start(args, fmt); + vfprintf(stderr, fmt, args); + va_end(args); +} + +#endif /* PREFETCH_UNIT_TEST */ + +/** + * @brief parses a base-10 unsigned integer from the beginning of @a str (strtoul() semantics: leading + * whitespace is skipped, parsing stops at the first non-digit, 0 if nothing can be converted). + */ +size_t +getValue(const String &str) +{ + /* std::string is always NUL-terminated, so parse it in place instead of copying it into a + * variable-length array (a non-standard C++ extension whose stack use grows with the input). */ + return static_cast<size_t>(strtoul(str.c_str(), nullptr, 10)); +} + +/** + * @brief same as getValue(const String &) but reads only the first @a len bytes of @a str, which need not + * be NUL-terminated. + * @return the parsed value; 0 for a NULL or empty input. + */ +size_t +getValue(const char *str, size_t len) +{ + if (nullptr == str || 0 == len) { + return 0; + } + return getValue(String(str, len)); +} diff --git a/plugins/experimental/prefetch/common.h b/plugins/experimental/prefetch/common.h new file mode 100644 index 0000000..35bbab0 --- /dev/null +++ b/plugins/experimental/prefetch/common.h @@ -0,0 +1,68 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file common.h + * @brief Common declarations and definitions shared by the plugin sources (header file). + */ + +#pragma once + +#define PLUGIN_NAME "prefetch" + +#include <list> +#include <set> +#include <string> +#include <vector> + +/* Short aliases for the standard string containers used by the plugin. */ +using String = std::string; +using StringSet = std::set<std::string>; +using StringList = std::list<std::string>; +using StringVector = std::vector<std::string>; + +#ifdef PREFETCH_UNIT_TEST +/* Unit-test build: log to stderr through PrintToStdErr() and use the C assert(). */ +#include <assert.h> +#include <stdarg.h> +#include <stdio.h> + +void PrintToStdErr(const char *fmt, ...); +#define PrefetchDebug(fmt, ...) PrintToStdErr("(%s) %s:%d:%s() " fmt "\n", PLUGIN_NAME, __FILE__, __LINE__, __func__, ##__VA_ARGS__) +#define PrefetchError(fmt, ...) PrintToStdErr("(%s) %s:%d:%s() " fmt "\n", PLUGIN_NAME, __FILE__, __LINE__, __func__, ##__VA_ARGS__) +#define PrefetchAssert assert + +#else /* PREFETCH_UNIT_TEST */ + +/* Plugin build: log through the ATS TSDebug()/TSError() API and use TSAssert(). */ +#include "ts/ts.h" + +#define PrefetchDebug(fmt, ...) \ + do { \ + TSDebug(PLUGIN_NAME, "%s:%d:%s() " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__); \ + } while (0) + +#define PrefetchError(fmt, ...)
\ + do { \ + TSError("(%s) " fmt, PLUGIN_NAME, ##__VA_ARGS__); \ + TSDebug(PLUGIN_NAME, "%s:%d:%s() " fmt, __FILE__, __LINE__, __func__, ##__VA_ARGS__); \ + } while (0) + +#define PrefetchAssert TSAssert + +#endif /* PREFETCH_UNIT_TEST */ + +/* Parse a base-10 unsigned integer from a string (defined in common.cc). */ +size_t getValue(const String &str); +size_t getValue(const char *str, size_t len); diff --git a/plugins/experimental/prefetch/configs.cc b/plugins/experimental/prefetch/configs.cc new file mode 100644 index 0000000..721acd3 --- /dev/null +++ b/plugins/experimental/prefetch/configs.cc @@ -0,0 +1,172 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.
+ */ + +#include <fstream> /* std::ifstream */ +#include <getopt.h> /* getopt_long() */ +#include <sstream> /* std::istringstream */ +#include <strings.h> /* strncasecmp() */ + +#include "configs.h" + +template <typename ContainerType> +static void +commaSeparateString(ContainerType &c, const String &input) +{ + std::istringstream istr(input); + String token; + + while (std::getline(istr, token, ',')) { + c.insert(c.end(), token); + } +} + +/** + * @brief checks whether an option value means "enabled". + * @param arg option value, must not be NULL. + * @return true if @a arg starts (case-insensitively) with "true", "1" or "yes". + */ +static bool +isTrue(const char *arg) +{ + return (0 == strncasecmp("true", arg, 4) || 0 == strncasecmp("1", arg, 1) || 0 == strncasecmp("yes", arg, 3)); +} + +/** + * @brief initializes plugin configuration. + * @param argc number of plugin parameters + * @param argv plugin parameters + * @return the result of finalize(); options missing a required value are reported and ignored. + */ +bool +PrefetchConfig::init(int argc, char *argv[]) +{ + static const struct option longopt[] = {{const_cast<char *>("front"), optional_argument, 0, 'f'}, + {const_cast<char *>("api-header"), optional_argument, 0, 'h'}, + {const_cast<char *>("next-header"), optional_argument, 0, 'n'}, + {const_cast<char *>("fetch-policy"), optional_argument, 0, 'p'}, + {const_cast<char *>("fetch-count"), optional_argument, 0, 'c'}, + {const_cast<char *>("fetch-path-pattern"), optional_argument, 0, 'e'}, + {const_cast<char *>("fetch-max"), optional_argument, 0, 'x'}, + {const_cast<char *>("replace-host"), optional_argument, 0, 'r'}, + {const_cast<char *>("name-space"), optional_argument, 0, 's'}, + {const_cast<char *>("metrics-prefix"), optional_argument, 0, 'm'}, + {const_cast<char *>("exact-match"), optional_argument, 0, 'y'}, + {const_cast<char *>("log-name"), optional_argument, 0, 'l'}, + {0, 0, 0, 0}}; + + bool status = true; + optind = 0; + + /* argv contains the "to" and "from" URLs. Skip the first so that the second one poses as the program name.
*/ + argc--; + argv++; + + for (;;) { + int opt; + opt = getopt_long(argc, (char *const *)argv, "", longopt, nullptr); + + if (opt == -1) { + break; + } + + PrefetchDebug("processing %s", argv[optind - 1]); + + /* Every option is declared optional_argument, so getopt_long() leaves optarg NULL for a bare "--option" + * (no "=value"). The boolean flags treat that as "true"; for every other known option a NULL would be + * handed to std::string assign/construction or Pattern::init() (undefined behavior), so skip it. */ + if (nullptr == optarg && 'f' != opt && 'y' != opt && '?' != opt) { + PrefetchError("missing value for %s, ignoring", argv[optind - 1]); + continue; + } + + switch (opt) { + case 'f': /* --front */ + _front = (nullptr == optarg) || ::isTrue(optarg); + break; + + case 'h': /* --api-header */ + setApiHeader(optarg); + break; + + case 'n': /* --next-header */ + setNextHeader(optarg); + break; + + case 'p': /* --fetch-policy */ + setFetchPolicy(optarg); + break; + + case 'c': /* --fetch-count */ + setFetchCount(optarg); + break; + + case 'e': /* --fetch-path-pattern */ { + Pattern *pattern = new Pattern(); + if (nullptr != pattern) { + if (pattern->init(optarg)) { + _nextPaths.add(pattern); + } else { + PrefetchError("failed to initialize next object pattern"); + delete pattern; + } + } + } break; + + case 'x': /* --fetch-max */ + setFetchMax(optarg); + break; + + case 'r': /* --replace-host */ + setReplaceHost(optarg); + break; + + case 's': /* --name-space */ + setNameSpace(optarg); + break; + + case 'm': /* --metrics-prefix */ + setMetricsPrefix(optarg); + break; + + case 'y': /* --exact-match */ + _exactMatch = (nullptr == optarg) || ::isTrue(optarg); + break; + + case 'l': /* --log-name */ + setLogName(optarg); + break; + } + } + + status &= finalize(); + + return status; +} + +/** + * @brief provides means for post-processing of the plugin parameters to finalize the configuration or to "cache" some of the + * decisions for later use. + * @return true if successful, false if failure. + */ +bool +PrefetchConfig::finalize() +{ + PrefetchDebug("front-end: %s", (_front ? "true" : "false")); + PrefetchDebug("exact match: %s", (_exactMatch ?
"true" : "false")); + PrefetchDebug("API header name: %s", _apiHeader.c_str()); + PrefetchDebug("next object header name: %s", _nextHeader.c_str()); + PrefetchDebug("fetch policy parameters: %s", _fetchPolicy.c_str()); + PrefetchDebug("fetch count: %u", _fetchCount); + PrefetchDebug("fetch concurrently max: %u", _fetchMax); + PrefetchDebug("replace host name: %s", _replaceHost.c_str()); + PrefetchDebug("name space: %s", _namespace.c_str()); + PrefetchDebug("log name: %s", _logName.c_str()); + + return true; +} diff --git a/plugins/experimental/prefetch/configs.h b/plugins/experimental/prefetch/configs.h new file mode 100644 index 0000000..2552c36 --- /dev/null +++ b/plugins/experimental/prefetch/configs.h @@ -0,0 +1,202 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file configs.h + * @brief Plugin configuration (header file).
+ */ + +#pragma once + +#include <string> + +#include "common.h" +#include "pattern.h" + +/** + * @brief Prefetch configuration instance. + * + * The set*() methods receive option values as parsed by getopt_long() in init(); a NULL value (a bare + * "--option" given without "=value") is ignored so the current/default value is kept. + */ +class PrefetchConfig +{ +public: + PrefetchConfig() + : _apiHeader("X-AppleCDN-Prefetch"), + _nextHeader("X-AppleCDN-Prefetch-Next"), + _replaceHost(), + _namespace("default"), + _metricsPrefix("prefetch.stats"), + _fetchCount(1), + _fetchMax(0), + _front(false), + _exactMatch(false) + { + } + + /** + * @brief initializes plugin configuration. + * @param argc number of plugin parameters + * @param argv plugin parameters + */ + bool init(int argc, char *argv[]); + + void + setApiHeader(const char *optarg) + { + if (nullptr != optarg) { + _apiHeader.assign(optarg); + } + } + + const std::string & + getApiHeader() const + { + return _apiHeader; + } + + void + setNextHeader(const char *optarg) + { + if (nullptr != optarg) { + _nextHeader.assign(optarg); + } + } + + const std::string & + getNextHeader() const + { + return _nextHeader; + } + + void + setFetchPolicy(const char *optarg) + { + if (nullptr != optarg) { + _fetchPolicy.assign(optarg); + } + } + + const std::string & + getFetchPolicy() const + { + return _fetchPolicy; + } + + void + setReplaceHost(const char *optarg) + { + if (nullptr != optarg) { + _replaceHost.assign(optarg); + } + } + + const std::string & + getReplaceHost() const + { + return _replaceHost; + } + + bool + isFront() const + { + return _front; + } + + bool + isExactMatch() const + { + return _exactMatch; + } + + void + setFetchCount(const char *optarg) + { + if (nullptr != optarg) { + _fetchCount = getValue(optarg); + } + } + + unsigned + getFetchCount() const + { + return _fetchCount; + } + + void + setFetchMax(const char *optarg) + { + if (nullptr != optarg) { + _fetchMax = getValue(optarg); + } + } + + unsigned + getFetchMax() const + { + return _fetchMax; + } + + void + setNameSpace(const char *optarg) + { + if (nullptr != optarg) { + _namespace.assign(optarg); + } + } + + const String & + getNameSpace() const + { + return _namespace; + } + + void + setMetricsPrefix(const char *optarg) + { + if (nullptr != optarg) { + _metricsPrefix.assign(optarg); + } + } + + const String & + getMetricsPrefix() const + { + return _metricsPrefix; + } +
MultiPattern & + getNextPath() + { + return _nextPaths; + } + + void + setLogName(const char *optarg) + { + if (nullptr != optarg) { + _logName.assign(optarg); + } + } + + const String & + getLogName() const + { + return _logName; + } + + /** + * @brief provides means for post-processing of the plugin parameters to finalize the configuration. + * @return true if successful, false if failure. + */ + bool finalize(); + +private: + std::string _apiHeader; + std::string _nextHeader; + std::string _fetchPolicy; + std::string _replaceHost; + std::string _namespace; + std::string _metricsPrefix; + std::string _logName; + unsigned _fetchCount; + unsigned _fetchMax; + bool _front; + bool _exactMatch; + MultiPattern _nextPaths; +}; diff --git a/plugins/experimental/prefetch/fetch.cc b/plugins/experimental/prefetch/fetch.cc new file mode 100644 index 0000000..ca7daf0 --- /dev/null +++ b/plugins/experimental/prefetch/fetch.cc @@ -0,0 +1,739 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file fetch.cc + * @brief Background fetch related classes (implementation).
+ */ + +#include <arpa/inet.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <netinet/ip.h> +#include <string.h> +#include <sys/socket.h> +#include <inttypes.h> + +#include "ts/ts.h" /* ATS API */ +#include "fetch.h" +#include "headers.h" + +const char * +getPrefetchMetricsNames(int metric) +{ + switch (metric) { + case FETCH_ACTIVE: + return "fetch.active"; + break; + case FETCH_COMPLETED: + return "fetch.completed"; + break; + case FETCH_ERRORS: + return "fetch.errors"; + break; + case FETCH_TIMEOOUTS: + return "fetch.timeouts"; + break; + case FETCH_THROTTLED: + return "fetch.throttled"; + break; + case FETCH_ALREADY_CACHED: + return "fetch.already_cached"; + break; + case FETCH_TOTAL: + return "fetch.total"; + break; + case FETCH_UNIQUE_YES: + return "fetch.unique.yes"; + break; + case FETCH_UNIQUE_NO: + return "fetch.unique.no"; + break; + case FETCH_MATCH_YES: + return "fetch.match.yes"; + break; + case FETCH_MATCH_NO: + return "fetch.match.no"; + break; + case FETCH_POLICY_YES: + return "fetch.policy.yes"; + break; + case FETCH_POLICY_NO: + return "fetch.policy.no"; + break; + case FETCH_POLICY_SIZE: + return "fetch.policy.size"; + break; + case FETCH_POLICY_MAXSIZE: + return "fetch.policy.maxsize"; + break; + default: + return "unknown"; + break; + } +} + +static bool +createStat(const String &prefix, const String &space, const char *module, const char *statName, TSRecordDataType statType, + int &statId) +{ + String name(prefix); + name.append(".").append(space); + if (nullptr != module) { + name.append(".").append(module); + } + name.append(".").append(statName); + + if (TSStatFindName(name.c_str(), &statId) == TS_ERROR) { + statId = TSStatCreate(name.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM); + if (statId == TS_ERROR) { + PrefetchError("failed to register '%s'", name.c_str()); + return false; + } + + TSStatIntSet(statId, 0); + } + + PrefetchDebug("created metric '%s (id:%d)'", name.c_str(), statId); + + return 
true; +} + +BgFetchState::BgFetchState() : _policy(nullptr), _unique(nullptr), _concurrentFetches(0), _concurrentFetchesMax(0), _log(nullptr) +{ + _policyLock = TSMutexCreate(); + if (nullptr == _policyLock) { + PrefetchError("failed to initialize lock"); + } else { + PrefetchDebug("initialized lock"); + } + + _lock = TSMutexCreate(); + if (nullptr == _lock) { + PrefetchError("failed to initialize lock"); + } else { + PrefetchDebug("initialized lock"); + } +} + +BgFetchState::~BgFetchState() +{ + TSMutexLock(_policyLock); + delete _policy; + TSMutexUnlock(_policyLock); + + TSMutexLock(_lock); + delete _unique; + TSMutexUnlock(_lock); + + TSMutexDestroy(_policyLock); + TSMutexDestroy(_lock); + + TSTextLogObjectFlush(_log); + TSTextLogObjectDestroy(_log); +} + +static bool +initializePolicy(FetchPolicy *&policy, const char *policyName) +{ + bool status = true; + if (nullptr == policy) { + policy = FetchPolicy::getInstance(policyName); + if (nullptr == policy) { + PrefetchError("failed to initialize the %s policy", policyName); + status = false; + } + } else { + PrefetchDebug("state already initialized"); + } + return status; +} + +bool +initializeMetrics(PrefetchMetricInfo metrics[], const PrefetchConfig &config) +{ + bool status = true; + for (int i = FETCH_ACTIVE; i < FETCHES_MAX_METRICS; i++) { + if (-1 == metrics[i].id) { + status = createStat(config.getMetricsPrefix(), config.getNameSpace(), nullptr, getPrefetchMetricsNames(i), metrics[i].type, + metrics[i].id); + } else { + PrefetchDebug("metric %s already initialized", getPrefetchMetricsNames(i)); + } + } + return status; +} + +bool +initializeLog(TSTextLogObject &log, const PrefetchConfig &config) +{ + bool status = true; + if (!config.getLogName().empty()) { + if (nullptr == log) { + TSReturnCode error = TSTextLogObjectCreate(config.getLogName().c_str(), TS_LOG_MODE_ADD_TIMESTAMP, &log); + if (error != TS_SUCCESS) { + PrefetchError("failed to create log file"); + status = false; + } else { + 
PrefetchDebug("initialized log file '%s'", config.getLogName().c_str()); + } + } else { + PrefetchDebug("log file '%s' already initialized", config.getLogName().c_str()); + } + } else { + PrefetchDebug("skip creating log file"); + } + return status; +} + +bool +BgFetchState::init(const PrefetchConfig &config) +{ + int status = true; + + /* Is throttling configured, 0 - don't throttle */ + _concurrentFetchesMax = config.getFetchMax(); + + /* Initialize the state */ + TSMutexLock(_lock); + + /* Initialize 'simple' policy used to avoid concurrent fetches of the same object */ + status &= initializePolicy(_unique, "simple"); + + /* Initialize the fetch metrics */ + status &= initializeMetrics(_metrics, config); + + /* Initialize the "pre-fetch" log */ + status &= initializeLog(_log, config); + + TSMutexUnlock(_lock); + + /* Initialize fetching policy */ + TSMutexLock(_policyLock); + + if (!config.getFetchPolicy().empty() && 0 != config.getFetchPolicy().compare("simple")) { + status &= initializePolicy(_policy, config.getFetchPolicy().c_str()); + if (nullptr != _policy) { + setMetric(FETCH_POLICY_MAXSIZE, _policy->getMaxSize()); + } + } else { + PrefetchDebug("Policy not specified or 'simple' policy chosen (skipping)"); + } + + TSMutexUnlock(_policyLock); + + return status; +} + +bool +BgFetchState::acquire(const String &url) +{ + bool permitted = true; + if (nullptr != _policy) { + TSMutexLock(_policyLock); + permitted = _policy->acquire(url); + TSMutexUnlock(_policyLock); + } + + if (permitted) { + incrementMetric(FETCH_POLICY_YES); + } else { + incrementMetric(FETCH_POLICY_NO); + } + + if (nullptr != _policy) { + setMetric(FETCH_POLICY_SIZE, _policy->getSize()); + } + + return permitted; +} + +bool +BgFetchState::release(const String &url) +{ + bool ret = true; + if (nullptr != _policy) { + TSMutexLock(_policyLock); + ret &= _policy->release(url); + TSMutexUnlock(_policyLock); + } + + if (nullptr != _policy) { + setMetric(FETCH_POLICY_SIZE, _policy->getSize()); + } + 
+ return ret; +} + +bool +BgFetchState::uniqueAcquire(const String &url) +{ + bool permitted = true; + bool throttled = false; + size_t cachedCounter = 0; + + TSMutexLock(_lock); + if (0 == _concurrentFetchesMax || _concurrentFetches < _concurrentFetchesMax) { + permitted = _unique->acquire(url); + if (permitted) { + cachedCounter = ++_concurrentFetches; + } + } else { + throttled = true; + } + TSMutexUnlock(_lock); + + /* Update the metrics, no need to lock? */ + if (throttled) { + incrementMetric(FETCH_THROTTLED); + } + + if (permitted && !throttled) { + incrementMetric(FETCH_UNIQUE_YES); + incrementMetric(FETCH_TOTAL); + setMetric(FETCH_ACTIVE, cachedCounter); + } else { + incrementMetric(FETCH_UNIQUE_NO); + } + + return permitted; +} + +bool +BgFetchState::uniqueRelease(const String &url) +{ + bool permitted = true; + ssize_t cachedCounter = 0; + + TSMutexLock(_lock); + cachedCounter = --_concurrentFetches; + permitted = _unique->release(url); + TSMutexUnlock(_lock); + + TSAssert(cachedCounter < 0); + + /* Update the metrics, no need to lock? 
*/ + if (permitted) { + setMetric(FETCH_ACTIVE, cachedCounter); + } + return permitted; +} + +void +BgFetchState::incrementMetric(PrefetchMetric m) +{ + if (-1 != _metrics[m].id) { + TSStatIntIncrement(_metrics[m].id, 1); + } +} + +void +BgFetchState::setMetric(PrefetchMetric m, size_t value) +{ + if (-1 != _metrics[m].id) { + TSStatIntSet(_metrics[m].id, value); + } +} + +inline TSTextLogObject +BgFetchState::getLog() +{ + return _log; +} +BgFetchStates *BgFetchStates::_prefetchStates = nullptr; + +BgFetch::BgFetch(BgFetchState *state, const PrefetchConfig &config, bool lock) + : _headerLoc(TS_NULL_MLOC), + _urlLoc(TS_NULL_MLOC), + vc(nullptr), + req_io_buf(nullptr), + resp_io_buf(nullptr), + req_io_buf_reader(nullptr), + resp_io_buf_reader(nullptr), + r_vio(nullptr), + w_vio(nullptr), + _bytes(0), + _cont(nullptr), + _state(state), + _config(config), + _askPermission(lock), + _startTime(0) +{ + _mbuf = TSMBufferCreate(); + memset(&client_ip, 0, sizeof(client_ip)); +} + +BgFetch::~BgFetch() +{ + TSHandleMLocRelease(_mbuf, TS_NULL_MLOC, _headerLoc); + TSHandleMLocRelease(_mbuf, TS_NULL_MLOC, _urlLoc); + + TSMBufferDestroy(_mbuf); + + if (vc) { + PrefetchError("Destroyed BgFetch while VC was alive"); + TSVConnClose(vc); + vc = nullptr; + } + + if (nullptr != _cont) { + if (_askPermission) { + _state->release(_cachekey); + _state->uniqueRelease(_cachekey); + } + + TSContDestroy(_cont); + _cont = nullptr; + + TSIOBufferReaderFree(req_io_buf_reader); + TSIOBufferDestroy(req_io_buf); + TSIOBufferReaderFree(resp_io_buf_reader); + TSIOBufferDestroy(resp_io_buf); + } +} + +bool +BgFetch::schedule(BgFetchState *state, const PrefetchConfig &config, bool askPermission, TSMBuffer requestBuffer, + TSMLoc requestHeaderLoc, TSHttpTxn txnp, const char *path, size_t pathLen, const String &cachekey) +{ + bool ret = false; + BgFetch *fetch = new BgFetch(state, config, askPermission); + if (fetch->init(requestBuffer, requestHeaderLoc, txnp, path, pathLen, cachekey)) { + 
fetch->schedule(); + ret = true; + } else { + delete fetch; + } + return ret; +} + +bool +BgFetch::saveIp(TSHttpTxn txnp) +{ + struct sockaddr const *ip = TSHttpTxnClientAddrGet(txnp); + if (ip) { + if (ip->sa_family == AF_INET) { + memcpy(&client_ip, ip, sizeof(sockaddr_in)); + } else if (ip->sa_family == AF_INET6) { + memcpy(&client_ip, ip, sizeof(sockaddr_in6)); + } else { + PrefetchError("unknown address family %d", ip->sa_family); + } + } else { + PrefetchError("failed to get client host info"); + return false; + } + return true; +} + +inline void +BgFetch::addBytes(int64_t b) +{ + _bytes += b; +} +/** + * Initialize the background fetch + */ +bool +BgFetch::init(TSMBuffer reqBuffer, TSMLoc reqHdrLoc, TSHttpTxn txnp, const char *fetchPath, size_t fetchPathLen, + const String &cachekey) +{ + TSAssert(TS_NULL_MLOC == _headerLoc); + TSAssert(TS_NULL_MLOC == _urlLoc); + + if (_askPermission) { + if (!_state->acquire(cachekey)) { + PrefetchDebug("request is not fetchable"); + return false; + } + + if (!_state->uniqueAcquire(cachekey)) { + PrefetchDebug("already fetching the object"); + _state->release(cachekey); + return false; + } + } + + _cachekey.assign(cachekey); + + /* Save the IP info */ + if (!saveIp(txnp)) { + return false; + } + + /* Create HTTP header */ + _headerLoc = TSHttpHdrCreate(_mbuf); + + /* Copy the headers to the new marshal buffer */ + if (TS_SUCCESS != TSHttpHdrCopy(_mbuf, _headerLoc, reqBuffer, reqHdrLoc)) { + PrefetchError("header copy failed"); + } + + /* Copy the pristine request URL into fetch marshal buffer */ + TSMLoc pristineUrlLoc; + if (TS_SUCCESS == TSHttpTxnPristineUrlGet(txnp, &reqBuffer, &pristineUrlLoc)) { + if (TS_SUCCESS != TSUrlClone(_mbuf, reqBuffer, pristineUrlLoc, &_urlLoc)) { + PrefetchError("failed to clone URL"); + TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, pristineUrlLoc); + return false; + } + TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, pristineUrlLoc); + } else { + PrefetchError("failed to get pristine URL"); + 
return false; + } + + /* Save the path before changing */ + int pathLen; + const char *path = TSUrlPathGet(_mbuf, _urlLoc, &pathLen); + if (nullptr == path) { + PrefetchError("failed to get a URL path"); + return false; + } + + /* Now set or remove the prefetch API header */ + const String &header = _config.getApiHeader(); + if (_config.isFront()) { + if (setHeader(_mbuf, _headerLoc, header.c_str(), (int)header.length(), path, pathLen)) { + PrefetchDebug("set header '%.*s: %.*s'", (int)header.length(), header.c_str(), (int)fetchPathLen, fetchPath); + } + } else { + if (removeHeader(_mbuf, _headerLoc, header.c_str(), header.length())) { + PrefetchDebug("remove header '%.*s'", (int)header.length(), header.c_str()); + } + } + + /* Make sure we remove the RANGE header to avoid 416 "Request Range Not Satisfiable" response when + * the current request is a RANGE request and its range turns out invalid for the "next" object */ + if (removeHeader(_mbuf, _headerLoc, TS_MIME_FIELD_RANGE, TS_MIME_LEN_RANGE)) { + PrefetchDebug("remove header '%.*s'", TS_MIME_LEN_RANGE, TS_MIME_FIELD_RANGE); + } + + /* Overwrite the path if required */ + if (nullptr != fetchPath && 0 != fetchPathLen) { + if (TS_SUCCESS == TSUrlPathSet(_mbuf, _urlLoc, fetchPath, fetchPathLen)) { + PrefetchDebug("setting URL path to %.*s", (int)fetchPathLen, fetchPath); + } else { + PrefetchError("failed to set a URL path %.*s", (int)fetchPathLen, fetchPath); + } + } + + /* Come up with the host name to be used in the fetch request */ + const char *hostName = nullptr; + int hostNameLen = 0; + if (_config.getReplaceHost().empty()) { + hostName = TSUrlHostGet(_mbuf, _urlLoc, &hostNameLen); + } else { + hostName = _config.getReplaceHost().c_str(); + hostNameLen = _config.getReplaceHost().length(); + } + + /* Set the URI host */ + if (TS_SUCCESS == TSUrlHostSet(_mbuf, _urlLoc, hostName, hostNameLen)) { + PrefetchDebug("setting URL host: %.*s", hostNameLen, hostName); + } else { + PrefetchError("failed to set URL 
host: %.*s", hostNameLen, hostName); + } + + /* Set the host header */ + if (setHeader(_mbuf, _headerLoc, TS_MIME_FIELD_HOST, TS_MIME_LEN_HOST, hostName, hostNameLen)) { + PrefetchDebug("setting Host header: %.*s", hostNameLen, hostName); + } else { + PrefetchError("failed to set Host header: %.*s", hostNameLen, hostName); + } + + /* Save the URL to be fetched with this fetch for debugging purposes, expensive TSUrlStringGet() + * but really helpful when debugging multi-remap / host-replacement use cases */ + int urlLen = 0; + char *url = TSUrlStringGet(_mbuf, _urlLoc, &urlLen); + if (nullptr != url) { + _url.assign(url, urlLen); + TSfree(static_cast<void *>(url)); + } + + /* TODO: TBD is this the right place? */ + if (TS_SUCCESS != TSHttpHdrUrlSet(_mbuf, _headerLoc, _urlLoc)) { + return false; + } + + /* Initialization is success */ + return true; +} + +/** + * @brief Create, setup and schedule the background fetch continuation. + */ +void +BgFetch::schedule() +{ + TSAssert(nullptr == _cont); + + /* Setup the continuation */ + _cont = TSContCreate(handler, TSMutexCreate()); + TSContDataSet(_cont, static_cast<void *>(this)); + + /* Initialize the VIO (for the fetch) */ + req_io_buf = TSIOBufferCreate(); + req_io_buf_reader = TSIOBufferReaderAlloc(req_io_buf); + resp_io_buf = TSIOBufferCreate(); + resp_io_buf_reader = TSIOBufferReaderAlloc(resp_io_buf); + + /* Schedule */ + PrefetchDebug("schedule fetch: %s", _url.c_str()); + _startTime = TShrtime(); + TSContSchedule(_cont, 0, TS_THREAD_POOL_NET); +} + +/* Log format is: name-space bytes status url */ +void +BgFetch::logAndMetricUpdate(TSEvent event) const +{ + const char *status; + + switch (event) { + case TS_EVENT_VCONN_EOS: + status = "EOS"; + _state->incrementMetric(FETCH_COMPLETED); + break; + case TS_EVENT_VCONN_INACTIVITY_TIMEOUT: + status = "TIMEOUT"; + _state->incrementMetric(FETCH_TIMEOOUTS); + break; + case TS_EVENT_ERROR: + _state->incrementMetric(FETCH_ERRORS); + status = "ERROR"; + break; + case 
TS_EVENT_VCONN_READ_COMPLETE: + _state->incrementMetric(FETCH_COMPLETED); + status = "READ_COMP"; + break; + default: + status = "UNKNOWN"; + break; + } + + if (TSIsDebugTagSet(PLUGIN_NAME "_log")) { + TSHRTime now = TShrtime(); + double elapsed = (double)(now - _startTime) / 1000000.0; + + PrefetchDebug("ns=%s bytes=%" PRId64 " time=%1.3lf status=%s url=%s key=%s", _config.getNameSpace().c_str(), _bytes, elapsed, + status, _url.c_str(), _cachekey.c_str()); + if (_state->getLog()) { + TSTextLogObjectWrite(_state->getLog(), "ns=%s bytes=%" PRId64 " time=%1.3lf status=%s url=%s key=%s", + _config.getNameSpace().c_str(), _bytes, elapsed, status, _url.c_str(), _cachekey.c_str()); + } + } +} + +/** + * @brief Continuation to perform a background fill of a URL. + * + * This is pretty expensive (memory allocations etc.) + */ +int +BgFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */) +{ + BgFetch *fetch = static_cast<BgFetch *>(TSContDataGet(contp)); + int64_t avail; + + PrefetchDebug("event: %s (%d)", TSHttpEventNameLookup(event), event); + + switch (event) { + case TS_EVENT_IMMEDIATE: + case TS_EVENT_TIMEOUT: + // Debug info for this particular bg fetch (put all debug in here please) + if (TSIsDebugTagSet(PLUGIN_NAME)) { + char buf[INET6_ADDRSTRLEN]; + const sockaddr *sockaddress = (const sockaddr *)&fetch->client_ip; + + switch (sockaddress->sa_family) { + case AF_INET: + inet_ntop(AF_INET, &(((struct sockaddr_in *)sockaddress)->sin_addr), buf, INET_ADDRSTRLEN); + PrefetchDebug("client IPv4 = %s", buf); + break; + case AF_INET6: + inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sockaddress)->sin6_addr), buf, INET6_ADDRSTRLEN); + PrefetchDebug("client IPv6 = %s", buf); + break; + default: + TSError("[%s] Unknown address family %d", PLUGIN_NAME, sockaddress->sa_family); + break; + } + PrefetchDebug("Starting background fetch."); + dumpHeaders(fetch->_mbuf, fetch->_headerLoc); + } + + // Setup the NetVC for background fetch + TSAssert(nullptr == 
fetch->vc); + if ((fetch->vc = TSHttpConnect((sockaddr *)&fetch->client_ip)) != nullptr) { + TSHttpHdrPrint(fetch->_mbuf, fetch->_headerLoc, fetch->req_io_buf); + // We never send a body with the request. ToDo: Do we ever need to support that ? + TSIOBufferWrite(fetch->req_io_buf, "\r\n", 2); + + fetch->r_vio = TSVConnRead(fetch->vc, contp, fetch->resp_io_buf, INT64_MAX); + fetch->w_vio = TSVConnWrite(fetch->vc, contp, fetch->req_io_buf_reader, TSIOBufferReaderAvail(fetch->req_io_buf_reader)); + } else { + delete fetch; + PrefetchError("Failed to connect to internal process, major malfunction"); + } + break; + + case TS_EVENT_VCONN_WRITE_COMPLETE: + // TSVConnShutdown(data->vc, 0, 1); + // TSVIOReenable(data->w_vio); + PrefetchDebug("write complete"); + break; + + case TS_EVENT_VCONN_READ_READY: + avail = TSIOBufferReaderAvail(fetch->resp_io_buf_reader); + fetch->addBytes(avail); + TSIOBufferReaderConsume(fetch->resp_io_buf_reader, avail); + TSVIONDoneSet(fetch->r_vio, TSVIONDoneGet(fetch->r_vio) + avail); + TSVIOReenable(fetch->r_vio); + break; + + case TS_EVENT_VCONN_READ_COMPLETE: + case TS_EVENT_VCONN_EOS: + case TS_EVENT_VCONN_INACTIVITY_TIMEOUT: + case TS_EVENT_ERROR: + if (event == TS_EVENT_VCONN_INACTIVITY_TIMEOUT) { + PrefetchDebug("encountered Inactivity Timeout"); + TSVConnAbort(fetch->vc, TS_VC_CLOSE_ABORT); + } else { + TSVConnClose(fetch->vc); + } + + PrefetchDebug("closing background transaction"); + avail = TSIOBufferReaderAvail(fetch->resp_io_buf_reader); + fetch->addBytes(avail); + TSIOBufferReaderConsume(fetch->resp_io_buf_reader, avail); + TSVIONDoneSet(fetch->r_vio, TSVIONDoneGet(fetch->r_vio) + avail); + fetch->logAndMetricUpdate(event); + + /* Close, release and cleanup */ + fetch->vc = nullptr; + delete fetch; + break; + + default: + PrefetchDebug("unhandled event"); + break; + } + + return 0; +} diff --git a/plugins/experimental/prefetch/fetch.h b/plugins/experimental/prefetch/fetch.h new file mode 100644 index 0000000..4d73a7b --- 
/dev/null +++ b/plugins/experimental/prefetch/fetch.h @@ -0,0 +1,202 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file bg_fetch.h + * @brief Background fetch related classes (header file). + */ + +#pragma once + +#include <list> +#include <map> +#include <unordered_map> +#include <vector> + +#include "ts/ts.h" +#include "ts/experimental.h" +#include "common.h" +#include "configs.h" +#include "fetch_policy.h" + +enum PrefetchMetric { + FETCH_ACTIVE = 0, + FETCH_COMPLETED, + FETCH_ERRORS, + FETCH_TIMEOOUTS, + FETCH_THROTTLED, + FETCH_ALREADY_CACHED, /*metric if for counting how many times fetch was not scheduled because of cache-hit */ + FETCH_TOTAL, + FETCH_UNIQUE_YES, + FETCH_UNIQUE_NO, + FETCH_MATCH_YES, /* metric id for URL path pattern match successes */ + FETCH_MATCH_NO, /* metric id for URL path pattern match failures */ + FETCH_POLICY_YES, /* metric id for counting fetch policy successes */ + FETCH_POLICY_NO, /* metric id for counting fetch policy failures */ + FETCH_POLICY_SIZE, + FETCH_POLICY_MAXSIZE, + FETCHES_MAX_METRICS, +}; + +struct PrefetchMetricInfo { + PrefetchMetric index; + TSRecordDataType type; + int id; +}; + +/** + * @brief to store background fetch state, metrics, logs etc (shared between all scheduled 
fetches). + * + * @todo: reconsider the locks (tried to be granular but it feels too crowded, remove unnecessary locks) + */ +class BgFetchState +{ +public: + BgFetchState(); + virtual ~BgFetchState(); + bool init(const PrefetchConfig &config); + + /* Fetch policy */ + bool acquire(const String &url); + bool release(const String &url); + + /* De-duplication of requests */ + bool uniqueAcquire(const String &url); + bool uniqueRelease(const String &url); + + /* Metrics and logs */ + void incrementMetric(PrefetchMetric m); + void setMetric(PrefetchMetric m, size_t value); + TSTextLogObject getLog(); + +private: + BgFetchState(BgFetchState const &); /* never implement */ + void operator=(BgFetchState const &); /* never implement */ + + /* Fetch policy related */ + FetchPolicy *_policy; /* fetch policy */ + TSMutex _policyLock; /* protects the policy object only */ + + /* Mechanisms to avoid concurrent fetches and applying limits */ + FetchPolicy *_unique; /* make sure we never download same object multiple times at the same time */ + TSMutex _lock; /* protects the deduplication object only */ + size_t _concurrentFetches; + size_t _concurrentFetchesMax; + PrefetchMetricInfo _metrics[FETCHES_MAX_METRICS] = { + {FETCH_ACTIVE, TS_RECORDDATATYPE_INT, -1}, {FETCH_COMPLETED, TS_RECORDDATATYPE_COUNTER, -1}, + {FETCH_ERRORS, TS_RECORDDATATYPE_COUNTER, -1}, {FETCH_TIMEOOUTS, TS_RECORDDATATYPE_COUNTER, -1}, + {FETCH_THROTTLED, TS_RECORDDATATYPE_COUNTER, -1}, {FETCH_ALREADY_CACHED, TS_RECORDDATATYPE_COUNTER, -1}, + {FETCH_TOTAL, TS_RECORDDATATYPE_COUNTER, -1}, {FETCH_UNIQUE_YES, TS_RECORDDATATYPE_COUNTER, -1}, + {FETCH_UNIQUE_NO, TS_RECORDDATATYPE_COUNTER, -1}, {FETCH_MATCH_YES, TS_RECORDDATATYPE_COUNTER, -1}, + {FETCH_MATCH_NO, TS_RECORDDATATYPE_COUNTER, -1}, {FETCH_POLICY_YES, TS_RECORDDATATYPE_COUNTER, -1}, + {FETCH_POLICY_NO, TS_RECORDDATATYPE_COUNTER, -1}, {FETCH_POLICY_SIZE, TS_RECORDDATATYPE_INT, -1}, + {FETCH_POLICY_MAXSIZE, TS_RECORDDATATYPE_INT, -1}}; + + /* plugin 
specific fetch logging */ + TSTextLogObject _log; +}; + +/** + * @brief Contains all background states to be shared between different plugin instances (grouped in namespaces) + */ +class BgFetchStates +{ +public: + /* Initialize on first use */ + static BgFetchStates * + get() + { + if (nullptr == _prefetchStates) { + _prefetchStates = new BgFetchStates(); + } + return _prefetchStates; + } + + BgFetchState * + getStateByName(const String &space) + { + BgFetchState *state; + std::map<String, BgFetchState *>::iterator it; + + TSMutexLock(_prefetchStates->_lock); + it = _prefetchStates->_states.find(space); + if (it != _prefetchStates->_states.end()) { + state = it->second; + } else { + state = new BgFetchState(); + _prefetchStates->_states[space] = state; + } + TSMutexUnlock(_prefetchStates->_lock); + return state; + } + +private: + BgFetchStates() : _lock(TSMutexCreate()) {} + ~BgFetchStates() { TSMutexDestroy(_lock); } + static BgFetchStates *_prefetchStates; + + std::map<String, BgFetchState *> _states; /* stores pointers to states per namespace */ + TSMutex _lock; +}; + +/** + * @brief Represents a single background fetch. 
+ */ +class BgFetch +{ +public: + static bool schedule(BgFetchState *state, const PrefetchConfig &config, bool askPermission, TSMBuffer requestBuffer, + TSMLoc requestHeaderLoc, TSHttpTxn txnp, const char *path, size_t pathLen, const String &cachekey); + +private: + BgFetch(BgFetchState *state, const PrefetchConfig &config, bool lock); + ~BgFetch(); + bool init(TSMBuffer requestBuffer, TSMLoc requestHeaderLoc, TSHttpTxn txnp, const char *fetchPath, size_t fetchPathLen, + const String &cacheKey); + void schedule(); + static int handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */); + bool saveIp(TSHttpTxn txnp); + void addBytes(int64_t b); + void logAndMetricUpdate(TSEvent event) const; + + /* Request related */ + TSMBuffer _mbuf; + TSMLoc _headerLoc; + TSMLoc _urlLoc; + struct sockaddr_storage client_ip; + + /* This is for the actual background fetch / NetVC */ + TSVConn vc; + TSIOBuffer req_io_buf, resp_io_buf; + TSIOBufferReader req_io_buf_reader, resp_io_buf_reader; + TSVIO r_vio, w_vio; + int64_t _bytes; + + /* Background fetch continuation */ + TSCont _cont; + + /* Pointers and cache */ + String _cachekey; /* saving the cache key for later use */ + String _url; /* saving the URL for later use */ + BgFetchState *_state; /* pointer for access to the plugin state */ + const PrefetchConfig &_config; /* reference for access to the configuration */ + + bool _askPermission; /* true - check with the fetch policies if we should schedule the fetch */ + + TSHRTime _startTime; /* for calculation of downloadTime for this fetch */ +}; diff --git a/plugins/experimental/prefetch/fetch_policy.cc b/plugins/experimental/prefetch/fetch_policy.cc new file mode 100644 index 0000000..083257f --- /dev/null +++ b/plugins/experimental/prefetch/fetch_policy.cc @@ -0,0 +1,57 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file fetch_policy.cc + * @brief Fetch policy interface. + */ + +#include "fetch_policy.h" + +#include <string.h> + +#include "common.h" +#include "fetch_policy_lru.h" +#include "fetch_policy_simple.h" + +FetchPolicy * +FetchPolicy::getInstance(const char *parameters) +{ + const char *name = parameters; + const char *delim = strchr(parameters, ':'); + size_t len = (nullptr == delim ? strlen(name) : delim - name); + const char *params = (nullptr == delim ? nullptr : delim + 1); + + PrefetchDebug("getting '%.*s' policy instance, params: %s", (int)len, name, params); + FetchPolicy *p = nullptr; + if (6 == len && 0 == strncmp(name, "simple", 6)) { + p = new FetchPolicySimple(); + } else if (3 == len && 0 == strncmp(name, "lru", 3)) { + p = new FetchPolicyLru(); + } else { + PrefetchError("unrecognized fetch policy type: %.*s", (int)len, name); + return nullptr; + } + + if (p->init(params)) { + return p; + } + delete p; + + return nullptr; +} diff --git a/plugins/experimental/prefetch/fetch_policy.h b/plugins/experimental/prefetch/fetch_policy.h new file mode 100644 index 0000000..8594759 --- /dev/null +++ b/plugins/experimental/prefetch/fetch_policy.h @@ -0,0 +1,66 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file fetch_policy.h + * @brief Fetch policy interface (header file). + */ + +#pragma once + +#include <list> +#include <openssl/sha.h> +#include <string.h> +#include <string> +#include <unordered_map> + +#include "common.h" + +class FetchPolicy; +class SimplePolicy; +class Prescaler; + +/** + * @brief Fetch policy interface. + */ +class FetchPolicy +{ +public: + static FetchPolicy *getInstance(const char *name); + virtual ~FetchPolicy(){}; + + virtual bool init(const char *parameters) = 0; + virtual bool acquire(const std::string &url) = 0; + virtual bool release(const std::string &url) = 0; + virtual const char *name() = 0; + virtual size_t getSize() = 0; + virtual size_t getMaxSize() = 0; + +private: + FetchPolicy(const FetchPolicy &); + FetchPolicy &operator=(const FetchPolicy &); + +protected: + FetchPolicy(){}; + void + log(const char *msg, const String &url, bool ret) + { + PrefetchDebug("%s::%s('%.*s%s'): %s", name(), msg, (int)(url.length() > 100 ? 100 : url.length()), url.c_str(), + url.length() > 100 ? "..." : "", ret ? 
"true" : "false"); + } +}; diff --git a/plugins/experimental/prefetch/fetch_policy_lru.cc b/plugins/experimental/prefetch/fetch_policy_lru.cc new file mode 100644 index 0000000..c0b0902 --- /dev/null +++ b/plugins/experimental/prefetch/fetch_policy_lru.cc @@ -0,0 +1,141 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file fetch_policy_lru.cc + * @brief LRU fetch policy. 
+ */
+
+#include "fetch_policy_lru.h"
+#include "common.h"
+
+/* Note: the virtual members below are intentionally not 'inline'; they are declared in the header
+ * and may be called from other translation units. */
+const char *
+FetchPolicyLru::name()
+{
+  return "lru";
+}
+
+/**
+ * @brief Initialize the policy from its parameter string.
+ *
+ * Only the text before the first ',' (or the whole string when there is none) is parsed,
+ * as the maximum number of LRU entries. The default size is also the minimum: smaller
+ * values are reported as errors and the default is kept.
+ *
+ * @param parameters policy parameters, nullptr keeps the defaults
+ * @return always true
+ */
+bool
+FetchPolicyLru::init(const char *parameters)
+{
+  if (nullptr == parameters) {
+    /* Leave defaults */
+  } else {
+    size_t size = 0;
+
+    /* The size is the text before the first ',' (or the whole string). */
+    const char *sizeStr = parameters;
+    const char *delim   = strchr(parameters, ',');
+
+    if (nullptr == delim) {
+      size = getValue(sizeStr, strlen(sizeStr));
+    } else {
+      size = getValue(sizeStr, delim - sizeStr);
+    }
+
+    /* Defaults are considered minimum */
+    static const char *defaultStr = " (default)";
+    bool useDefault               = false;
+
+    /* Make sure size is not larger than what std::list is physically able to hold */
+    LruList::size_type realMax = _list.max_size();
+    if (size > realMax) {
+      PrefetchDebug("size: %zu is not feasible, cutting to %zu", size, realMax);
+      size = realMax;
+    }
+    /* Guarantee minimum value: a size equal to the minimum is valid, only smaller ones are rejected. */
+    if (size >= _maxSize) {
+      _maxSize = size;
+    } else {
+      useDefault = true;
+      PrefetchError("size: %zu is not a good value", size);
+    }
+
+    PrefetchDebug("initialized %s fetch policy: size: %zu%s", name(), _maxSize, (useDefault ? defaultStr : ""));
+  }
+
+  return true;
+}
+
+size_t
+FetchPolicyLru::getMaxSize()
+{
+  return _maxSize;
+}
+
+size_t
+FetchPolicyLru::getSize()
+{
+  return _size;
+}
+
+/**
+ * @brief Decide whether a fetch should be triggered for the URL.
+ *
+ * The SHA-1 of the URL is looked up among the most recently used entries. If found, the entry
+ * is moved to the front of the list and no fetch is triggered. Otherwise the URL becomes the
+ * most recently used entry (recycling the least recently used one when the list is full) and
+ * a fetch is triggered.
+ *
+ * @param url URL to check
+ * @return true - trigger the fetch, false - URL was used recently, skip it
+ */
+bool
+FetchPolicyLru::acquire(const std::string &url)
+{
+  bool ret = false;
+
+  LruHash hash;
+  hash.init(url.c_str(), url.length());
+
+  LruMapIterator it = _map.find(&hash);
+
+  if (_map.end() != it) {
+    PrefetchDebug("recently used LRU entry, moving to front");
+
+    /* We have an entry in the LRU */
+    PrefetchAssert(_list.size() > 0);
+
+    /* Move to the front of the list */
+    _list.splice(_list.begin(), _list, it->second);
+
+    /* Don't trigger fetch if the url is found amongst the most recently used ones */
+    ret = false;
+  } else {
+    /* New LRU entry */
+    if (_size >= _maxSize) {
+      /* Move the last (least recently used) element to the front and remove it from the hash table.
+       * The map key points at the hash stored in this list element, so erase before overwriting it. */
+      _list.splice(_list.begin(), _list, --_list.end());
+      _map.erase(&(*_list.begin()));
+      PrefetchDebug("reused the least recently used LRU entry");
+    } else {
+      /* With this implementation we are never removing LRU elements from the list but just updating the front element of the list
+       * so the following addition should happen at most FetchPolicyLru::_maxSize number of times */
+      _list.push_front(NULL_LRU_ENTRY);
+      _size++;
+      PrefetchDebug("created a new LRU entry, size=%d", (int)_size);
+    }
+    /* Update the "new" or the most recently used LRU entry and add it to the hash */
+    *_list.begin()          = hash;
+    _map[&(*_list.begin())] = _list.begin();
+
+    /* Trigger fetch since the URL is not amongst the most recently used ones */
+    ret = true;
+  }
+
+  log("acquire", url, ret);
+  return ret;
+}
+
+/**
+ * @brief Nothing to release: entries only leave the LRU when acquire() recycles the least recently used one.
+ * @return always true
+ */
+bool
+FetchPolicyLru::release(const std::string &url)
+{
+  log("release", url, true);
+  return true;
+}
diff --git a/plugins/experimental/prefetch/fetch_policy_lru.h b/plugins/experimental/prefetch/fetch_policy_lru.h
new file mode 100644
index 0000000..2699647
--- /dev/null
+++
b/plugins/experimental/prefetch/fetch_policy_lru.h @@ -0,0 +1,105 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file fetch_policy_lru.h + * @brief LRU fetch policy (header file). + */ + +#pragma once + +#include "fetch_policy.h" + +/* Here reusing some of the classes used in cache_promote plugin. + * @todo: this was done in interest of time, see if LRU is what we really need, can we do it differently / better? 
*/
+
+/**
+ * @brief SHA-1 digest of a URL, the key of the LRU.
+ */
+class LruHash
+{
+  friend struct LruHashHasher;
+
+public:
+  LruHash() {}
+  ~LruHash() {}
+  LruHash &
+  operator=(const LruHash &h)
+  {
+    memcpy(_hash, h._hash, sizeof(_hash));
+    return *this;
+  }
+
+  /** @brief Compute the SHA-1 digest of the given data. */
+  void
+  init(const char *data, int len)
+  {
+    SHA_CTX sha;
+
+    SHA1_Init(&sha);
+    SHA1_Update(&sha, data, len);
+    SHA1_Final(_hash, &sha);
+  }
+
+private:
+  u_char _hash[SHA_DIGEST_LENGTH];
+};
+
+/**
+ * @brief Equality and hash functors for LruHash pointers (used by LruMap).
+ */
+struct LruHashHasher {
+  bool
+  operator()(const LruHash *s1, const LruHash *s2) const
+  {
+    return 0 == memcmp(s1->_hash, s2->_hash, sizeof(s2->_hash));
+  }
+
+  size_t
+  operator()(const LruHash *s) const
+  {
+    static_assert(9 + sizeof(size_t) <= SHA_DIGEST_LENGTH, "hash words must fit in the SHA-1 digest");
+
+    /* Fold two machine words of the digest. memcpy() avoids the unaligned access at offset 9 and the
+     * strict-aliasing violation of dereferencing a casted pointer; compilers turn it into plain loads. */
+    size_t head = 0;
+    size_t tail = 0;
+    memcpy(&head, s->_hash, sizeof(head));
+    memcpy(&tail, s->_hash + 9, sizeof(tail));
+    return head ^ tail;
+  }
+};
+
+typedef LruHash LruEntry;
+typedef std::list<LruEntry> LruList;
+typedef std::unordered_map<const LruHash *, LruList::iterator, LruHashHasher, LruHashHasher> LruMap;
+typedef LruMap::iterator LruMapIterator;
+
+static LruEntry NULL_LRU_ENTRY; // Used to create an "empty" new LRUEntry
+
+/**
+ * @brief Fetch policy that allows fetches only for not-"hot" objects.
+ *
+ * Trying to identify "hot" object by keeping track of most recently used objects and
+ * allows fetches only when a URL is not found in the most recently used set.
+ */
+class FetchPolicyLru : public FetchPolicy
+{
+public:
+  /* Default size values are also considered minimum. TODO: find out if this works OK. */
+  FetchPolicyLru() : _maxSize(10), _size(0) {}
+  ~FetchPolicyLru() override {}
+
+  /* Fetch policy interface methods */
+  bool init(const char *parameters) override;
+  bool acquire(const std::string &url) override;
+  bool release(const std::string &url) override;
+  const char *name() override;
+  size_t getMaxSize() override;
+  size_t getSize() override;
+
+protected:
+  LruMap _map;                 /**< @brief URL hash -> list position; keys point at the hashes stored in _list */
+  LruList _list;               /**< @brief LRU entries, most recently used first */
+  LruList::size_type _maxSize; /**< @brief maximum number of entries (the default is also the minimum) */
+  LruList::size_type _size;    /**< @brief number of entries currently in _list */
+};
diff --git a/plugins/experimental/prefetch/fetch_policy_simple.cc b/plugins/experimental/prefetch/fetch_policy_simple.cc
new file mode 100644
index 0000000..aefd07f
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch_policy_simple.cc
@@ -0,0 +1,80 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file fetch_policy_simple.cc
+ * @brief Simple fetch policy.
+ */
+
+#include "fetch_policy_simple.h"
+
+/* Note: the virtual members below are intentionally not 'inline'; they are declared in the header
+ * and may be called from other translation units. */
+
+/**
+ * @brief This policy takes no parameters; only logs the initialization.
+ * @return always true
+ */
+bool
+FetchPolicySimple::init(const char * /* parameters */)
+{
+  PrefetchDebug("initialized %s fetch policy", name());
+  return true;
+}
+
+/**
+ * @brief Allow the fetch only if the URL is not already acquired (and not yet released).
+ * @param url URL to be fetched
+ * @return true - the URL was recorded and the fetch can start, false - the URL is already acquired
+ */
+bool
+FetchPolicySimple::acquire(const std::string &url)
+{
+  /* emplace() inserts only when the URL is absent and reports whether it did, in a single lookup. */
+  bool ret = _urls.emplace(url, true).second;
+
+  log("acquire", url, ret);
+  return ret;
+}
+
+/**
+ * @brief Forget the URL so it can be acquired again.
+ * @param url URL whose fetch is done
+ * @return true - the URL was tracked and is now removed, false - the URL was not tracked
+ */
+bool
+FetchPolicySimple::release(const std::string &url)
+{
+  bool ret = _urls.erase(url) > 0;
+
+  log("release", url, ret);
+  return ret;
+}
+
+const char *
+FetchPolicySimple::name()
+{
+  return "simple";
+}
+
+size_t
+FetchPolicySimple::getSize()
+{
+  return _urls.size();
+}
+
+size_t
+FetchPolicySimple::getMaxSize()
+{
+  /* Unlimited */
+  return 0;
+}
diff --git a/plugins/experimental/prefetch/fetch_policy_simple.h b/plugins/experimental/prefetch/fetch_policy_simple.h
new file mode 100644
index 0000000..be04d86
--- /dev/null
+++ b/plugins/experimental/prefetch/fetch_policy_simple.h
@@ -0,0 +1,46 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file fetch_policy_simple.h
+ * @brief Simple fetch policy (header file).
+ */
+
+#pragma once
+
+#include "fetch_policy.h"
+
+/**
+ * @brief Simple de-duplication fetch policy: a URL can be acquired again only after it was released,
+ * so at most one background fetch per URL is running at a time.
+ */
+
+class FetchPolicySimple : public FetchPolicy
+{
+public:
+  FetchPolicySimple() {}
+  ~FetchPolicySimple() override {}
+  bool init(const char *parameters) override;
+  bool acquire(const std::string &url) override;
+  bool release(const std::string &url) override;
+  const char *name() override;
+  size_t getSize() override;
+  size_t getMaxSize() override;
+
+private:
+  std::unordered_map<std::string, bool> _urls; /**< @brief URLs currently acquired (the mapped value is not used) */
+};
diff --git a/plugins/experimental/prefetch/headers.cc b/plugins/experimental/prefetch/headers.cc
new file mode 100644
index 0000000..8233dc5
--- /dev/null
+++ b/plugins/experimental/prefetch/headers.cc
@@ -0,0 +1,213 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file headers.cc
+ * @brief HTTP headers manipulation.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "configs.h"
+#include "headers.h"
+
+/**
+ * @brief Remove a header (fully) from a TSMLoc / TSMBuffer.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ * @param header header name
+ * @param headerlen header name length
+ * @return the number of fields (header values) we removed.
+ */
+int
+removeHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen)
+{
+  TSMLoc fieldLoc = TSMimeHdrFieldFind(bufp, hdrLoc, header, headerlen);
+  int cnt         = 0;
+
+  while (fieldLoc) {
+    /* Get the next duplicate before this field is destroyed. */
+    TSMLoc tmp = TSMimeHdrFieldNextDup(bufp, hdrLoc, fieldLoc);
+
+    ++cnt;
+    TSMimeHdrFieldDestroy(bufp, hdrLoc, fieldLoc);
+    TSHandleMLocRelease(bufp, hdrLoc, fieldLoc);
+    fieldLoc = tmp;
+  }
+
+  return cnt;
+}
+
+/**
+ * @brief Checks if the header exists.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ * @param header header name
+ * @param headerlen header name length
+ * @return true - exists, false - does not exist
+ */
+bool
+headerExist(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen)
+{
+  TSMLoc fieldLoc = TSMimeHdrFieldFind(bufp, hdrLoc, header, headerlen);
+  if (TS_NULL_MLOC != fieldLoc) {
+    TSHandleMLocRelease(bufp, hdrLoc, fieldLoc);
+    return true;
+  }
+  return false;
+}
+
+/**
+ * @brief Get the header value
+ *
+ * All non-empty values of all duplicates are joined with ", ". A value that does not fit
+ * in the remaining buffer space is skipped. The result is not NUL-terminated.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ * @param header header name
+ * @param headerlen header name length
+ * @param value buffer for the value
+ * @param valuelen in: length of the value buffer, out: number of bytes written to it
+ * @return pointer to the string with the value (same as the value buffer).
+ */
+char *
+getHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen, char *value, int *valuelen)
+{
+  TSMLoc fieldLoc = TSMimeHdrFieldFind(bufp, hdrLoc, header, headerlen);
+  char *dst       = value;
+  while (fieldLoc) {
+    TSMLoc next = TSMimeHdrFieldNextDup(bufp, hdrLoc, fieldLoc);
+
+    int count = TSMimeHdrFieldValuesCount(bufp, hdrLoc, fieldLoc);
+    for (int i = 0; i < count; ++i) {
+      const char *v = nullptr;
+      int vlen      = 0;
+      v             = TSMimeHdrFieldValueStringGet(bufp, hdrLoc, fieldLoc, i, &vlen);
+      if (v == nullptr || vlen == 0) {
+        continue;
+      }
+      /* append the field content to the output buffer if enough space, plus space for ", " */
+      bool first      = (dst == value);
+      int neededSpace = ((dst - value) + vlen + (first ? 0 : 2));
+      if (neededSpace < *valuelen) {
+        if (!first) {
+          memcpy(dst, ", ", 2);
+          dst += 2;
+        }
+        memcpy(dst, v, vlen);
+        dst += vlen;
+      }
+    }
+    TSHandleMLocRelease(bufp, hdrLoc, fieldLoc);
+    fieldLoc = next;
+  }
+
+  *valuelen = dst - value;
+  return value;
+}
+
+/**
+ * @brief Set a header to a specific value.
+ *
+ * Avoids a remove / add sequence for an existing header: the first occurrence is updated
+ * in place and any duplicates are removed. A missing header is created.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ * @param header header name
+ * @param headerlen header name len
+ * @param value the new value
+ * @param valuelen length of the value
+ * @return true - OK, false - failed
+ */
+bool
+setHeader(TSMBuffer bufp, TSMLoc hdrLoc, const char *header, int headerlen, const char *value, int valuelen)
+{
+  if (!bufp || !hdrLoc || !header || headerlen <= 0 || !value || valuelen <= 0) {
+    return false;
+  }
+
+  bool ret        = false;
+  TSMLoc fieldLoc = TSMimeHdrFieldFind(bufp, hdrLoc, header, headerlen);
+
+  if (!fieldLoc) {
+    // No existing header, so create one
+    if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdrLoc, header, headerlen, &fieldLoc)) {
+      if (TS_SUCCESS == TSMimeHdrFieldValueStringSet(bufp, hdrLoc, fieldLoc, -1, value, valuelen)) {
+        TSMimeHdrFieldAppend(bufp, hdrLoc, fieldLoc);
+        ret = true;
+      }
+      TSHandleMLocRelease(bufp, hdrLoc, fieldLoc);
+    }
+  } else {
+    TSMLoc tmp = nullptr;
+    bool first = true;
+
+    while (fieldLoc) {
+      /* Get the next duplicate BEFORE this field is (possibly) destroyed below:
+       * walking the duplicate chain from a destroyed field is invalid. */
+      tmp = TSMimeHdrFieldNextDup(bufp, hdrLoc, fieldLoc);
+      if (first) {
+        first = false;
+        if (TS_SUCCESS == TSMimeHdrFieldValueStringSet(bufp, hdrLoc, fieldLoc, -1, value, valuelen)) {
+          ret = true;
+        }
+      } else {
+        TSMimeHdrFieldDestroy(bufp, hdrLoc, fieldLoc);
+      }
+      TSHandleMLocRelease(bufp, hdrLoc, fieldLoc);
+      fieldLoc = tmp;
+    }
+  }
+
+  return ret;
+}
+
+/**
+ * @brief Dump the MIME headers to the plugin debug log.
+ *
+ * Useful together with TSDebug().
+ *
+ * Nothing is logged when printing the headers produced no data.
+ *
+ * @param bufp request's buffer
+ * @param hdrLoc request's header location
+ */
+void
+dumpHeaders(TSMBuffer bufp, TSMLoc hdrLoc)
+{
+  TSIOBuffer output_buffer = TSIOBufferCreate();
+  TSIOBufferReader reader  = TSIOBufferReaderAlloc(output_buffer);
+
+  /* This will print just MIMEFields and not the http request line */
+  TSMimeHdrPrint(bufp, hdrLoc, output_buffer);
+
+  /* We need to loop over all the buffer blocks, there can be more than 1.
+   * The block is checked before reading from it: the reader may have no block at all. */
+  for (TSIOBufferBlock block = TSIOBufferReaderStart(reader); nullptr != block; block = TSIOBufferReaderStart(reader)) {
+    int64_t block_avail     = 0;
+    const char *block_start = TSIOBufferBlockReadStart(block, reader, &block_avail);
+    if (block_avail <= 0) {
+      break;
+    }
+    PrefetchDebug("Headers are:\n%.*s", static_cast<int>(block_avail), block_start);
+    TSIOBufferReaderConsume(reader, block_avail);
+  }
+
+  /* Free up the TSIOBuffer that we used to print out the header */
+  TSIOBufferReaderFree(reader);
+  TSIOBufferDestroy(output_buffer);
+}
diff --git a/plugins/experimental/prefetch/headers.h b/plugins/experimental/prefetch/headers.h
new file mode 100644
index 0000000..b6e0e1b
--- /dev/null
+++ b/plugins/experimental/prefetch/headers.h
@@ -0,0 +1,31 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file headers.h
+ * @brief HTTP headers manipulation (header file).
+ */
+
+#pragma once
+
+#include "ts/ts.h" /* TSMBuffer, TSMLoc: keep this header self-contained */
+
+/** @brief Remove all occurrences of a header, returns the number of fields removed. */
+int removeHeader(TSMBuffer bufp, TSMLoc hdr_loc, const char *header, int len);
+/** @brief Check whether a header is present. */
+bool headerExist(TSMBuffer bufp, TSMLoc hdr_loc, const char *header, int len);
+/** @brief Copy the header's values, joined with ", ", into a caller-provided buffer. */
+char *getHeader(TSMBuffer bufp, TSMLoc hdr_loc, const char *header, int headerlen, char *value, int *valuelen);
+
+/** @brief Set a header to a single value, creating it or removing duplicates as needed. */
+bool setHeader(TSMBuffer bufp, TSMLoc hdr_loc, const char *header, int len, const char *val, int val_len);
+/** @brief Log the MIME headers to the plugin debug log. */
+void dumpHeaders(TSMBuffer bufp, TSMLoc hdr_loc);
diff --git a/plugins/experimental/prefetch/pattern.cc b/plugins/experimental/prefetch/pattern.cc
new file mode 100644
index 0000000..20e3b64
--- /dev/null
+++ b/plugins/experimental/prefetch/pattern.cc
@@ -0,0 +1,463 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file pattern.cc
+ * @brief PCRE related classes.
+ * @see pattern.h
+ */
+
+#include "pattern.h"
+
+/**
+ * @brief Replace every occurrence of 'from' in 'str' with 'to', scanning left to right.
+ * Replaced text is not rescanned; an empty 'from' leaves 'str' untouched.
+ */
+static void
+replaceString(String &str, const String &from, const String &to)
+{
+  if (from.empty()) {
+    return;
+  }
+
+  String::size_type start_pos = 0;
+  while ((start_pos = str.find(from, start_pos)) != String::npos) {
+    str.replace(start_pos, from.length(), to);
+    start_pos += to.length();
+  }
+}
+
+Pattern::Pattern() : _re(nullptr), _extra(nullptr), _pattern(""), _replacement(""), _tokenCount(0), _tokens(), _tokenOffset() {}
+
+/**
+ * @brief Initializes PCRE pattern by providing the pattern and replacement strings.
+ * Any previously compiled pattern is released first.
+ * @param pattern PCRE pattern, a string containing PCRE patterns, capturing groups.
+ * @param replacement PCRE replacement, a string where $0 ... $9 will be replaced with the corresponding capturing groups
+ * @return true if successful, false if failure
+ */
+bool
+Pattern::init(const String &pattern, const String &replacement)
+{
+  pcreFree();
+
+  _pattern.assign(pattern);
+  _replacement.assign(replacement);
+
+  _tokenCount = 0;
+
+  if (!compile()) {
+    PrefetchDebug("failed to initialize pattern:'%s', replacement:'%s'", pattern.c_str(), replacement.c_str());
+    pcreFree();
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Initializes PCRE pattern by providing the pattern only or pattern+replacement in a single configuration string.
+ * @see init()
+ * @param config PCRE pattern <pattern> or PCRE pattern + replacement in format /<pattern>/<replacement>/
+ *        ('/' can be escaped as '\/' inside the pattern and the replacement)
+ * @return true if successful, false if failure
+ */
+bool
+Pattern::init(const String &config)
+{
+  if (config.empty() || config[0] != '/') {
+    /* No leading '/': the whole config is the pattern, without a replacement. */
+    return this->init(config, "");
+  }
+
+  /* This is a config in format /regex/replacement/ */
+
+  /* Position of the first '/' at or after 'from' that is not escaped with '\', or npos.
+   * 'from' is always >= 1 here, so looking at the preceding character is safe. */
+  auto findUnescapedSlash = [&config](size_t from) -> size_t {
+    size_t pos = config.find('/', from);
+    while (String::npos != pos && '\\' == config[pos - 1]) {
+      pos = config.find('/', pos + 1);
+    }
+    return pos;
+  };
+
+  /* Search right after the opening '/' (index 1); starting later would skip a '/' at index 1. */
+  size_t next = findUnescapedSlash(1);
+  if (String::npos == next) {
+    /* Error, no closing '/' */
+    PrefetchError("failed to parse the pattern in '%s'", config.c_str());
+    return false;
+  }
+  String pattern = config.substr(1, next - 1);
+
+  size_t start = next + 1;
+  next         = findUnescapedSlash(start);
+  if (String::npos == next) {
+    /* Error, no closing '/' */
+    PrefetchError("failed to parse the replacement in '%s'", config.c_str());
+    return false;
+  }
+  String replacement = config.substr(start, next - start);
+
+  // Remove '\' which escaped '/' inside the pattern and replacement strings.
+  ::replaceString(pattern, "\\/", "/");
+  ::replaceString(replacement, "\\/", "/");
+
+  return this->init(pattern, replacement);
+}
+
+/**
+ * @brief Checks if the pattern object lacks a meaningful regex pattern.
+ * @return true if not usable (no pattern, or it failed to compile), false if initialized.
+ */
+bool
+Pattern::empty() const
+{
+  return _pattern.empty() || nullptr == _re;
+}
+
+/**
+ * @brief Frees PCRE library related resources.
+ */
+void
+Pattern::pcreFree()
+{
+  if (_re) {
+    pcre_free(_re);
+    _re = nullptr;
+  }
+
+  if (_extra) {
+#ifdef PCRE_CONFIG_JIT
+    /* Study data must be released with pcre_free_study() (it may hold JIT code). */
+    pcre_free_study(_extra);
+#else
+    pcre_free(_extra);
+#endif
+    _extra = nullptr;
+  }
+}
+
+/**
+ * @brief Destructor, frees PCRE related resources.
+ */
+Pattern::~Pattern()
+{
+  pcreFree();
+}
+
+/**
+ * @brief Capture or capture-and-replace depending on whether a replacement string is specified.
+ * @see replace()
+ * @see capture()
+ * @param subject PCRE subject string
+ * @param result vector of strings where the result of captures or the replacements will be appended.
+ *        Without a replacement: the capturing groups 1..n, or group zero (the whole match) when it is the only one.
+ * @return true if there was a match and capture or replacement succeeded, false if failure.
+ */
+bool
+Pattern::process(const String &subject, StringVector &result)
+{
+  if (!_replacement.empty()) {
+    /* Replacement pattern was provided in the configuration - capture and replace. */
+    String element;
+    if (replace(subject, element)) {
+      result.push_back(element);
+    } else {
+      return false;
+    }
+  } else {
+    /* Replacement was not provided so return all capturing groups except the group zero. */
+    StringVector captures;
+    if (!capture(subject, captures) || captures.empty()) {
+      /* No match, or nothing captured (captures.begin() + 1 would run past end()). */
+      return false;
+    }
+    if (captures.size() == 1) {
+      /* Only group zero was captured: return the whole match. */
+      result.push_back(captures[0]);
+    } else {
+      result.insert(result.end(), captures.begin() + 1, captures.end());
+    }
+  }
+
+  return true;
+}
+
+/**
+ * @brief PCRE matches a subject string against the regex pattern.
+ * @param subject PCRE subject
+ * @return true - matched, false - did not.
+ */
+bool
+Pattern::match(const String &subject)
+{
+  int matchCount;
+  PrefetchDebug("matching '%s' to '%s'", _pattern.c_str(), subject.c_str());
+
+  if (!_re) {
+    return false;
+  }
+
+  matchCount = pcre_exec(_re, _extra, subject.c_str(), subject.length(), 0, PCRE_NOTEMPTY, nullptr, 0);
+  if (matchCount < 0) {
+    if (matchCount != PCRE_ERROR_NOMATCH) {
+      PrefetchError("matching error %d", matchCount);
+    }
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Return all PCRE capture groups that matched in the subject string
+ * @param subject PCRE subject string
+ * @param result vector of strings to which group zero and the capturing groups are appended
+ *        (a group that did not take part in the match is appended as an empty string)
+ * @return true - matched, false - no match or matching error
+ */
+bool
+Pattern::capture(const String &subject, StringVector &result)
+{
+  int matchCount;
+  int ovector[OVECOUNT];
+
+  PrefetchDebug("matching '%s' to '%s'", _pattern.c_str(), subject.c_str());
+
+  if (!_re) {
+    return false;
+  }
+
+  /* Use the study data computed during initialization, as match() does. */
+  matchCount = pcre_exec(_re, _extra, subject.c_str(), subject.length(), 0, PCRE_NOTEMPTY, ovector, OVECOUNT);
+  if (matchCount < 0) {
+    if (matchCount != PCRE_ERROR_NOMATCH) {
+      PrefetchError("matching error %d", matchCount);
+    }
+    return false;
+  }
+  if (0 == matchCount) {
+    /* pcre_exec() returns 0 when ovector is too small for all groups; the first OVECOUNT / 3 pairs are filled. */
+    matchCount = OVECOUNT / 3;
+  }
+
+  for (int i = 0; i < matchCount; i++) {
+    int start  = ovector[2 * i];
+    int length = ovector[2 * i + 1] - ovector[2 * i];
+
+    /* An unset group has offsets -1; building a substring from it would throw std::out_of_range. */
+    String dst = (start < 0) ? String() : String(subject, start, length);
+
+    PrefetchDebug("capturing '%s' %d[%d,%d]", dst.c_str(), i, ovector[2 * i], ovector[2 * i + 1]);
+    result.push_back(dst);
+  }
+
+  return true;
+}
+
+/**
+ * @brief Replaces all replacements found in the replacement string with what matched in the PCRE capturing groups.
+ * @param subject PCRE subject string
+ * @param result string to which the result of the replacement is appended
+ * @return true - success, false - nothing matched or failure.
+ */
+bool
+Pattern::replace(const String &subject, String &result)
+{
+  int matchCount;
+  int ovector[OVECOUNT];
+
+  PrefetchDebug("matching '%s' to '%s'", _pattern.c_str(), subject.c_str());
+
+  if (!_re) {
+    return false;
+  }
+
+  /* Use the study data computed during initialization, as match() does. */
+  matchCount = pcre_exec(_re, _extra, subject.c_str(), subject.length(), 0, PCRE_NOTEMPTY, ovector, OVECOUNT);
+  if (matchCount < 0) {
+    if (matchCount != PCRE_ERROR_NOMATCH) {
+      PrefetchError("matching error %d", matchCount);
+    }
+    return false;
+  }
+  if (0 == matchCount) {
+    /* pcre_exec() returns 0 when ovector is too small for all groups; the first OVECOUNT / 3 pairs are filled. */
+    matchCount = OVECOUNT / 3;
+  }
+
+  /* Verify the replacement has the right number of matching groups */
+  for (int i = 0; i < _tokenCount; i++) {
+    if (_tokens[i] >= matchCount) {
+      PrefetchError("invalid reference in replacement string: $%d", _tokens[i]);
+      return false;
+    }
+  }
+
+  int previous = 0;
+  for (int i = 0; i < _tokenCount; i++) {
+    int replIndex = _tokens[i];
+    int start     = ovector[2 * replIndex];
+    int length    = ovector[2 * replIndex + 1] - ovector[2 * replIndex];
+
+    String src(_replacement, _tokenOffset[i], 2);
+    /* An unset group has offsets -1: substitute an empty string instead of throwing std::out_of_range. */
+    String dst = (start < 0) ? String() : String(subject, start, length);
+
+    PrefetchDebug("replacing '%s' with '%s'", src.c_str(), dst.c_str());
+
+    result.append(_replacement, previous, _tokenOffset[i] - previous);
+    result.append(dst);
+
+    previous = _tokenOffset[i] + 2; /* 2 is the size of $0 or $1 or $2, ... or $9 */
+  }
+
+  result.append(_replacement, previous, _replacement.length() - previous);
+
+  PrefetchDebug("replacing '%s' resulted in '%s'", _replacement.c_str(), result.c_str());
+
+  return true;
+}
+
+/**
+ * @brief PCRE compiles the regex, called only during initialization.
+ * @return true if successful, false if not.
+ *
+ * Compiles and studies the pattern, then, if a replacement string is configured, records the
+ * group index (_tokens) and the offset (_tokenOffset) of every $0..$9 token found in it.
+ * At most TOKENCOUNT tokens are accepted; a '$' not followed by a digit is an error.
+ * On any failure the compiled data is released and false is returned.
+ */
+bool
+Pattern::compile()
+{
+  const char *errPtr; /* PCRE error */
+  int errOffset;      /* PCRE error offset */
+
+  PrefetchDebug("compiling pattern:'%s', replacement:'%s'", _pattern.c_str(), _replacement.c_str());
+
+  _re = pcre_compile(_pattern.c_str(), /* the pattern */
+                     0,                /* options */
+                     &errPtr,          /* for error message */
+                     &errOffset,       /* for error offset */
+                     nullptr);         /* use default character tables */
+
+  if (nullptr == _re) {
+    PrefetchError("compile of regex '%s' at char %d: %s", _pattern.c_str(), errOffset, errPtr);
+
+    return false;
+  }
+
+  _extra = pcre_study(_re, 0, &errPtr); /* a nullptr result is only treated as a failure when an error message is set */
+
+  if ((nullptr == _extra) && (nullptr != errPtr) && (0 != *errPtr)) {
+    PrefetchError("failed to study regex '%s': %s", _pattern.c_str(), errPtr);
+
+    pcre_free(_re);
+    _re = nullptr;
+    return false;
+  }
+
+  if (_replacement.empty()) {
+    /* No replacement necessary - we are done. */
+    return true;
+  }
+
+  _tokenCount  = 0;
+  bool success = true;
+
+  for (unsigned i = 0; i < _replacement.length(); i++) {
+    if (_replacement[i] == '$') {
+      if (_tokenCount >= TOKENCOUNT) {
+        PrefetchError("too many tokens in replacement string: %s", _replacement.c_str());
+
+        success = false;
+        break;
+      } else if (_replacement[i + 1] < '0' || _replacement[i + 1] > '9') { /* for a trailing '$', [i + 1] is std::string's terminating '\0' */
+        PrefetchError("invalid replacement token $%c in %s: should be $0 - $9", _replacement[i + 1], _replacement.c_str());
+
+        success = false;
+        break;
+      } else {
+        /* Store the location of the replacement */
+        /* Convert '0' to 0 */
+        _tokens[_tokenCount]      = _replacement[i + 1] - '0';
+        _tokenOffset[_tokenCount] = i; /* offset of the '$'; the token is always 2 characters long */
+        _tokenCount++;
+        /* Skip the next char */
+        i++;
+      }
+    }
+  }
+
+  if (!success) {
+    pcreFree();
+  }
+
+  return success;
+}
+
+/**
+ * @brief Destructor, deletes all patterns (the multi-pattern owns every Pattern passed to add()).
+ */
+MultiPattern::~MultiPattern()
+{
+  for (std::vector<Pattern *>::iterator p = this->_list.begin(); p != this->_list.end(); ++p) {
+    delete (*p);
+  }
+}
+
+/**
+ * @brief Check if empty.
+ * @return true if the multi-pattern holds no patterns, false otherwise
+ */
+bool
+MultiPattern::empty() const
+{
+  return _list.empty();
+}
+
+/**
+ * @brief Appends a pattern to the multi-pattern.
+ *
+ * Patterns are evaluated in the order they were added. The multi-pattern takes
+ * ownership: every added pattern is deleted by ~MultiPattern().
+ * @param pattern pattern pointer
+ */
+void
+MultiPattern::add(Pattern *pattern)
+{
+  _list.push_back(pattern);
+}
+
+/**
+ * @brief Tests the subject string against the patterns, in insertion order.
+ * @param subject subject string.
+ * @return true as soon as one pattern matches, false if none does.
+ */
+bool
+MultiPattern::match(const String &subject) const
+{
+  for (Pattern *candidate : _list) {
+    if (nullptr == candidate) {
+      /* Null entries are tolerated and skipped. */
+      continue;
+    }
+    if (candidate->match(subject)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * @brief Tries Pattern::replace() with each pattern in insertion order and stops at the first that succeeds.
+ * @param subject subject string.
+ * @param result string the successful replacement is appended to.
+ * @return true if a pattern matched and replaced, false if none did.
+ */
+bool
+MultiPattern::replace(const String &subject, String &result) const
+{
+  for (Pattern *candidate : _list) {
+    if (nullptr == candidate) {
+      /* Null entries are tolerated and skipped. */
+      continue;
+    }
+    if (candidate->replace(subject, result)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * @brief Name given to the multi-pattern at construction; it cannot be changed afterwards.
+ */
+const String &
+MultiPattern::name() const
+{
+  return _name;
+}
diff --git a/plugins/experimental/prefetch/pattern.h b/plugins/experimental/prefetch/pattern.h
new file mode 100644
index 0000000..2db7665
--- /dev/null
+++ b/plugins/experimental/prefetch/pattern.h
@@ -0,0 +1,92 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.
The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file pattern.h
+ * @brief PCRE related classes (header file).
+ */
+
+#pragma once
+
+#ifdef HAVE_PCRE_PCRE_H
+#include <pcre/pcre.h>
+#else
+#include <pcre.h>
+#endif
+
+#include "common.h"
+
+/**
+ * @brief PCRE matching, capturing and replacing
+ */
+class Pattern
+{
+public:
+  static const int TOKENCOUNT = 10;             /**< @brief Capturing groups $0..$9 */
+  static const int OVECOUNT   = TOKENCOUNT * 3; /**< @brief pcre_exec() array count, handle 10 capture groups */
+
+  Pattern();
+  virtual ~Pattern();
+
+  /* The PCRE handles are owned and freed by the destructor: a copy would free them twice. */
+  Pattern(const Pattern &) = delete;
+  Pattern &operator=(const Pattern &) = delete;
+
+  bool init(const String &pattern, const String &replacement);
+  bool init(const String &config);
+  bool empty() const;
+  bool match(const String &subject);
+  bool capture(const String &subject, StringVector &result);
+  bool replace(const String &subject, String &result);
+  bool process(const String &subject, StringVector &result);
+
+private:
+  bool compile();
+  bool failed(const String &subject) const;
+  void pcreFree();
+
+  pcre *_re;          /**< @brief PCRE compiled info structure, computed during initialization */
+  pcre_extra *_extra; /**< @brief PCRE study data block, computed during initialization */
+
+  String _pattern;     /**< @brief PCRE pattern string, containing PCRE patterns and capturing groups. */
+  String _replacement; /**< @brief PCRE replacement string, containing $0..$9 to be replaced with content of the capturing groups */
+
+  int _tokenCount;              /**< @brief number of replacements $0..$9 found in the replacement string if not empty */
+  int _tokens[TOKENCOUNT];      /**< @brief replacement index 0..9, since they can be used in the replacement string in any order */
+  int _tokenOffset[TOKENCOUNT]; /**< @brief replacement offset inside the replacement string */
+};
+
+/**
+ * @brief Named list of regular expressions.
+ */
+class MultiPattern
+{
+public:
+  MultiPattern(const String &name = "") : _name(name) {}
+  virtual ~MultiPattern();
+
+  /* Owns the Pattern pointers in _list: copying would delete them twice. */
+  MultiPattern(const MultiPattern &) = delete;
+  MultiPattern &operator=(const MultiPattern &) = delete;
+
+  bool empty() const;
+  void add(Pattern *pattern);
+  virtual bool match(const String &subject) const;
+  virtual bool replace(const String &subject, String &result) const;
+  const String &name() const;
+
+protected:
+  std::vector<Pattern *> _list; /**< @brief vector which dictates the order of the pattern evaluation. */
+  String _name;                 /**< @brief multi-pattern name */
+};
diff --git a/plugins/experimental/prefetch/plugin.cc b/plugins/experimental/prefetch/plugin.cc
new file mode 100644
index 0000000..1c16ae0
--- /dev/null
+++ b/plugins/experimental/prefetch/plugin.cc
@@ -0,0 +1,751 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.
You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+/**
+ * @file plugin.cc
+ * @brief traffic server plugin entry points.
+ */
+
+#include <cstring> /* strlen, memcmp, memcpy */
+#include <sstream>
+#include <iomanip>
+
+#include "ts/ts.h" /* ATS API */
+
+#include "ts/remap.h" /* TSRemapInterface, TSRemapStatus, apiInfo */
+
+#include "common.h"
+#include "configs.h"
+#include "fetch.h"
+#include "fetch_policy.h"
+#include "headers.h"
+
+/**
+ * @brief Returns a printable name of an HTTP transaction event, used for logging.
+ */
+static const char *
+getEventName(TSEvent event)
+{
+  switch (event) {
+  case TS_EVENT_HTTP_CONTINUE:
+    return "TS_EVENT_HTTP_CONTINUE";
+  case TS_EVENT_HTTP_ERROR:
+    return "TS_EVENT_HTTP_ERROR";
+  case TS_EVENT_HTTP_READ_REQUEST_HDR:
+    return "TS_EVENT_HTTP_READ_REQUEST_HDR";
+  case TS_EVENT_HTTP_OS_DNS:
+    return "TS_EVENT_HTTP_OS_DNS";
+  case TS_EVENT_HTTP_SEND_REQUEST_HDR:
+    return "TS_EVENT_HTTP_SEND_REQUEST_HDR";
+  case TS_EVENT_HTTP_READ_CACHE_HDR:
+    return "TS_EVENT_HTTP_READ_CACHE_HDR";
+  case TS_EVENT_HTTP_READ_RESPONSE_HDR:
+    return "TS_EVENT_HTTP_READ_RESPONSE_HDR";
+  case TS_EVENT_HTTP_SEND_RESPONSE_HDR:
+    return "TS_EVENT_HTTP_SEND_RESPONSE_HDR";
+  case TS_EVENT_HTTP_REQUEST_TRANSFORM:
+    return "TS_EVENT_HTTP_REQUEST_TRANSFORM";
+  case TS_EVENT_HTTP_RESPONSE_TRANSFORM:
+    return "TS_EVENT_HTTP_RESPONSE_TRANSFORM";
+  case TS_EVENT_HTTP_SELECT_ALT:
+    return "TS_EVENT_HTTP_SELECT_ALT";
+  case TS_EVENT_HTTP_TXN_START:
+    return "TS_EVENT_HTTP_TXN_START";
+  case TS_EVENT_HTTP_TXN_CLOSE:
+    return "TS_EVENT_HTTP_TXN_CLOSE";
+  case TS_EVENT_HTTP_SSN_START:
+    return "TS_EVENT_HTTP_SSN_START";
+  case TS_EVENT_HTTP_SSN_CLOSE:
+    return "TS_EVENT_HTTP_SSN_CLOSE";
+  case TS_EVENT_HTTP_CACHE_LOOKUP_COMPLETE:
+    return "TS_EVENT_HTTP_CACHE_LOOKUP_COMPLETE";
+  case TS_EVENT_HTTP_PRE_REMAP:
+    return "TS_EVENT_HTTP_PRE_REMAP";
+  case TS_EVENT_HTTP_POST_REMAP:
+    return "TS_EVENT_HTTP_POST_REMAP";
+  default:
+    return "UNHANDLED";
+  }
+  return "UNHANDLED";
+}
+
+/**
+ * @brief Returns a printable name of a cache lookup result, used for logging.
+ */
+static const char *
+getCacheLookupResultName(TSCacheLookupResult result)
+{
+  switch (result) {
+  case TS_CACHE_LOOKUP_MISS:
+    return "TS_CACHE_LOOKUP_MISS";
+  case TS_CACHE_LOOKUP_HIT_STALE:
+    return "TS_CACHE_LOOKUP_HIT_STALE";
+  case TS_CACHE_LOOKUP_HIT_FRESH:
+    return "TS_CACHE_LOOKUP_HIT_FRESH";
+  case TS_CACHE_LOOKUP_SKIPPED:
+    return "TS_CACHE_LOOKUP_SKIPPED";
+  default:
+    return "UNKNOWN_CACHE_LOOKUP_EVENT";
+  }
+  return "UNKNOWN_CACHE_LOOKUP_EVENT";
+}
+
+/**
+ * @brief Plugin initialization.
+ * @param apiInfo remap interface info pointer
+ * @param errBuf error message buffer
+ * @param errBufSize error message buffer size
+ * @return always TS_SUCCESS.
+ */
+TSReturnCode
+TSRemapInit(TSRemapInterface *apiInfo, char *errBuf, int errBufSize)
+{
+  return TS_SUCCESS;
+}
+
+/**
+ * @brief Plugin instance data.
+ *
+ * Created in TSRemapNewInstance() and shared by pointer with every transaction's PrefetchTxnData,
+ * hence it must never be copied.
+ */
+struct PrefetchInstance {
+  PrefetchInstance() : _state(nullptr) {}
+
+  PrefetchInstance(PrefetchInstance const &) = delete;
+  PrefetchInstance &operator=(PrefetchInstance const &) = delete;
+
+  PrefetchConfig _config;
+  BgFetchState *_state;
+};
+
+/**
+ * @brief Plugin transaction data.
+ */
+class PrefetchTxnData
+{
+public:
+  explicit PrefetchTxnData(PrefetchInstance *inst)
+    : _inst(inst), _front(true), _firstPass(true), _fetchable(false), _status(TS_HTTP_STATUS_OK)
+  {
+  }
+
+  bool
+  firstPass() const
+  {
+    return _firstPass;
+  }
+
+  bool
+  secondPass() const
+  {
+    return !_firstPass;
+  }
+
+  bool
+  frontend() const
+  {
+    return _front;
+  }
+
+  bool
+  backend() const
+  {
+    return !_front;
+  }
+
+  PrefetchInstance *_inst; /* Pointer to the plugin instance */
+
+  bool _front;     /* front-end vs back-end */
+  bool _firstPass; /* first vs second pass */
+
+  /* saves state between hooks */
+  String _cachekey;     /* cache key */
+  bool _fetchable;      /* saves the result of the attempt to fetch */
+  TSHttpStatus _status; /* status to return to the UA */
+  String _body;         /* body to return to the UA */
+};
+
+/**
+ * @brief Evaluate a math addition or subtraction expression.
+ *
+ * Accepted form: "<a>", "<a>+<b>" or "<a>-<b>", optionally followed by ":<width>" which left-pads
+ * the result with '0' up to <width> characters. Numbers are parsed with getValue() (defined elsewhere).
+ *
+ * @param v string containing an expression, i.e. "3 + 4" (or "3 + 4:3" for a zero-padded width of 3)
+ * @return string containing the result, i.e. "7" (or "007"); empty string if @p v is empty
+ */
+static String
+evaluate(const String &v)
+{
+  if (v.empty()) {
+    return String("");
+  }
+
+  /* Find out if width is specified (hence leading zeros are required if the width is bigger than the result width) */
+  String stmt;
+  size_t len = 0;
+  size_t pos = v.find(':');
+  if (String::npos != pos) {
+    stmt.assign(v.substr(0, pos));
+    len = getValue(v.substr(pos + 1));
+  } else {
+    stmt.assign(v);
+  }
+  PrefetchDebug("statement: '%s', formating length: %zu", stmt.c_str(), len);
+
+  int result = 0;
+  pos        = stmt.find_first_of("+-");
+
+  if (String::npos == pos) {
+    result = getValue(stmt);
+  } else {
+    unsigned a = getValue(stmt.substr(0, pos));
+    unsigned b = getValue(stmt.substr(pos + 1));
+
+    if ('+' == stmt[pos]) {
+      result = a + b;
+    } else {
+      /* Subtract in signed arithmetic: with a < b an unsigned "a - b" wraps around and its conversion
+       * back to int is implementation-defined (pre C++20). */
+      result = static_cast<int>(a) - static_cast<int>(b);
+    }
+  }
+
+  std::ostringstream convert;
+  convert << std::setw(len) << std::setfill('0') << result;
+  PrefetchDebug("evaluation of '%s' resulted in '%s'", v.c_str(), convert.str().c_str());
+  return convert.str();
+}
+
+/**
+ * @brief Expand+evaluate (in place) every expression surrounded with "{" and "}", using evaluate() for the math.
+ *
+ * Every "{...}" occurrence is replaced by the result of evaluate() on its content,
+ * i.e. "seg{3 + 4}.ts" becomes "seg7.ts" and "{1+1}{2+2}" becomes "24".
+ *
+ * @param s string containing expressions, modified in place
+ * @return void
+ */
+static void
+expand(String &s)
+{
+  size_t cur = 0;
+  while (String::npos != cur) {
+    size_t start = s.find('{', cur);
+    size_t stop  = s.find('}', start); /* npos if start is npos */
+
+    if (String::npos == start || String::npos == stop) {
+      break;
+    }
+
+    String value = evaluate(s.substr(start + 1, stop - start - 1));
+    s.replace(start, stop - start + 1, value);
+    /* Resume right after the inserted value: the replacement changed the string length,
+     * so the pre-replacement 'stop' index no longer points past the expression. */
+    cur = start + value.length();
+  }
+}
+
+/**
+ * @brief Get the cachekey used for the particular object in this transaction.
+ *
+ * @param txnp HTTP transaction structure
+ * @param reqBuffer request TSMBuffer
+ * @param key string reference to which the cache key URL is appended.
+ * @return true if success or false if failure
+ */
+bool
+appendCacheKey(const TSHttpTxn txnp, const TSMBuffer reqBuffer, String &key)
+{
+  bool ret      = false;
+  TSMLoc keyLoc = TS_NULL_MLOC;
+  if (TS_SUCCESS == TSUrlCreate(reqBuffer, &keyLoc)) {
+    if (TS_SUCCESS == TSHttpTxnCacheLookupUrlGet(txnp, reqBuffer, keyLoc)) {
+      int urlLen = 0;
+      char *url  = TSUrlStringGet(reqBuffer, keyLoc, &urlLen);
+      if (nullptr != url) {
+        key.append(url, urlLen);
+        PrefetchDebug("cache key: %s", key.c_str());
+        TSfree(static_cast<void *>(url));
+        ret = true;
+      }
+    }
+    TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, keyLoc);
+  }
+
+  if (!ret) {
+    PrefetchError("failed to get cache key");
+  }
+  return ret;
+}
+
+/**
+ * @brief Find out if the object was found fresh in cache.
+ *
+ * This function finally controls if the pre-fetch should be scheduled or not.
+ * @param txnp HTTP transaction structure
+ * @return true - hit fresh, false - miss/stale/skipped or error
+ */
+static bool
+foundFresh(TSHttpTxn txnp)
+{
+  bool fresh       = false;
+  int lookupStatus = 0;
+  if (TS_SUCCESS == TSHttpTxnCacheLookupStatusGet(txnp, &lookupStatus)) {
+    PrefetchDebug("lookup status: %s", getCacheLookupResultName(static_cast<TSCacheLookupResult>(lookupStatus)));
+    fresh = (TS_CACHE_LOOKUP_HIT_FRESH == lookupStatus);
+  } else {
+    /* Failed to get the lookup status, likely a previous plugin already prepared the client response w/o a cache lookup,
+     * we don't really know if the cache has a fresh object, so just don't trigger pre-fetch */
+    PrefetchDebug("failed to check cache-ability");
+  }
+  return fresh;
+}
+
+/**
+ * @brief Check if the origin response for the N-th object is a success (200 or 206),
+ * and only then schedule a pre-fetch for the next one.
+ *
+ * @param txnp HTTP transaction structure
+ * @return true - yes, false - no (including when the origin response is not available)
+ */
+bool
+isResponseGood(TSHttpTxn txnp)
+{
+  bool good = false;
+  TSMBuffer respBuffer;
+  TSMLoc respHdrLoc;
+  if (TS_SUCCESS == TSHttpTxnServerRespGet(txnp, &respBuffer, &respHdrLoc)) {
+    TSHttpStatus status = TSHttpHdrStatusGet(respBuffer, respHdrLoc);
+    PrefetchDebug("origin response code: %d", status);
+    good = (TS_HTTP_STATUS_PARTIAL_CONTENT == status || TS_HTTP_STATUS_OK == status);
+    /* Release the response MLoc */
+    TSHandleMLocRelease(respBuffer, TS_NULL_MLOC, respHdrLoc);
+  } else {
+    /* Failed to get the origin response, possible causes could be origin connection problems or timeouts, or
+     * a previous plugin could have already prepared the client response w/o going to the origin server */
+    PrefetchDebug("failed to get origin response");
+  }
+  return good;
+}
+
+/**
+ * @brief Get the pristine URL path.
+ *
+ * @param txnp HTTP transaction structure
+ * @return pristine URL path, empty string if it could not be retrieved
+ */
+static String
+getPristineUrlPath(TSHttpTxn txnp)
+{
+  String pristinePath;
+  TSMLoc pristineUrlLoc;
+  TSMBuffer reqBuffer;
+
+  if (TS_SUCCESS == TSHttpTxnPristineUrlGet(txnp, &reqBuffer, &pristineUrlLoc)) {
+    int pathLen      = 0;
+    const char *path = TSUrlPathGet(reqBuffer, pristineUrlLoc, &pathLen);
+    if (nullptr != path) {
+      PrefetchDebug("path: '%.*s'", pathLen, path);
+      pristinePath.assign(path, pathLen);
+    } else {
+      PrefetchError("failed to get pristine URL path");
+    }
+    TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, pristineUrlLoc);
+  } else {
+    PrefetchError("failed to get pristine URL");
+  }
+  return pristinePath;
+}
+
+/**
+ * @brief Short-cut to set the response status and body to be sent to the UA.
+ *
+ * @param data transaction data receiving the status and body
+ * @param status HTTP status to return to the UA
+ * @param body body to return to the UA (must not be nullptr)
+ * @param event event to return, passed through unchanged
+ * @return @p event
+ */
+TSEvent
+shortcutResponse(PrefetchTxnData *data, TSHttpStatus status, const char *body, TSEvent event)
+{
+  data->_status = status;
+  data->_body.assign(body);
+  return event;
+}
+
+/**
+ * @brief Checks if we are still supposed to schedule a background fetch based on whether the object is in the cache.
+ * It is 'fetchable' only if not a fresh hit.
+ *
+ * @param txnp HTTP transaction structure
+ * @param data transaction data
+ * @return true if fetchable and false if not.
+ */
+static bool
+isFetchable(TSHttpTxn txnp, PrefetchTxnData *data)
+{
+  bool fetchable      = false;
+  BgFetchState *state = data->_inst->_state;
+  if (!foundFresh(txnp)) {
+    /* Schedule fetch only if not in cache */
+    PrefetchDebug("object to be fetched");
+    fetchable = true;
+  } else {
+    PrefetchDebug("object already in cache or to be skipped");
+    state->incrementMetric(FETCH_ALREADY_CACHED);
+    state->incrementMetric(FETCH_TOTAL);
+  }
+  return fetchable;
+}
+
+/**
+ * @brief Find out if the current response should trigger a background prefetch.
+ *
+ * Pre-fetch only on HTTP codes 200 and 206 or object found in cache (previous good response)
+ *
+ * @param txnp HTTP transaction structure
+ * @return true - trigger prefetch, false - don't trigger.
+ */
+static bool
+respToTriggerPrefetch(TSHttpTxn txnp)
+{
+  bool trigger = false;
+  if (foundFresh(txnp)) {
+    /* If found in cache and fresh trigger next (same as good response from origin) */
+    PrefetchDebug("trigger background fetch (cached)");
+    trigger = true;
+  } else if (isResponseGood(txnp)) {
+    /* Trigger all necessary background fetches based on the next path pattern */
+    PrefetchDebug("trigger background fetch (good origin response)");
+    trigger = true;
+  } else {
+    PrefetchDebug("don't trigger background fetch");
+  }
+  return trigger;
+}
+
+/**
+ * @brief Callback function that handles necessary foreground / background fetch operations.
+ *
+ * Added by TSRemapDoRemap() to the POST_REMAP, CACHE_LOOKUP_COMPLETE, SEND_RESPONSE_HDR and TXN_CLOSE hooks
+ * of a transaction; on TXN_CLOSE it releases the fetch policy slots and destroys itself and its PrefetchTxnData.
+ *
+ * @param contp continuation associated with this function.
+ * @param event corresponding event triggered at different hooks.
+ * @param edata HTTP transaction structures.
+ * @return always 0
+ */
+int
+contHandleFetch(const TSCont contp, TSEvent event, void *edata)
+{
+  PrefetchTxnData *data  = static_cast<PrefetchTxnData *>(TSContDataGet(contp));
+  TSHttpTxn txnp         = static_cast<TSHttpTxn>(edata);
+  PrefetchConfig &config = data->_inst->_config;
+  BgFetchState *state    = data->_inst->_state;
+  TSMBuffer reqBuffer    = nullptr;
+  TSMLoc reqHdrLoc       = TS_NULL_MLOC;
+
+  PrefetchDebug("event: %s (%d)", getEventName(event), event);
+
+  TSEvent retEvent = TS_EVENT_HTTP_CONTINUE;
+
+  /* Every hook but TXN_CLOSE needs the client request. TXN_CLOSE must run regardless, otherwise the
+   * fetch policy slots, the transaction data and the continuation would leak. */
+  bool haveReq = (TS_SUCCESS == TSHttpTxnClientReqGet(txnp, &reqBuffer, &reqHdrLoc));
+  if (!haveReq && TS_EVENT_HTTP_TXN_CLOSE != event) {
+    PrefetchError("failed to get client request");
+    TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR);
+    return 0;
+  }
+
+  switch (event) {
+  case TS_EVENT_HTTP_POST_REMAP: {
+    /* Use the cache key since this has better lookup behavior when using plugins like the cachekey plugin,
+     * for example multiple URIs can match a single cache key */
+    if (data->frontend() && data->secondPass()) {
+      /* Create a separate cache key name space to be used only for front-end and second-pass fetch policy checks. */
+      data->_cachekey.assign("/prefetch");
+    }
+    if (!appendCacheKey(txnp, reqBuffer, data->_cachekey)) {
+      PrefetchError("failed to get the cache key");
+      /* Let the common tail release the request MLoc and re-enable with an error. */
+      retEvent = TS_EVENT_HTTP_ERROR;
+      break;
+    }
+
+    if (data->frontend()) {
+      /* front-end instance */
+      if (data->firstPass()) {
+        /* first-pass */
+        if (!config.isExactMatch()) {
+          data->_fetchable = state->acquire(data->_cachekey);
+          PrefetchDebug("request is%sfetchable", data->_fetchable ? " " : " not ");
+        }
+      }
+    }
+  } break;
+
+  case TS_EVENT_HTTP_CACHE_LOOKUP_COMPLETE: {
+    if (data->frontend()) {
+      /* front-end instance */
+      if (data->secondPass()) {
+        /* second-pass */
+        bool acquired    = state->acquire(data->_cachekey);
+        data->_fetchable = acquired && state->uniqueAcquire(data->_cachekey);
+        if (acquired && !data->_fetchable) {
+          /* TXN_CLOSE releases only when _fetchable, so give back the policy slot here to keep acquire/release balanced. */
+          state->release(data->_cachekey);
+        }
+        PrefetchDebug("request is%sfetchable", data->_fetchable ? " " : " not ");
+
+        if (isFetchable(txnp, data)) {
+          if (!data->_fetchable) {
+            /* Cancel the requested fetch */
+            retEvent = shortcutResponse(data, TS_HTTP_STATUS_ALREADY_REPORTED, "fetch not scheduled\n", TS_EVENT_HTTP_ERROR);
+          } else {
+            /* Fetch */
+          }
+        } else {
+          retEvent = shortcutResponse(data, TS_HTTP_STATUS_ALREADY_REPORTED, "fetch not scheduled\n", TS_EVENT_HTTP_ERROR);
+        }
+      }
+    } else {
+      /* back-end instance */
+      if (data->firstPass()) {
+        if (isFetchable(txnp, data)) {
+          if (BgFetch::schedule(state, config, /* askPermission */ true, reqBuffer, reqHdrLoc, txnp, nullptr, 0, data->_cachekey)) {
+            retEvent = shortcutResponse(data, TS_HTTP_STATUS_OK, "fetch scheduled\n", TS_EVENT_HTTP_ERROR);
+          } else {
+            retEvent = shortcutResponse(data, TS_HTTP_STATUS_ALREADY_REPORTED, "fetch not scheduled\n", TS_EVENT_HTTP_ERROR);
+          }
+        } else {
+          retEvent = shortcutResponse(data, TS_HTTP_STATUS_ALREADY_REPORTED, "fetch not scheduled\n", TS_EVENT_HTTP_ERROR);
+        }
+      }
+    }
+  } break;
+
+  case TS_EVENT_HTTP_SEND_RESPONSE_HDR: {
+    if (data->frontend()) {
+      /* front-end instance */
+
+      if (data->firstPass() && data->_fetchable && !config.getNextPath().empty() && respToTriggerPrefetch(txnp)) {
+        /* Trigger all necessary background fetches based on the next path pattern */
+
+        String currentPath = getPristineUrlPath(txnp);
+        if (!currentPath.empty()) {
+          unsigned total = config.getFetchCount();
+          for (unsigned i = 0; i < total; ++i) {
+            PrefetchDebug("generating prefetch request %u/%u", i + 1, total);
+            String expandedPath;
+
+            if (config.getNextPath().replace(currentPath, expandedPath)) {
+              PrefetchDebug("replaced: %s", expandedPath.c_str());
+              expand(expandedPath);
+              PrefetchDebug("expanded: %s", expandedPath.c_str());
+
+              BgFetch::schedule(state, config, /* askPermission */ false, reqBuffer, reqHdrLoc, txnp, expandedPath.c_str(),
+                                expandedPath.length(), data->_cachekey);
+            } else {
+              /* We should be here only if the pattern replacement fails (match already checked) */
+              PrefetchError("failed to process the pattern");
+
+              /* If the first or any matches fails there must be something wrong so don't continue */
+              break;
+            }
+            currentPath.assign(expandedPath);
+          }
+        } else {
+          PrefetchDebug("failed to get current path");
+        }
+      }
+    }
+
+    if ((data->backend() && data->firstPass()) || (data->frontend() && data->secondPass() && !data->_body.empty())) {
+      TSMBuffer bufp;
+      TSMLoc hdrLoc;
+
+      if (TS_SUCCESS == TSHttpTxnClientRespGet(txnp, &bufp, &hdrLoc)) {
+        const char *reason = TSHttpHdrReasonLookup(data->_status);
+        if (nullptr == reason) {
+          /* Guard against a nullptr lookup result so strlen() below is always safe. */
+          reason = "";
+        }
+        int reasonLen = strlen(reason);
+        TSHttpHdrStatusSet(bufp, hdrLoc, data->_status);
+        TSHttpHdrReasonSet(bufp, hdrLoc, reason, reasonLen);
+        PrefetchDebug("set response: %d %.*s '%s'", data->_status, reasonLen, reason, data->_body.c_str());
+
+        /* The buffer is handed over to TSHttpTxnErrorBodySet(), hence allocated with TSmalloc(). */
+        size_t bodyLen = data->_body.length();
+        char *buf      = static_cast<char *>(TSmalloc(bodyLen + 1));
+        memcpy(buf, data->_body.c_str(), bodyLen + 1);
+        TSHttpTxnErrorBodySet(txnp, buf, bodyLen, nullptr);
+
+        setHeader(bufp, hdrLoc, TS_MIME_FIELD_CACHE_CONTROL, TS_MIME_LEN_CACHE_CONTROL, TS_HTTP_VALUE_NO_STORE,
+                  TS_HTTP_LEN_NO_STORE);
+
+        TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdrLoc);
+      } else {
+        PrefetchError("failed to retrieve client response header");
+      }
+    }
+  } break;
+
+  case TS_EVENT_HTTP_TXN_CLOSE: {
+    if (data->_fetchable) {
+      if (data->frontend()) {
+        /* front-end */
+        if (data->firstPass()) {
+          /* first-pass */
+          if (!config.isExactMatch()) {
+            state->release(data->_cachekey);
+          }
+        } else {
+          /* second-pass */
+          state->uniqueRelease(data->_cachekey);
+          state->release(data->_cachekey);
+        }
+      }
+    }
+
+    /* Destroy the txn continuation and its data */
+    delete data;
+    TSContDestroy(contp);
+  } break;
+
+  default: {
+    PrefetchError("unhandled event: %s(%d)", getEventName(event), event);
+  } break;
+  }
+
+  /* Release the request MLoc (only acquired if TSHttpTxnClientReqGet() succeeded) */
+  if (haveReq) {
+    TSHandleMLocRelease(reqBuffer, TS_NULL_MLOC, reqHdrLoc);
+  }
+
+  /* Reenable and continue with the state machine. */
+  TSHttpTxnReenable(txnp, retEvent);
+  return 0;
+}
+
+/**
+ * @brief Plugin new instance entry point.
+ *
+ * Processes the configuration and initializes the plugin instance.
+ * @param argc plugin arguments number
+ * @param argv plugin arguments
+ * @param instance new plugin instance pointer (initialized in this function, nullptr on failure)
+ * @param errBuf error message buffer (filled on failure)
+ * @param errBufSize error message buffer size
+ * @return TS_SUCCESS if success or TS_ERROR if failure
+ */
+TSReturnCode
+TSRemapNewInstance(int argc, char *argv[], void **instance, char *errBuf, int errBufSize)
+{
+  bool failed = true;
+
+  /* operator new throws on allocation failure, no nullptr check needed */
+  PrefetchInstance *inst = new PrefetchInstance();
+  if (inst->_config.init(argc, argv)) {
+    inst->_state = BgFetchStates::get()->getStateByName(inst->_config.getNameSpace());
+    if (nullptr != inst->_state) {
+      failed = !inst->_state->init(inst->_config);
+    }
+  }
+
+  if (failed) {
+    PrefetchError("failed to initialize the plugin");
+    if (nullptr != errBuf && 0 < errBufSize) {
+      snprintf(errBuf, errBufSize, "failed to initialize the plugin");
+    }
+    delete inst;
+    *instance = nullptr;
+    return TS_ERROR;
+  }
+
+  *instance = inst;
+  return TS_SUCCESS;
+}
+
+/**
+ * @brief Plugin instance deletion clean-up entry point.
+ * @param instance plugin instance pointer.
+ */
+void
+TSRemapDeleteInstance(void *instance)
+{
+  PrefetchInstance *inst = static_cast<PrefetchInstance *>(instance);
+  delete inst;
+}
+
+/**
+ * @brief Organizes the background fetch by registering necessary hooks, by identifying front-end vs back-end, first vs second
+ * pass.
+ *
+ * Only GET requests are considered. The presence of the configured API header decides the pass: on the front-end it
+ * means second-pass, on the back-end it means first-pass.
+ *
+ * Remap is never done, continue with next in chain.
+ * @param instance plugin instance pointer
+ * @param txnp transaction handle
+ * @param rri remap request info pointer
+ * @return always TSREMAP_NO_REMAP
+ */
+TSRemapStatus
+TSRemapDoRemap(void *instance, TSHttpTxn txnp, TSRemapRequestInfo *rri)
+{
+  PrefetchInstance *inst = static_cast<PrefetchInstance *>(instance);
+
+  if (nullptr != inst) {
+    PrefetchConfig &config = inst->_config;
+
+    int methodLen        = 0;
+    const char *method   = TSHttpHdrMethodGet(rri->requestBufp, rri->requestHdrp, &methodLen);
+    const String &header = config.getApiHeader();
+    if (nullptr != method && methodLen == TS_HTTP_LEN_GET && 0 == memcmp(TS_HTTP_METHOD_GET, method, TS_HTTP_LEN_GET)) {
+      bool front     = config.isFront();
+      bool firstPass = false;
+      if (headerExist(rri->requestBufp, rri->requestHdrp, header.c_str(), header.length())) {
+        PrefetchDebug("%s: found %.*s", front ? "front-end" : "back-end", static_cast<int>(header.length()), header.c_str());
+        /* On front-end: presence of header means second-pass, on back-end means first-pass. */
+        firstPass = !front;
+      } else {
+        /* On front-end: lack of header means first-pass, on back-end means second-pass. */
+        firstPass = front;
+      }
+
+      /* Make sure we handle only URLs that match the path pattern on the front-end + first-pass, cancel otherwise */
+      bool handleFetch = true;
+      if (front && firstPass) {
+        /* Front-end plug-in instance + first pass. */
+        if (config.getNextPath().empty()) {
+          /* No next path pattern specified then pass this request untouched. */
+          PrefetchDebug("next object pattern not specified, skip");
+          handleFetch = false;
+        } else {
+          /* Next path pattern specified hence try to match. */
+          String pristinePath = getPristineUrlPath(txnp);
+          if (!pristinePath.empty()) {
+            if (config.getNextPath().match(pristinePath)) {
+              /* Matched - handle the request */
+              PrefetchDebug("matched next object pattern");
+              inst->_state->incrementMetric(FETCH_MATCH_YES);
+            } else {
+              /* Next path pattern specified but did not match. */
+              PrefetchDebug("failed to match next object pattern, skip");
+              inst->_state->incrementMetric(FETCH_MATCH_NO);
+              handleFetch = false;
+            }
+          } else {
+            /* NOTE(review): the request is still handled although its path could not be (pre)matched - confirm intended. */
+            PrefetchDebug("failed to get path to (pre)match");
+          }
+        }
+      }
+
+      if (handleFetch) {
+        PrefetchTxnData *data = new PrefetchTxnData(inst);
+        if (nullptr != data) {
+          data->_front     = front;
+          data->_firstPass = firstPass;
+
+          TSCont cont = TSContCreate(contHandleFetch, TSMutexCreate());
+          TSContDataSet(cont, static_cast<void *>(data));
+
+          TSHttpTxnHookAdd(txnp, TS_HTTP_POST_REMAP_HOOK, cont);
+          TSHttpTxnHookAdd(txnp, TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK, cont);
+          TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, cont);
+          TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, cont);
+        } else {
+          PrefetchError("failed to allocate transaction data object");
+        }
+      }
+    } else {
+      /* method can be nullptr here: never hand a null pointer to the "%.*s" conversion */
+      PrefetchDebug("not a GET method (%.*s), skipping", methodLen, (nullptr != method) ? method : "");
+    }
+  } else {
+    PrefetchError("could not get prefetch instance");
+  }
+
+  return TSREMAP_NO_REMAP;
+}