This is an automated email from the ASF dual-hosted git repository.

rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 42ce5ad  METRON-2148 Stellar REST POST function (merrimanr) closes apache/metron#1440
42ce5ad is described below

commit 42ce5add9ae5b20bb3667e81e8aaa148f48a4d42
Author: merrimanr <merrim...@gmail.com>
AuthorDate: Thu Jul 11 08:16:30 2019 -0500

    METRON-2148 Stellar REST POST function (merrimanr) closes apache/metron#1440
---
 metron-stellar/stellar-common/README.md            |  98 ++-
 .../metron/stellar/common/utils/JSONUtils.java     |   6 +-
 .../metron/stellar/dsl/functions/RestConfig.java   |  23 +-
 .../stellar/dsl/functions/RestFunctions.java       | 657 +++++++++++++--------
 .../functions/RestFunctionsIntegrationTest.java    | 490 +++++++++++++++
 .../stellar/dsl/functions/RestFunctionsTest.java   | 449 +++-----------
 6 files changed, 1095 insertions(+), 628 deletions(-)
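
The README change in the diff below describes how REST settings are layered: defaults first, then `stellar.rest.settings`, then the per-function global settings, then the settings passed in the function call, with the last layer winning. The following is a minimal sketch of that layering, not the commit's actual `buildRestConfig` implementation (its body is not shown in this excerpt). The class `RestConfigPrecedenceSketch` and the method `mergeSettings` are hypothetical names used only for illustration; the setting names and values are taken from the README example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Metron.
public class RestConfigPrecedenceSketch {

  /** Merges layers from lowest to highest priority; later layers overwrite earlier ones. */
  static Map<String, Object> mergeSettings(List<Map<String, Object>> layers) {
    Map<String, Object> merged = new LinkedHashMap<>();
    for (Map<String, Object> layer : layers) {
      if (layer != null) { // a layer may be missing, e.g. no function-level config was passed
        merged.putAll(layer);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Subset of the defaults set in the RestConfig constructor.
    Map<String, Object> defaults = new LinkedHashMap<>();
    defaults.put("timeout", 1000);
    defaults.put("enforce.json", true);

    // "stellar.rest.settings" from the README example.
    Map<String, Object> global = new LinkedHashMap<>();
    global.put("proxy.basic.auth.user", "global_proxy_user");
    global.put("basic.auth.user", "global_user");
    global.put("empty.content.override", "global content override");

    // "stellar.rest.get.settings" from the README example.
    Map<String, Object> getSettings = new LinkedHashMap<>();
    getSettings.put("basic.auth.user", "rest_get_user");
    getSettings.put("empty.content.override", "rest get content override");

    // Settings passed as the second argument of REST_GET.
    Map<String, Object> functionArg = new LinkedHashMap<>();
    functionArg.put("empty.content.override", "function config override");

    Map<String, Object> merged =
        mergeSettings(Arrays.asList(defaults, global, getSettings, functionArg));
    merged.forEach((name, value) -> System.out.println(name + " = " + value));
  }
}
```

Under these assumptions, the three settings from the README example resolve to the same values the README lists as the final configuration (`global_proxy_user`, `rest_get_user`, `function config override`), alongside the untouched defaults.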

diff --git a/metron-stellar/stellar-common/README.md b/metron-stellar/stellar-common/README.md
index 5e48b1c..9f1634b 100644
--- a/metron-stellar/stellar-common/README.md
+++ b/metron-stellar/stellar-common/README.md
@@ -963,6 +963,16 @@ Where:
   * Input:
     * url - URI to the REST service
     * rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter of the same name. Default is the empty Map, meaning no overrides.
+    * query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters
+  * Returns: JSON results as a Map
+  
+### `REST_POST`
+  * Description: Performs a REST POST request and parses the JSON results into a map.
+  * Input:
+    * url - URI to the REST service
+    * post_data - POST data that will be sent in the POST request.  Must be well-formed JSON unless the 'enforce.json' property is set to false.
+    * rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter of the same name. Default is the empty Map, meaning no overrides.
+    * query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters
   * Returns: JSON results as a Map
 
 ### `ROUND`
@@ -1649,15 +1659,64 @@ that specify what should be included when searching for Stellar functions.
 
 ## Stellar REST Client
 
-Stellar provides a REST Client with the `REST_GET` function.  This function depends on the Apache HttComponents library for
-executing Http requests.  The syntax is:
+Stellar provides a REST Client with the `REST_GET` and `REST_POST` functions.  These functions depend on the Apache HttpComponents library for
+executing HTTP requests.  
+
+### REST GET Syntax
+The REST_GET function requires a URI along with an optional configuration and an optional map of query parameters.  The syntax is:
 ```
-REST_GET( uri , optional config )
+REST_GET( uri , optional config , optional query parameters )
+```
+
+### REST POST Syntax
+The REST_POST function requires a URI and POST data along with an optional configuration and an optional map of query parameters.  The syntax is:
+```
+REST_POST( uri , data, optional config , optional query parameters )
 ```
 
 ### Configuration
 
-The second argument is an optional Map of settings.  The following settings are available:
+Stellar REST functions can be configured in several different ways.  Sensible defaults are set for applicable settings, with the option to override settings at different levels.
+For REST_GET, configuration settings are applied in this order (last has highest priority):
+1. Default settings
+2. Settings stored in the Global Config for all Stellar REST functions
+3. Settings stored in the Global Config for all Stellar REST_GET calls
+4. Settings passed into the function call as an argument
+
+For REST_POST, configuration settings are applied in this order (last has highest priority):
+1. Default settings
+2. Settings stored in the Global Config for all Stellar REST functions
+3. Settings stored in the Global Config for all Stellar REST_POST calls
+4. Settings passed into the function call as an argument
+
+For example, assume the Global Config is set to:
+```
+{
+  "stellar.rest.settings": {
+    "proxy.basic.auth.user": "global_proxy_user",
+    "basic.auth.user": "global_user",
+    "empty.content.override": "global content override"
+  },
+  "stellar.rest.get.settings": {
+    "basic.auth.user": "rest_get_user",
+    "empty.content.override": "rest get content override"
+  }
+}
+```
+and the function call is:
+```
+REST_GET('some uri', { "empty.content.override": "function config override" } )
+```
+After the various settings are applied in order of priority, the final configuration is:
+```
+{
+  "proxy.basic.auth.user": "global_proxy_user",
+  "basic.auth.user": "rest_get_user",
+  "empty.content.override": "function config override"
+}
+```
+
+The following is a list of settings that are available:
 
 * basic.auth.user - User name for basic authentication.
 * basic.auth.password.path - Path to the basic authentication password file stored in HDFS.
@@ -1665,31 +1724,19 @@ The second argument is an optional Map of settings.  The following settings are
 * proxy.port - Proxy port.
 * proxy.basic.auth.user - User name for proxy basic authentication.
 * proxy.basic.auth.password.path - Path to the proxy basic authentication password file stored in HDFS.
-* timeout - Stellar enforced hard timeout for the total request time. Defaults to 1000 ms.  HttpClient timeouts alone are insufficient to guarantee the hard timeout.
+* timeout - Stellar enforced hard timeout (in milliseconds) for the total request time. HttpClient timeouts alone are insufficient to guarantee the hard timeout. (Defaults to `1000`)
 * connect.timeout - Connect timeout exposed by the HttpClient object.
 * connection.request.timeout - Connection request timeout exposed by the HttpClient object.
 * socket.timeout - Socket timeout exposed by the HttpClient object.
-* response.codes.allowed - A list of response codes that are allowed.  All others will be treated as errors.  Defaults to `200`.
-* empty.content.override - The default value that will be returned on a successful request with empty content.  Defaults to null.
-* error.value.override - The default value that will be returned on an error.  Defaults to null.
+* response.codes.allowed - A list of response codes that are allowed.  All others will be treated as errors.  (Defaults to `200`)
+* empty.content.override - The default value that will be returned on a successful request with empty content.  (Defaults to null)
+* error.value.override - The default value that will be returned on an error.  (Defaults to null)
 * pooling.max.total - The maximum number of connections in the connection pool.
 * pooling.default.max.per.route - The default maximum number of connections per route in the connection pool.
+* verify.content.length - Setting this to true will verify the actual body content length equals the content length header. (Defaults to false)
+* enforce.json - Setting this to true will verify POST data is well-formed JSON. (Defaults to true)
 
-This Map of settings can also be stored in the global config `stellar.rest.settings` property.  For example, to configure basic authentication
-settings you would add this property to the global config:
-
-```
-{
-  "stellar.rest.settings": {
-    "basic.auth.user": "user",
-    "basic.auth.password.path": "/password/path"
-  }
-}
-```
-
-Any settings passed into the expression will take precedence over the global config settings.  The global config settings will take precedence over the defaults.
-
-For security purposes, passwords are read from a file in HDFS.  Passwords are read as is including any new lines or spaces. Be careful not to include these in the file unless they are specifically part of the password.
+For security purposes, all passwords are read from a file in HDFS.  Passwords are read as is including any new lines or spaces. Be careful not to include these in the file unless they are specifically part of the password.
 
 ### Security
 
@@ -1719,6 +1766,11 @@ Perform a GET request using a proxy:
 {args={}, headers={Accept=application/json, Accept-Encoding=gzip,deflate, 
Cache-Control=max-age=259200, Connection=close, Host=httpbin.org, 
User-Agent=Apache-HttpClient/4.3.2 (java 1.5)}, origin=127.0.0.1, 
136.62.241.236, url=http://httpbin.org/get}
 ```
 
+Perform a POST request with additional query parameters:
+```
+
+```
+
 ### Latency
 
 Performing a REST request will introduce latency in a streaming pipeline.  
Therefore this function should only be used for low volume telemetries that are 
unlikely to be
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
index 9fb1c3f..90620a8 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
@@ -116,6 +116,10 @@ public enum JSONUtils {
    * Transforms a bean (aka POJO) to a JSONObject.
    */
   public JSONObject toJSONObject(Object o) throws JsonProcessingException, 
ParseException {
-    return (JSONObject) _parser.get().parse(toJSON(o, false));
+    return toJSONObject(toJSON(o, false));
+  }
+
+  public JSONObject toJSONObject(String json) throws ParseException {
+    return (JSONObject) _parser.get().parse(json);
   }
 }
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
index 610717e..a32faec 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
@@ -20,6 +20,7 @@ package org.apache.metron.stellar.dsl.functions;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A Map containing the Stellar REST settings.
@@ -32,6 +33,16 @@ public class RestConfig extends HashMap<String, Object> {
   public final static String STELLAR_REST_SETTINGS = "stellar.rest.settings";
 
   /**
+   * A global config prefix used for storing Stellar REST GET settings.
+   */
+  public final static String STELLAR_REST_GET_SETTINGS = 
"stellar.rest.get.settings";
+
+  /**
+   * A global config prefix used for storing Stellar REST POST settings.
+   */
+  public final static String STELLAR_REST_POST_SETTINGS = 
"stellar.rest.post.settings";
+
+  /**
    * User name for basic authentication.
    */
   public final static String BASIC_AUTH_USER = "basic.auth.user";
@@ -99,14 +110,20 @@ public class RestConfig extends HashMap<String, Object> {
   public final static String POOLING_DEFAULT_MAX_PER_RUOTE = 
"pooling.default.max.per.route";
 
   /**
-   * Setting this to true will verify the actual body content length equals 
the content length header
+   * Setting this to true will verify the actual body content length equals 
the content length header.
    */
   public final static String VERIFY_CONTENT_LENGTH = "verify.content.length";
 
+  /**
+   * Setting this to true will verify POST data is well-formed JSON.
+   */
+  public final static String ENFORCE_JSON = "enforce.json";
+
   public RestConfig() {
     put(TIMEOUT, 1000);
     put(RESPONSE_CODES_ALLOWED, Collections.singletonList(200));
     put(VERIFY_CONTENT_LENGTH, false);
+    put(ENFORCE_JSON, true);
   }
 
   public String getBasicAuthUser() {
@@ -173,4 +190,8 @@ public class RestConfig extends HashMap<String, Object> {
   public Boolean verifyContentLength() {
     return (Boolean) get(VERIFY_CONTENT_LENGTH);
   }
+
+  public Boolean enforceJson() {
+    return (Boolean) get(ENFORCE_JSON);
+  }
 }
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
index d6b03ce..5058527 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.stellar.dsl.functions;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -28,9 +29,10 @@ import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.*;
 import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
@@ -46,11 +48,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -62,9 +64,7 @@ import java.util.concurrent.TimeUnit;
 
 import static java.lang.String.format;
 import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
-import static 
org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_DEFAULT_MAX_PER_RUOTE;
-import static 
org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_MAX_TOTAL;
-import static 
org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.*;
 
 /**
  * Defines functions that enable REST requests with proper result and error 
handling.  Depends on an
@@ -77,20 +77,52 @@ public class RestFunctions {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   /**
-   * Get an argument from a list of arguments.
-   *
-   * @param index The index within the list of arguments.
-   * @param clazz The type expected.
-   * @param args All of the arguments.
-   * @param <T> The type of the argument expected.
+   * The CloseableHttpClient.
    */
-  public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+  private static CloseableHttpClient closeableHttpClient;
 
-    if(index >= args.size()) {
-      throw new IllegalArgumentException(format("Expected at least %d 
argument(s), found %d", index+1, args.size()));
+  /**
+   * Executor used to impose a hard request timeout.
+   */
+  private static ScheduledExecutorService scheduledExecutorService;
+
+  /**
+   * Initialize a single HttpClient to be shared by REST functions.
+   * @param context
+   */
+  private static synchronized void initializeHttpClient(Context context) {
+    if (closeableHttpClient == null) {
+      closeableHttpClient = getHttpClient(context);
     }
+  }
 
-    return ConversionUtils.convert(args.get(index), clazz);
+  /**
+   * Close the shared HttpClient.
+   */
+  private static synchronized void closeHttpClient() throws IOException {
+    if (closeableHttpClient != null) {
+      closeableHttpClient.close();
+      closeableHttpClient = null;
+    }
+  }
+
+  /**
+   * Initialize a single ExecutorService to be shared by REST functions.
+   */
+  private static synchronized void initializeExecutorService() {
+    if (scheduledExecutorService == null) {
+      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    }
+  }
+
+  /**
+   * Shutdown the shared ExecutorService.
+   */
+  private static synchronized void closeExecutorService() {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService = null;
+    }
   }
 
   @Stellar(
@@ -100,7 +132,8 @@ public class RestFunctions {
           params = {
                   "url - URI to the REST service",
                   "rest_config - Optional - Map (in curly braces) of 
name:value pairs, each overriding the global config parameter " +
-                          "of the same name. Default is the empty Map, meaning 
no overrides."
+                          "of the same name. Default is the empty Map, meaning 
no overrides.",
+                  "query_parameters - Optional - Map (in curly braces) of 
name:value pairs that will be added to the request as query parameters"
           },
           returns = "JSON results as a Map")
   public static class RestGet implements StellarFunction {
@@ -111,24 +144,14 @@ public class RestFunctions {
     private boolean initialized = false;
 
     /**
-     * The CloseableHttpClient.
-     */
-    private CloseableHttpClient httpClient;
-
-    /**
-     * Executor used to impose a hard request timeout.
-     */
-    private ScheduledExecutorService scheduledExecutorService;
-
-    /**
      * Initialize the function by creating a ScheduledExecutorService and 
looking up the CloseableHttpClient from the
      * Stellar context.
      * @param context
      */
     @Override
     public void initialize(Context context) {
-      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
-      httpClient = getHttpClient(context);
+      initializeExecutorService();
+      initializeHttpClient(context);
       initialized = true;
     }
 
@@ -144,20 +167,24 @@ public class RestFunctions {
      */
     @Override
     public Object apply(List<Object> args, Context context) throws 
ParseException {
-      RestConfig restConfig = new RestConfig();
-      try {
-        URI uri = new URI(getArg(0, String.class, args));
-        restConfig = getRestConfig(args, getGlobalConfig(context));
-
-        HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), 
uri.getScheme());
-        Optional<HttpHost> proxy = getProxy(restConfig);
-        HttpClientContext httpClientContext = getHttpClientContext(restConfig, 
target, proxy);
+      String uriString = getArg(0, String.class, args);
+      Map<String, Object> functionRestConfig = null;
+      Map<String, Object> queryParameters = new HashMap<>();
+      if (args.size() > 1) {
+        functionRestConfig = getArg(1, Map.class, args);
+        if (args.size() == 3) {
+          queryParameters = getArg(2, Map.class, args);
+        }
+      }
 
-        HttpGet httpGet = new HttpGet(uri);
-        httpGet.addHeader("Accept", "application/json");
-        httpGet.setConfig(getRequestConfig(restConfig, proxy));
+      // Build the RestConfig by applying settings in order of precedence
+      Map<String, Object> globalRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_SETTINGS);
+      Map<String, Object> getRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_GET_SETTINGS);
+      RestConfig restConfig = buildRestConfig(globalRestConfig, getRestConfig, functionRestConfig);
 
-        return doGet(restConfig, httpGet, httpClientContext);
+      try {
+        HttpGet httpGet = buildGetRequest(uriString, queryParameters);
+        return executeRequest(restConfig, httpGet);
       } catch (URISyntaxException e) {
         throw new IllegalArgumentException(e.getMessage(), e);
       } catch (IOException e) {
@@ -168,241 +195,399 @@ public class RestFunctions {
 
     @Override
     public void close() throws IOException {
-      if (httpClient != null) {
-        httpClient.close();
-      }
-      if (scheduledExecutorService != null) {
-        scheduledExecutorService.shutdown();
-      }
+      closeHttpClient();
+      closeExecutorService();
     }
 
-    /**
-     * Retrieves the ClosableHttpClient from a pooling connection manager.
-     *
-     * @param context The execution context.
-     * @return A ClosableHttpClient.
-     */
-    protected CloseableHttpClient getHttpClient(Context context) {
-      RestConfig restConfig = getRestConfig(Collections.emptyList(), 
getGlobalConfig(context));
-
-      PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
+    private HttpGet buildGetRequest(String uri, Map<String, Object> 
queryParameters) throws URISyntaxException {
+      HttpGet httpGet = new HttpGet(getURI(uri, queryParameters));
+      httpGet.addHeader("Accept", "application/json");
 
-      return HttpClients.custom()
-              .setConnectionManager(cm)
-              .build();
+      return httpGet;
     }
+  }
 
-    protected PoolingHttpClientConnectionManager 
getConnectionManager(RestConfig restConfig) {
-      PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager();
-      if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
-        cm.setMaxTotal(restConfig.getPoolingMaxTotal());
-      }
-      if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
-        cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
-      }
-      return cm;
-    }
+  @Stellar(
+          namespace = "REST",
+          name = "POST",
+          description = "Performs a REST POST request and parses the JSON 
results into a map.",
+          params = {
+                  "url - URI to the REST service",
+                  "post_data - POST data that will be sent in the POST 
request.  Must be well-formed JSON unless the 'enforce.json' property is set to 
false.",
+                  "rest_config - Optional - Map (in curly braces) of 
name:value pairs, each overriding the global config parameter " +
+                          "of the same name. Default is the empty Map, meaning 
no overrides.",
+                  "query_parameters - Optional - Map (in curly braces) of 
name:value pairs that will be added to the request as query parameters"
+
+          },
+          returns = "JSON results as a Map")
+  public static class RestPost implements StellarFunction {
 
     /**
-     * Only used for testing.
-     * @param httpClient
+     * Whether the function has been initialized.
      */
-    protected void setHttpClient(CloseableHttpClient httpClient) {
-      this.httpClient = httpClient;
-    }
+    private boolean initialized = false;
 
     /**
-     * Perform the HttpClient get and handle the results.  A configurable list 
of status codes are accepted and the
-     * response content (expected to be json) is parsed into a Map.  Values 
returned on errors and when response content
-     * is also configurable.  The rest config "timeout" setting is imposed in 
this method and will abort the get request
-     * if exceeded.
-     *
-     * @param restConfig
-     * @param httpGet
-     * @param httpClientContext
-     * @return
-     * @throws IOException
+     * Initialize the function by creating a ScheduledExecutorService and 
looking up the CloseableHttpClient from the
+     * Stellar context.
+     * @param context
      */
-    protected Object doGet(RestConfig restConfig, HttpGet httpGet, 
HttpClientContext httpClientContext) throws IOException {
+    @Override
+    public void initialize(Context context) {
+      initializeExecutorService();
+      initializeHttpClient(context);
+      initialized = true;
+    }
 
-      // Schedule a command to abort the httpGet request if the timeout is 
exceeded
-      ScheduledFuture scheduledFuture = 
scheduledExecutorService.schedule(httpGet::abort, restConfig.getTimeout(), 
TimeUnit.MILLISECONDS);
-      CloseableHttpResponse response;
-      try {
-        response = httpClient.execute(httpGet, httpClientContext);
-      } catch(Exception e) {
-        // Report a timeout if the httpGet request was aborted.  Otherwise 
rethrow exception.
-        if (httpGet.isAborted()) {
-          throw new IOException(String.format("Total Stellar REST request time 
to %s exceeded the configured timeout of %d ms.", httpGet.getURI().toString(), 
restConfig.getTimeout()));
-        } else {
-          throw e;
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+      String uriString = getArg(0, String.class, args);
+      Object dataObject = getArg(1, Object.class, args);
+      Map<String, Object> functionRestConfig = null;
+      Map<String, Object> queryParameters = new HashMap<>();
+      if (args.size() > 2) {
+        functionRestConfig = getArg(2, Map.class, args);
+        if (args.size() == 4) {
+          queryParameters = getArg(3, Map.class, args);
         }
       }
 
-      // Cancel the future if the request finished within the timeout
-      if (!scheduledFuture.isDone()) {
-        scheduledFuture.cancel(true);
-      }
-      int statusCode = response.getStatusLine().getStatusCode();
-      LOG.debug("request = {}; response = {}", httpGet, response);
-      if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
-        HttpEntity httpEntity = response.getEntity();
-
-        // Parse the response if present, return the empty value override if 
not
-        Optional<Object> parsedResponse = parseResponse(restConfig, httpGet, 
httpEntity);
-        return parsedResponse.orElseGet(restConfig::getEmptyContentOverride);
-      } else {
-        throw new IOException(String.format("Stellar REST request to %s 
expected status code to be one of %s but " +
-                "failed with http status code %d: %s",
-                httpGet.getURI().toString(),
-                restConfig.getResponseCodesAllowed().toString(),
-                statusCode,
-                EntityUtils.toString(response.getEntity())));
+      // Build the RestConfig by applying settings in order of precedence
+      Map<String, Object> globalRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_SETTINGS);
+      Map<String, Object> postRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_POST_SETTINGS);
+      RestConfig restConfig = buildRestConfig(globalRestConfig, postRestConfig, functionRestConfig);
+
+      try {
+        HttpPost httpPost = buildPostRequest(restConfig, uriString, 
dataObject, queryParameters);
+        return executeRequest(restConfig, httpPost);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage(), e);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return restConfig.getErrorValueOverride();
       }
     }
 
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> getGlobalConfig(Context context) {
-      Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, 
false);
-      return globalCapability.map(o -> (Map<String, Object>) 
o).orElseGet(HashMap::new);
+    @Override
+    public void close() throws IOException {
+      closeHttpClient();
+      closeExecutorService();
     }
 
-    /**
-     * Build the RestConfig object using the following order of precedence:
-     * <ul>
-     *   <li>rest config supplied as an expression parameter</li>
-     *   <li>rest config stored in the global config</li>
-     *   <li>default rest config</li>
-     * </ul>
-     * Only settings specified in the rest config will override lower priority 
config settings.
-     * @param args
-     * @param globalConfig
-     * @return
-     * @throws IOException
-     */
-    @SuppressWarnings("unchecked")
-    protected RestConfig getRestConfig(List<Object> args, Map<String, Object> 
globalConfig) {
-      Map<String, Object> globalRestConfig = (Map<String, Object>) 
globalConfig.get(STELLAR_REST_SETTINGS);
-      Map<String, Object> functionRestConfig = null;
-      if (args.size() > 1) {
-        functionRestConfig = getArg(1, Map.class, args);
-      }
+    private HttpPost buildPostRequest(RestConfig restConfig, String uriString, 
Object dataObject, Map<String, Object> queryParameters) throws 
JsonProcessingException, URISyntaxException, UnsupportedEncodingException {
+      String body = getPostData(restConfig, dataObject);
 
-      // Add settings in order of precedence
-      RestConfig restConfig = new RestConfig();
-      if (globalRestConfig != null) {
-        restConfig.putAll(globalRestConfig);
-      }
-      if (functionRestConfig != null) {
-        restConfig.putAll(functionRestConfig);
-      }
-      return restConfig;
-    }
+      URI uri = getURI(uriString, queryParameters);
+      HttpPost httpPost = new HttpPost(uri);
+      httpPost.setEntity(new StringEntity(body));
+      httpPost.addHeader("Accept", "application/json");
+      httpPost.addHeader("Content-type", "application/json");
 
-    /**
-     * Returns the proxy HttpHost object if the proxy rest config settings are 
set.
-     * @param restConfig
-     * @return
-     */
-    protected Optional<HttpHost> getProxy(RestConfig restConfig) {
-      Optional<HttpHost> proxy = Optional.empty();
-      if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != 
null) {
-        proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), 
restConfig.getProxyPort(), "http"));
-      }
-      return proxy;
+      return httpPost;
     }
 
     /**
-     * Builds the RequestConfig object by setting HttpClient settings defined 
in the rest config.
-     * @param restConfig
-     * @param proxy
-     * @return
+     * Serializes the supplied POST data to be sent in the POST request.  
Checks for well-formed JSON by default unless 'enforce.json' is set to false.
+     * @param restConfig RestConfig
+     * @param arg POST data
+     * @return Serialized POST data
+     * @throws JsonProcessingException
      */
-    protected RequestConfig getRequestConfig(RestConfig restConfig, 
Optional<HttpHost> proxy) {
-      RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
-      if (restConfig.getConnectTimeout() != null) {
-        requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
-      }
-      if (restConfig.getConnectionRequestTimeout() != null) {
-        
requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
+    private String getPostData(RestConfig restConfig, Object arg) throws 
JsonProcessingException {
+      String data = "";
+      if (arg == null) {
+        return data;
       }
-      if (restConfig.getSocketTimeout() != null) {
-        requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
+      if (arg instanceof Map) {
+        data = JSONUtils.INSTANCE.toJSON(arg, false);
+      } else {
+        data = arg.toString();
+        if (restConfig.enforceJson()) {
+          try {
+            JSONUtils.INSTANCE.toJSONObject(data);
+          } catch (org.json.simple.parser.ParseException e) {
+            throw new IllegalArgumentException(String.format("POST data '%s' 
must be properly formatted JSON.  " +
+                    "Set the '%s' property to false to disable this check.", 
data, RestConfig.ENFORCE_JSON));
+          }
+        }
       }
+      return data;
+    }
+  }
 
-      proxy.ifPresent(requestConfigBuilder::setProxy);
-      return requestConfigBuilder.build();
+  /**
+   * Get an argument from a list of arguments.
+   *
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   */
+  public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+
+    if(index >= args.size()) {
+      throw new IllegalArgumentException(format("Expected at least %d 
argument(s), found %d", index+1, args.size()));
     }
 
-    /**
-     * Builds the HttpClientContext object by setting the basic auth and/or 
proxy basic auth credentials when the
-     * necessary rest config settings are configured.  Passwords are stored in 
HDFS.
-     * @param restConfig
-     * @param target
-     * @param proxy
-     * @return
-     * @throws IOException
-     */
-    protected HttpClientContext getHttpClientContext(RestConfig restConfig, 
HttpHost target, Optional<HttpHost> proxy) throws IOException {
-      HttpClientContext httpClientContext = HttpClientContext.create();
-      boolean credentialsAdded = false;
-      CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-
-      // Add the basic auth credentials if the rest config settings are present
-      if (restConfig.getBasicAuthUser() != null && 
restConfig.getBasicAuthPasswordPath() != null) {
-        String password = new String(readBytes(new 
Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
-        credentialsProvider.setCredentials(
-                new AuthScope(target),
-                new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), 
password));
-        credentialsAdded = true;
-      }
+    return ConversionUtils.convert(args.get(index), clazz);
+  }
+
+  /**
+   * Retrieves the CloseableHttpClient from a pooling connection manager.
+   *
+   * @param context The execution context.
+   * @return A CloseableHttpClient.
+   */
+  protected static CloseableHttpClient getHttpClient(Context context) {
+    RestConfig restConfig = buildRestConfig(getGlobalConfig(context));
+
+    PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
+
+    return HttpClients.custom()
+            .setConnectionManager(cm)
+            .build();
+  }
+
+  protected static PoolingHttpClientConnectionManager 
getConnectionManager(RestConfig restConfig) {
+    PoolingHttpClientConnectionManager cm = new 
PoolingHttpClientConnectionManager();
+    if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
+      cm.setMaxTotal(restConfig.getPoolingMaxTotal());
+    }
+    if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
+      cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
+    }
+    return cm;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Map<String, Object> getGlobalConfig(Context context) {
+    Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, 
false);
+    return globalCapability.map(o -> (Map<String, Object>) 
o).orElseGet(HashMap::new);
+  }
 
-      // Add the proxy basic auth credentials if the rest config settings are 
present
-      if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
-              restConfig.getProxyBasicAuthPasswordPath() != null) {
-        String password = new String(readBytes(new 
Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
-        credentialsProvider.setCredentials(
-                new AuthScope(proxy.get()),
-                new 
UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
-        credentialsAdded = true;
+  /**
+   * Build the RestConfig by applying settings in order of precedence (last 
item in the input list has highest priority).
+   * Only settings specified in the rest config will override lower priority 
config settings.
+   * @param configs
+   * @return
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  protected static RestConfig buildRestConfig(Map<String, Object>... configs) {
+    RestConfig restConfig = new RestConfig();
+
+    // Add settings in order of precedence
+    for(Map<String, Object> config: configs) {
+      if (config != null) {
+        restConfig.putAll(config);
       }
-      if (credentialsAdded) {
-        httpClientContext.setCredentialsProvider(credentialsProvider);
+    }
+    return restConfig;
+  }
+
+  /**
+   * Builds a URI from the supplied URI string and adds query parameters.
+   * @param uriString
+   * @param queryParameters
+   * @return
+   * @throws URISyntaxException
+   */
+  private static URI getURI(String uriString, Map<String, Object> 
queryParameters) throws URISyntaxException {
+    URIBuilder uriBuilder = new URIBuilder(uriString);
+    if (queryParameters != null) {
+      for(Map.Entry<String, Object> entry: queryParameters.entrySet()) {
+        uriBuilder.setParameter(entry.getKey(), (String) entry.getValue());
       }
-      return httpClientContext;
     }
+    return uriBuilder.build();
+  }
+
+  /**
+   * Returns the proxy HttpHost object if the proxy rest config settings are 
set.
+   * @param restConfig
+   * @return
+   */
+  protected static Optional<HttpHost> getProxy(RestConfig restConfig) {
+    Optional<HttpHost> proxy = Optional.empty();
+    if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != 
null) {
+      proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), 
restConfig.getProxyPort(), "http"));
+    }
+    return proxy;
+  }
 
-    protected Optional<Object> parseResponse(RestConfig restConfig, HttpGet 
httpGet, HttpEntity httpEntity) throws IOException {
-      Optional<Object> parsedResponse = Optional.empty();
-      if (httpEntity != null) {
-        int actualContentLength = 0;
-        String json = EntityUtils.toString(httpEntity);
-        if (json != null && !json.isEmpty()) {
-          actualContentLength = json.length();
-          parsedResponse = Optional.of(JSONUtils.INSTANCE.load(json, 
JSONUtils.MAP_SUPPLIER));
-        }
-        if (restConfig.verifyContentLength() && actualContentLength != 
httpEntity.getContentLength()) {
-          throw new IOException(String.format("Stellar REST request to %s 
returned incorrect or missing content length. " +
-                          "Content length in the response was %d but the 
actual body content length was %d.",
-                  httpGet.getURI().toString(),
-                  httpEntity.getContentLength(),
-                  actualContentLength));
-        }
+  /**
+   * Builds the RequestConfig object by setting HttpClient settings defined in 
the rest config.
+   * @param restConfig
+   * @param proxy
+   * @return
+   */
+  protected static RequestConfig getRequestConfig(RestConfig restConfig, 
Optional<HttpHost> proxy) {
+    RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+    if (restConfig.getConnectTimeout() != null) {
+      requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
+    }
+    if (restConfig.getConnectionRequestTimeout() != null) {
+      
requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
+    }
+    if (restConfig.getSocketTimeout() != null) {
+      requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
+    }
+
+    proxy.ifPresent(requestConfigBuilder::setProxy);
+    return requestConfigBuilder.build();
+  }
+
+  /**
+   * Builds the HttpClientContext object by setting the basic auth and/or 
proxy basic auth credentials when the
+   * necessary rest config settings are configured.  Passwords are stored in 
HDFS.
+   * @param restConfig
+   * @param target
+   * @param proxy
+   * @return
+   * @throws IOException
+   */
+  protected static HttpClientContext getHttpClientContext(RestConfig 
restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
+    HttpClientContext httpClientContext = HttpClientContext.create();
+    boolean credentialsAdded = false;
+    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+
+    // Add the basic auth credentials if the rest config settings are present
+    if (restConfig.getBasicAuthUser() != null && 
restConfig.getBasicAuthPasswordPath() != null) {
+      String password = new String(readBytes(new 
Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
+      credentialsProvider.setCredentials(
+              new AuthScope(target),
+              new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), 
password));
+      credentialsAdded = true;
+    }
+
+    // Add the proxy basic auth credentials if the rest config settings are 
present
+    if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
+            restConfig.getProxyBasicAuthPasswordPath() != null) {
+      String password = new String(readBytes(new 
Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
+      credentialsProvider.setCredentials(
+              new AuthScope(proxy.get()),
+              new 
UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
+      credentialsAdded = true;
+    }
+    if (credentialsAdded) {
+      httpClientContext.setCredentialsProvider(credentialsProvider);
+    }
+    return httpClientContext;
+  }
+
+  /**
+   * Read bytes from an HDFS path.
+   * @param inPath
+   * @return
+   * @throws IOException
+   */
+  private static byte[] readBytes(Path inPath) throws IOException {
+    FileSystem fs = FileSystem.get(inPath.toUri(), new Configuration());
+    try (FSDataInputStream inputStream = fs.open(inPath)) {
+      return IOUtils.toByteArray(inputStream);
+    }
+  }
+
+  /**
+   * Perform the HttpClient request and handle the results.  A configurable 
list of status codes is accepted and the
+   * response content (expected to be json) is parsed into a Map.  The values 
returned on errors and when the response content
+   * is empty are also configurable.  The rest config "timeout" setting is imposed in 
this method and will abort the request
+   * if exceeded.
+   *
+   * @param restConfig
+   * @param httpRequestBase
+   * @return
+   * @throws IOException
+   */
+  protected static Object executeRequest(RestConfig restConfig, 
HttpRequestBase httpRequestBase) throws IOException {
+    URI uri = httpRequestBase.getURI();
+    HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), 
uri.getScheme());
+    Optional<HttpHost> proxy = getProxy(restConfig);
+    HttpClientContext httpClientContext = getHttpClientContext(restConfig, 
target, proxy);
+    httpRequestBase.setConfig(getRequestConfig(restConfig, proxy));
+
+    // Schedule a command to abort the request if the timeout is exceeded
+    ScheduledFuture scheduledFuture = 
scheduledExecutorService.schedule(httpRequestBase::abort, 
restConfig.getTimeout(), TimeUnit.MILLISECONDS);
+    CloseableHttpResponse response;
+    try {
+      response = closeableHttpClient.execute(httpRequestBase, 
httpClientContext);
+    } catch(Exception e) {
+      // Report a timeout if the request was aborted.  Otherwise 
rethrow the exception.
+      if (httpRequestBase.isAborted()) {
+        throw new IOException(String.format("Total Stellar REST request time 
to %s exceeded the configured timeout of %d ms.", 
httpRequestBase.getURI().toString(), restConfig.getTimeout()));
+      } else {
+        throw e;
       }
-      return parsedResponse;
     }
 
-    /**
-     * Read bytes from a HDFS path.
-     * @param inPath
-     * @return
-     * @throws IOException
-     */
-    private byte[] readBytes(Path inPath) throws IOException {
-      FileSystem fs = FileSystem.get(inPath.toUri(), new Configuration());
-      try (FSDataInputStream inputStream = fs.open(inPath)) {
-        return IOUtils.toByteArray(inputStream);
+    // Cancel the future if the request finished within the timeout
+    if (!scheduledFuture.isDone()) {
+      scheduledFuture.cancel(true);
+    }
+    int statusCode = response.getStatusLine().getStatusCode();
+    LOG.debug("request = {}; response = {}", httpRequestBase, response);
+    if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
+      HttpEntity httpEntity = response.getEntity();
+
+      // Parse the response if present, return the empty value override if not
+      Optional<Object> parsedResponse = parseResponse(restConfig, 
httpRequestBase, httpEntity);
+      return parsedResponse.orElseGet(restConfig::getEmptyContentOverride);
+    } else {
+      throw new IOException(String.format("Stellar REST request to %s expected 
status code to be one of %s but " +
+                      "failed with http status code %d: %s",
+              httpRequestBase.getURI().toString(),
+              restConfig.getResponseCodesAllowed().toString(),
+              statusCode,
+              EntityUtils.toString(response.getEntity())));
+    }
+  }
+
+  /**
+   * Parses the Http response into a Map and checks for content length.
+   * @param restConfig
+   * @param httpUriRequest
+   * @param httpEntity
+   * @return
+   * @throws IOException
+   */
+  protected static Optional<Object> parseResponse(RestConfig restConfig, 
HttpUriRequest httpUriRequest, HttpEntity httpEntity) throws IOException {
+    Optional<Object> parsedResponse = Optional.empty();
+    if (httpEntity != null) {
+      int actualContentLength = 0;
+      String json = EntityUtils.toString(httpEntity);
+      if (json != null && !json.isEmpty()) {
+        actualContentLength = json.length();
+        parsedResponse = Optional.of(JSONUtils.INSTANCE.load(json, 
JSONUtils.MAP_SUPPLIER));
+      }
+      if (restConfig.verifyContentLength() && actualContentLength != 
httpEntity.getContentLength()) {
+        throw new IOException(String.format("Stellar REST request to %s 
returned incorrect or missing content length. " +
+                        "Content length in the response was %d but the actual 
body content length was %d.",
+                httpUriRequest.getURI().toString(),
+                httpEntity.getContentLength(),
+                actualContentLength));
       }
     }
+    return parsedResponse;
+  }
+
+  /**
+   * Only used for testing.
+   * @param httpClient
+   */
+  protected static void setCloseableHttpClient(CloseableHttpClient httpClient) 
{
+    closeableHttpClient = httpClient;
+  }
+
+  /**
+   * Only used for testing.
+   * @param executorService
+   */
+  protected static void setScheduledExecutorService(ScheduledExecutorService 
executorService) {
+    scheduledExecutorService = executorService;
   }
 }
diff --git 
a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsIntegrationTest.java
 
b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsIntegrationTest.java
new file mode 100644
index 0000000..865453e
--- /dev/null
+++ 
b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsIntegrationTest.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.stellar.dsl.functions;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockserver.client.server.MockServerClient;
+import org.mockserver.junit.MockServerRule;
+import org.mockserver.junit.ProxyRule;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class RestFunctionsIntegrationTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  @Rule
+  public MockServerRule mockServerRule = new MockServerRule(this);
+
+  @Rule
+  public ProxyRule proxyRule = new ProxyRule(1080, this);
+
+  private MockServerClient mockServerClient;
+  private String baseUri;
+  private String getUri;
+  private String emptyGetUri;
+  private String postUri;
+  private String emptyPostUri;
+  private Context context;
+
+  private File basicAuthPasswordFile;
+  private String basicAuthPassword = "password";
+  private File proxyBasicAuthPasswordFile;
+  private String proxyAuthPassword = "proxyPassword";
+
+  @Before
+  public void setup() throws Exception {
+    context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, HashMap::new)
+            .build();
+
+    // Store the passwords in the local file system
+    basicAuthPasswordFile = tempDir.newFile("basicAuth.txt");
+    FileUtils.writeStringToFile(basicAuthPasswordFile, basicAuthPassword, 
StandardCharsets.UTF_8);
+    proxyBasicAuthPasswordFile = tempDir.newFile("proxyBasicAuth.txt");
+    FileUtils.writeStringToFile(proxyBasicAuthPasswordFile, proxyAuthPassword, 
StandardCharsets.UTF_8);
+
+    // By default, the mock server expects a GET request with the path set to 
/get
+    baseUri = String.format("http://localhost:%d", mockServerRule.getPort());
+    getUri = baseUri + "/get";
+    emptyGetUri = baseUri + "/get/empty";
+    postUri = baseUri + "/post";
+    emptyPostUri = baseUri + "/post/empty";
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"get\":\"success\"}"));
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get/empty"))
+            .respond(response()
+                    .withStatusCode(404));
+
+    mockServerClient.when(
+            request()
+                    .withMethod("POST")
+                    .withPath("/post")
+                    .withBody("{\"key\":\"value\"}"))
+            .respond(response()
+                    .withBody("{\"post\":\"success\"}"));
+    mockServerClient.when(
+            request()
+                    .withMethod("POST")
+                    .withPath("/post/empty"))
+            .respond(response()
+                    .withStatusCode(404));
+  }
+
+  /**
+   * The REST_GET function should perform a get request and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldSucceed() throws Exception {
+    Map<String, Object> actual = (Map<String, Object>) 
run(String.format("REST_GET('%s')", getUri), context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("get"));
+  }
+
+  /**
+   * The REST_GET function should perform a get request with query parameters and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldSucceedWithQueryParameters() throws Exception {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get/with/query/parameters")
+                    .withQueryStringParameter("key", "value"))
+            .respond(response()
+                    .withBody("{\"get.with.query.parameters\":\"success\"}"));
+
+    Map<String, Object> variables = ImmutableMap.of("queryParameters", 
ImmutableMap.of("key", "value"));
+    Map<String, Object> actual = (Map<String, Object>) 
run(String.format("REST_GET('%s', {}, queryParameters)",
+            baseUri + "/get/with/query/parameters"), variables, context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("get.with.query.parameters"));
+  }
+
+  /**
+   * The REST_GET function should perform a get request using a proxy and 
parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldSucceedWithProxy() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"proxyGet\":\"success\"}"));
+
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> new 
HashMap<String, Object>() {{
+      put(PROXY_HOST, "localhost");
+      put(PROXY_PORT, proxyRule.getHttpPort());
+    }});
+
+    Map<String, Object> actual = (Map<String, Object>) 
run(String.format("REST_GET('%s')", getUri), context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("proxyGet"));
+  }
+
+  /**
+   * The REST_GET function should handle an error status code and return null 
by default.
+   */
+  @Test
+  public void restGetShouldHandleErrorStatusCode() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withStatusCode(403));
+
+    assertNull(run(String.format("REST_GET('%s')", getUri), context));
+  }
+
+  /**
+   * {
+   *   "response.codes.allowed": [200,404],
+   *   "empty.content.override": "function config override"
+   * }
+   */
+  @Multiline
+  private String emptyContentOverride;
+
+  /**
+   * The REST_GET function should return the empty content override setting 
when status is allowed and content is empty.
+   */
+  @Test
+  public void restGetShouldReturnEmptyContentOverride() {
+    assertEquals("function config override", run(String.format("REST_GET('%s', 
%s)", emptyGetUri, emptyContentOverride), context));
+  }
+
+  /**
+   * {
+   *   "error.value.override": "error message"
+   * }
+   */
+  @Multiline
+  private String errorValueOverride;
+
+  /**
+   * The REST_GET function should return the error value override setting on 
error.
+   */
+  @Test
+  public void restGetShouldReturnErrorValueOverride() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withStatusCode(500));
+
+    Object result = run(String.format("REST_GET('%s', %s)", getUri, 
errorValueOverride), context);
+    assertEquals("error message" , result);
+  }
+
+  /**
+   * The REST_GET function should timeout and return null.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldTimeout() {
+    String uri = String.format("http://localhost:%d/get", 
mockServerRule.getPort());
+
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"get\":\"success\"}"));
+
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(TIMEOUT, 1);
+      }});
+    }};
+
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> 
globalConfig);
+
+    Map<String, Object> actual = (Map<String, Object>) 
run(String.format("REST_GET('%s')", uri), context);
+    assertNull(actual);
+  }
+
+  /**
+   * {
+   * "timeout": 1
+   * }
+   */
+  @Multiline
+  private String timeoutConfig;
+
+  /**
+   * The REST_GET function should honor the function supplied timeout setting.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldTimeoutWithSuppliedTimeout() {
+    String expression = String.format("REST_GET('%s', %s)", getUri, 
timeoutConfig);
+    Map<String, Object> actual = (Map<String, Object>) run(expression, 
context);
+    assertNull(actual);
+  }
+
+  /**
+   * The REST_GET function should throw an exception on a malformed uri.
+   * @throws IllegalArgumentException
+   * @throws IOException
+   */
+  @Test
+  public void restGetShouldHandleURISyntaxException() throws 
IllegalArgumentException, IOException {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("Unable to parse REST_GET('some invalid uri'): Unable 
to parse: REST_GET('some invalid uri') due to: Illegal character in path at 
index 4: some invalid uri");
+
+    run("REST_GET('some invalid uri')", context);
+  }
+
+
+
+  /**
+   * The REST_GET function should throw an exception when the required uri 
parameter is missing.
+   */
+  @Test
+  public void restGetShouldThrownExceptionOnMissingParameter() {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("Unable to parse REST_GET(): Unable to parse: 
REST_GET() due to: Expected at least 1 argument(s), found 0");
+
+    run("REST_GET()", context);
+  }
+
+  /**
+   * Global config Stellar REST settings should take precedence over defaults 
in the REST_GET function.
+   */
+  @Test
+  public void restGetShouldUseGlobalConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> 
globalConfig);
+
+    assertEquals("global config override", run(String.format("REST_GET('%s')", 
emptyGetUri), context));
+  }
+
+  /**
+   * Global config Stellar REST GET settings should take precedence over 
general Stellar REST settings in the REST_GET function.
+   */
+  @Test
+  public void restGetShouldUseGetConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+      put(STELLAR_REST_GET_SETTINGS, new HashMap<String, Object>() {{
+        put(EMPTY_CONTENT_OVERRIDE, "get config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> 
globalConfig);
+
+    assertEquals("get config override", run(String.format("REST_GET('%s')", 
emptyGetUri), context));
+  }
+
+  /**
+   * Settings passed into the function should take precedence over all other 
settings in the REST_GET function.
+   */
+  @Test
+  public void restGetShouldUseFunctionConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+      put(STELLAR_REST_GET_SETTINGS, new HashMap<String, Object>() {{
+        put(EMPTY_CONTENT_OVERRIDE, "get config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> 
globalConfig);
+
+    assertEquals("function config override", run(String.format("REST_GET('%s', 
%s)", emptyGetUri, emptyContentOverride), context));
+  }
+
+  /**
+   * The REST_POST function should perform a post request and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restPostShouldSucceed() throws Exception {
+    Map<String, Object> actual = (Map<String, Object>) 
run(String.format("REST_POST('%s', '{\"key\":\"value\"}')", postUri), context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("post"));
+  }
+
+  /**
+   * The REST_POST function should perform a post request with query parameters and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restPostShouldSucceedWithQueryParameters() throws Exception {
+    mockServerClient.when(
+            request()
+                    .withMethod("POST")
+                    .withPath("/post/with/query/parameters")
+                    .withQueryStringParameter("key", "value"))
+            .respond(response()
+                    .withBody("{\"post.with.query.parameters\":\"success\"}"));
+
+    Map<String, Object> variables = ImmutableMap.of("queryParameters", 
ImmutableMap.of("key", "value"));
+    Map<String, Object> actual = (Map<String, Object>) 
run(String.format("REST_POST('%s', {}, {}, queryParameters)",
+            baseUri + "/post/with/query/parameters"), variables, context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("post.with.query.parameters"));
+  }
+
+  /**
+   * The REST_POST function should perform a post request with a Stellar map as the POST data and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restPostShouldSucceedWithStellarMap() throws Exception {
+    Map<String, Object> variables = ImmutableMap.of("body", 
ImmutableMap.of("key", "value"));
+    Map<String, Object> actual = (Map<String, Object>) 
run(String.format("REST_POST('%s', body)", postUri), variables, context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("post"));
+  }
+
+  /**
+   * The REST_POST function should throw an exception on a malformed uri.
+   * @throws IllegalArgumentException
+   * @throws IOException
+   */
+  @Test
+  public void restPostShouldHandleURISyntaxException() throws 
IllegalArgumentException, IOException {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("Unable to parse REST_POST('some invalid uri', {}): 
Unable to parse: REST_POST('some invalid uri', {}) due to: Illegal character in 
path at index 4: some invalid uri");
+
+    run("REST_POST('some invalid uri', {})", context);
+  }
+
+  /**
+   * The REST_POST function should throw an exception when POST data is not 
well-formed JSON and 'enforce.json' is set to true.
+   * @throws IllegalArgumentException
+   * @throws IOException
+   */
+  @Test
+  public void restPostShouldThrowExceptionOnMalformedJson() throws 
IllegalArgumentException, IOException {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage(String.format("Unable to parse: REST_POST('%s', 
'malformed json') due to: POST data 'malformed json' must be properly formatted 
JSON.  " +
+            "Set the 'enforce.json' property to false to disable this check.", 
postUri));
+
+    run(String.format("REST_POST('%s', 'malformed json')", postUri), context);
+  }
+
+  /**
+   * Global config Stellar REST settings should take precedence over defaults 
in the REST_POST function.
+   */
+  @Test
+  public void restPostShouldUseGlobalConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> 
globalConfig);
+
+    assertEquals("global config override", run(String.format("REST_POST('%s', 
{})", emptyPostUri), context));
+  }
+
+  /**
+   * Global config Stellar REST POST settings should take precedence over general Stellar REST settings in the REST_POST function.
+   */
+  @Test
+  public void restPostShouldUseGetConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+      put(STELLAR_REST_POST_SETTINGS, new HashMap<String, Object>() {{
+        put(EMPTY_CONTENT_OVERRIDE, "post config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    assertEquals("post config override", run(String.format("REST_POST('%s', {})", emptyGetUri), context));
+  }
+
+  /**
+   * Settings passed into the function should take precedence over all other settings in the REST_POST function.
+   */
+  @Test
+  public void restPostShouldUseFunctionConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+      put(STELLAR_REST_POST_SETTINGS, new HashMap<String, Object>() {{
+        put(EMPTY_CONTENT_OVERRIDE, "post config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    assertEquals("function config override", run(String.format("REST_POST('%s', {}, %s)", emptyGetUri, emptyContentOverride), context));
+  }
+
+}
diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
index 2008a95..a746321 100644
--- a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
+++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.metron.stellar.dsl.functions;
 
-import org.adrianwalker.multilinestring.Multiline;
 import org.apache.commons.io.FileUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
@@ -26,60 +25,34 @@ import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.mockserver.client.server.MockServerClient;
-import org.mockserver.junit.MockServerRule;
-import org.mockserver.junit.ProxyRule;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
 
-import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.BASIC_AUTH_PASSWORD_PATH;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.BASIC_AUTH_USER;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.CONNECTION_REQUEST_TIMEOUT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.CONNECT_TIMEOUT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_DEFAULT_MAX_PER_RUOTE;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_MAX_TOTAL;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_BASIC_AUTH_PASSWORD_PATH;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_BASIC_AUTH_USER;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_HOST;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_PORT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.SOCKET_TIMEOUT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.TIMEOUT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.VERIFY_CONTENT_LENGTH;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.*;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-import static org.mockserver.model.HttpRequest.request;
-import static org.mockserver.model.HttpResponse.response;
+import static org.mockito.Mockito.*;
 
 /**
  * Tests the RestFunctions class.
@@ -90,18 +63,13 @@ public class RestFunctionsTest {
   public ExpectedException thrown = ExpectedException.none();
 
   @Rule
-  public MockServerRule mockServerRule = new MockServerRule(this);
+  public TemporaryFolder tempDir = new TemporaryFolder();
 
-  @Rule
-  public ProxyRule proxyRule = new ProxyRule(1080, this);
-
-  private MockServerClient mockServerClient;
-  private String getUri;
   private Context context;
 
-  private String basicAuthPasswordPath = "./target/basicAuth.txt";
+  private File basicAuthPasswordFile;
   private String basicAuthPassword = "password";
-  private String proxyBasicAuthPasswordPath = "./target/proxyBasicAuth.txt";
+  private File proxyBasicAuthPasswordFile;
   private String proxyAuthPassword = "proxyPassword";
 
   @Before
@@ -111,116 +79,10 @@ public class RestFunctionsTest {
             .build();
 
     // Store the passwords in the local file system
-    FileUtils.writeStringToFile(new File(basicAuthPasswordPath), basicAuthPassword, StandardCharsets.UTF_8);
-    FileUtils.writeStringToFile(new File(proxyBasicAuthPasswordPath), proxyAuthPassword, StandardCharsets.UTF_8);
-
-    // By default, the mock server expects a GET request with the path set to /get
-    getUri = String.format("http://localhost:%d/get", mockServerRule.getPort());
-    mockServerClient.when(
-            request()
-                    .withMethod("GET")
-                    .withPath("/get"))
-            .respond(response()
-                    .withBody("{\"get\":\"success\"}"));
-  }
-
-  /**
-   * The REST_GET function should perform a get request and parse the results.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void restGetShouldSucceed() throws Exception {
-    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", getUri), context);
-
-    assertEquals(1, actual.size());
-    assertEquals("success", actual.get("get"));
-  }
-
-  /**
-   * The REST_GET function should perform a get request using a proxy and parse the results.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void restGetShouldSucceedWithProxy() {
-    mockServerClient.when(
-            request()
-                    .withMethod("GET")
-                    .withPath("/get"))
-            .respond(response()
-                    .withBody("{\"proxyGet\":\"success\"}"));
-
-    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> new HashMap<String, Object>() {{
-      put(PROXY_HOST, "localhost");
-      put(PROXY_PORT, proxyRule.getHttpPort());
-    }});
-
-    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", getUri), context);
-
-    assertEquals(1, actual.size());
-    assertEquals("success", actual.get("proxyGet"));
-  }
-
-  /**
-   * The REST_GET function should handle an error status code and return null by default.
-   */
-  @Test
-  public void restGetShouldHandleErrorStatusCode() {
-      mockServerClient.when(
-              request()
-                      .withMethod("GET")
-                      .withPath("/get"))
-              .respond(response()
-                      .withStatusCode(403));
-
-      assertNull(run(String.format("REST_GET('%s')", getUri), context));
-  }
-
-  /**
-   * {
-   *   "response.codes.allowed": [200,404],
-   *   "empty.content.override": {}
-   * }
-   */
-  @Multiline
-  private String emptyContentOverride;
-
-  /**
-   * The REST_GET function should return the empty content override setting when status is allowed and content is empty.
-   */
-  @Test
-  public void restGetShouldReturnEmptyContentOverride() {
-      mockServerClient.when(
-              request()
-                      .withMethod("GET")
-                      .withPath("/get"))
-              .respond(response()
-                      .withStatusCode(404));
-
-    assertEquals(new HashMap<>(), run(String.format("REST_GET('%s', %s)", getUri, emptyContentOverride), context));
-  }
-
-  /**
-   * {
-   *   "error.value.override": "error message"
-   * }
-   */
-  @Multiline
-  private String errorValueOverride;
-
-  /**
-   * The REST_GET function should return the error value override setting on error.
-   */
-  @Test
-  public void restGetShouldReturnErrorValueOverride() {
-    mockServerClient.when(
-            request()
-                    .withMethod("GET")
-                    .withPath("/get"))
-            .respond(response()
-                    .withStatusCode(500));
-
-    Object result = run(String.format("REST_GET('%s', %s)", getUri, errorValueOverride), context);
-    assertEquals("error message" , result);
+    basicAuthPasswordFile = tempDir.newFile("basicAuth.txt");
+    FileUtils.writeStringToFile(basicAuthPasswordFile, basicAuthPassword, StandardCharsets.UTF_8);
+    proxyBasicAuthPasswordFile = tempDir.newFile("proxyBasicAuth.txt");
+    FileUtils.writeStringToFile(proxyBasicAuthPasswordFile, proxyAuthPassword, StandardCharsets.UTF_8);
   }
 
   /**
@@ -228,11 +90,9 @@ public class RestFunctionsTest {
    */
   @Test
   public void restGetShouldGetProxy() {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-
     {
       RestConfig restConfig = new RestConfig();
-      Optional<HttpHost> actual = restGet.getProxy(restConfig);
+      Optional<HttpHost> actual = RestFunctions.getProxy(restConfig);
 
       assertEquals(Optional.empty(), actual);
     }
@@ -240,15 +100,15 @@ public class RestFunctionsTest {
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(PROXY_HOST, "localhost");
-      Optional<HttpHost> actual = restGet.getProxy(restConfig);
+      Optional<HttpHost> actual = RestFunctions.getProxy(restConfig);
 
       assertEquals(Optional.empty(), actual);
     }
 
     {
       RestConfig restConfig = new RestConfig();
-      restConfig.put(PROXY_PORT, proxyRule.getHttpPort());
-      Optional<HttpHost> actual = restGet.getProxy(restConfig);
+      restConfig.put(PROXY_PORT, 3128);
+      Optional<HttpHost> actual = RestFunctions.getProxy(restConfig);
 
       assertEquals(Optional.empty(), actual);
     }
@@ -256,125 +116,36 @@ public class RestFunctionsTest {
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(PROXY_HOST, "localhost");
-      restConfig.put(PROXY_PORT, proxyRule.getHttpPort());
-      Optional<HttpHost> actual = restGet.getProxy(restConfig);
+      restConfig.put(PROXY_PORT, 3128);
+      Optional<HttpHost> actual = RestFunctions.getProxy(restConfig);
 
-      assertEquals(new HttpHost("localhost", proxyRule.getHttpPort()), actual.get());
+      assertEquals(new HttpHost("localhost", 3128), actual.get());
     }
   }
 
   /**
-   * The REST_GET function should return settings in the correct order of precedence.
+   * RestConfig should be built with settings in the correct order of precedence.
    * @throws Exception
    */
   @Test
-  public void restGetShouldGetRestConfig() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-
-    {
-      // Test for default timeout
-      RestConfig restConfig = restGet.getRestConfig(Collections.singletonList("uri"), new HashMap<>());
-
-      assertEquals(3, restConfig.size());
-      assertEquals(1000, restConfig.getTimeout().intValue());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertNull(restConfig.getBasicAuthUser());
-    }
-
-    Map<String, Object> globalRestConfig = new HashMap<String, Object>() {{
-      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
-        put(SOCKET_TIMEOUT, 2000);
-        put(BASIC_AUTH_USER, "globalUser");
-        put(PROXY_HOST, "globalHost");
-      }});
+  public void restShouldBuildRestConfig() throws Exception {
+    Map<String, Object> config = new HashMap<String, Object>() {{
+      put(BASIC_AUTH_USER, "user");
+      put(PROXY_BASIC_AUTH_USER, "proxyUser");
     }};
 
-    // Global config settings should take effect
-    {
-      RestConfig restConfig = restGet.getRestConfig(Collections.singletonList("uri"), globalRestConfig);
-
-      assertEquals(6, restConfig.size());
-      assertEquals(1000, restConfig.getTimeout().intValue());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(2000, restConfig.getSocketTimeout().intValue());
-      assertEquals("globalUser", restConfig.getBasicAuthUser());
-      assertEquals("globalHost", restConfig.getProxyHost());
-    }
-
-    Map<String, Object> functionRestConfig = new HashMap<String, Object>() {{
-      put(SOCKET_TIMEOUT, 1);
-      put(BASIC_AUTH_USER, "functionUser");
-      put(TIMEOUT, 100);
+    Map<String, Object> priorityConfig = new HashMap<String, Object>() {{
+      put(BASIC_AUTH_USER, "priorityUser");
     }};
 
-
-    // Function call settings should override global settings
-    {
-      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", functionRestConfig), globalRestConfig);
-
-      assertEquals(6, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(100, restConfig.getTimeout().intValue());
-      assertEquals(1, restConfig.getSocketTimeout().intValue());
-      assertEquals("functionUser", restConfig.getBasicAuthUser());
-      assertEquals("globalHost", restConfig.getProxyHost());
-    }
-
-    functionRestConfig = new HashMap<String, Object>() {{
-      put(BASIC_AUTH_USER, "functionUser");
-      put(TIMEOUT, 100);
-    }};
-
-    // New function call settings should take effect with global settings staying the same
-    {
-      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", functionRestConfig), globalRestConfig);
-
-      assertEquals(6, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(100, restConfig.getTimeout().intValue());
-      assertEquals(2000, restConfig.getSocketTimeout().intValue());
-      assertEquals("functionUser", restConfig.getBasicAuthUser());
-      assertEquals("globalHost", restConfig.getProxyHost());
-    }
-
-    globalRestConfig = new HashMap<String, Object>() {{
-      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
-        put(SOCKET_TIMEOUT, 2000);
-        put(BASIC_AUTH_USER, "globalUser");
-      }});
-    }};
-
-    // New global settings should take effect with function call settings staying the same
-    {
-      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", functionRestConfig), globalRestConfig);
-
-      assertEquals(5, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(100, restConfig.getTimeout().intValue());
-      assertEquals(2000, restConfig.getSocketTimeout().intValue());
-      assertEquals("functionUser", restConfig.getBasicAuthUser());
-    }
-
-    // Should fall back to global settings on missing function call config
-    {
-      RestConfig restConfig = restGet.getRestConfig(Collections.singletonList("uri"), globalRestConfig);
-
-      assertEquals(5, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(1000, restConfig.getTimeout().intValue());
-      assertEquals(2000, restConfig.getSocketTimeout().intValue());
-      assertEquals("globalUser", restConfig.getBasicAuthUser());
-    }
-
-    // Should fall back to default settings on missing global settings
-    {
-      RestConfig restConfig = restGet.getRestConfig(Collections.singletonList("uri"), new HashMap<>());
-
-      assertEquals(3, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(1000, restConfig.getTimeout().intValue());
-    }
-
+    RestConfig restConfig = RestFunctions.buildRestConfig(config, priorityConfig);
+    assertEquals(6, restConfig.size());
+    assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
+    assertEquals("priorityUser", restConfig.getBasicAuthUser());
+    assertEquals("proxyUser", restConfig.getProxyBasicAuthUser());
+    assertTrue(restConfig.enforceJson());
+    assertEquals(1000, restConfig.getTimeout().intValue());
+    assertFalse(restConfig.verifyContentLength());
   }
 
   /**
@@ -382,10 +153,8 @@ public class RestFunctionsTest {
    */
   @Test
   public void restGetShouldGetRequestConfig() {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-
     {
-      RequestConfig actual = restGet.getRequestConfig(new RestConfig(), Optional.empty());
+      RequestConfig actual = RestFunctions.getRequestConfig(new RestConfig(), Optional.empty());
       RequestConfig expected = RequestConfig.custom().build();
 
       assertEquals(expected.getConnectTimeout(), actual.getConnectTimeout());
@@ -399,10 +168,10 @@ public class RestFunctionsTest {
       restConfig.put(CONNECT_TIMEOUT, 1);
       restConfig.put(CONNECTION_REQUEST_TIMEOUT, 2);
       restConfig.put(SOCKET_TIMEOUT, 3);
-      HttpHost proxy = new HttpHost("localhost", proxyRule.getHttpPort());
+      HttpHost proxy = new HttpHost("localhost", 3128);
       Optional<HttpHost> proxyOptional = Optional.of(proxy);
 
-      RequestConfig actual = restGet.getRequestConfig(restConfig, proxyOptional);
+      RequestConfig actual = RestFunctions.getRequestConfig(restConfig, proxyOptional);
       RequestConfig expected = RequestConfig.custom()
               .setConnectTimeout(1)
               .setConnectionRequestTimeout(2)
@@ -424,13 +193,12 @@ public class RestFunctionsTest {
    */
   @Test
   public void restGetShouldGetHttpClientContext() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-    HttpHost target = new HttpHost("localhost", mockServerRule.getPort());
-    HttpHost proxy = new HttpHost("localhost", proxyRule.getHttpPort());
+    HttpHost target = new HttpHost("localhost", 8080);
+    HttpHost proxy = new HttpHost("localhost", 3128);
 
     {
       RestConfig restConfig = new RestConfig();
-      HttpClientContext actual = restGet.getHttpClientContext(restConfig, target, Optional.empty());
+      HttpClientContext actual = RestFunctions.getHttpClientContext(restConfig, target, Optional.empty());
 
       assertNull(actual.getCredentialsProvider());
     }
@@ -438,9 +206,9 @@ public class RestFunctionsTest {
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(BASIC_AUTH_USER, "user");
-      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordPath);
+      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordFile.getAbsolutePath());
 
-      HttpClientContext actual = restGet.getHttpClientContext(restConfig, target, Optional.empty());
+      HttpClientContext actual = RestFunctions.getHttpClientContext(restConfig, target, Optional.empty());
       HttpClientContext expected = HttpClientContext.create();
       CredentialsProvider expectedCredentialsProvider = new BasicCredentialsProvider();
       expectedCredentialsProvider.setCredentials(
@@ -457,9 +225,9 @@ public class RestFunctionsTest {
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(PROXY_BASIC_AUTH_USER, "proxyUser");
-      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordPath);
+      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordFile.getAbsolutePath());
 
-      HttpClientContext actual = restGet.getHttpClientContext(restConfig, target, Optional.of(proxy));
+      HttpClientContext actual = RestFunctions.getHttpClientContext(restConfig, target, Optional.of(proxy));
       HttpClientContext expected = HttpClientContext.create();
       CredentialsProvider expectedCredentialsProvider = new BasicCredentialsProvider();
       expectedCredentialsProvider.setCredentials(
@@ -476,11 +244,11 @@ public class RestFunctionsTest {
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(BASIC_AUTH_USER, "user");
-      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordPath);
+      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordFile.getAbsolutePath());
       restConfig.put(PROXY_BASIC_AUTH_USER, "proxyUser");
-      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordPath);
+      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordFile.getAbsolutePath());
 
-      HttpClientContext actual = restGet.getHttpClientContext(restConfig, target, Optional.of(proxy));
+      HttpClientContext actual = RestFunctions.getHttpClientContext(restConfig, target, Optional.of(proxy));
       HttpClientContext expected = HttpClientContext.create();
       CredentialsProvider expectedCredentialsProvider = new BasicCredentialsProvider();
       expectedCredentialsProvider.setCredentials(
@@ -499,161 +267,109 @@ public class RestFunctionsTest {
   }
 
   /**
-   * The REST_GET function should timeout and return null.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void restGetShouldTimeout() {
-    String uri = String.format("http://localhost:%d/get", mockServerRule.getPort());
-
-    mockServerClient.when(
-            request()
-                    .withMethod("GET")
-                    .withPath("/get"))
-            .respond(response()
-                    .withBody("{\"get\":\"success\"}"));
-
-    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
-      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
-        put(TIMEOUT, 1);
-      }});
-    }};
-
-    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
-
-    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", uri), context);
-    assertNull(actual);
-  }
-
-  /**
-   * {
-   * "timeout": 1
-   * }
-   */
-  @Multiline
-  private String timeoutConfig;
-
-  /**
-   * The REST_GET function should honor the function supplied timeout setting.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void restGetShouldTimeoutWithSuppliedTimeout() {
-    String expression = String.format("REST_GET('%s', %s)", getUri, timeoutConfig);
-    Map<String, Object> actual = (Map<String, Object>) run(expression, context);
-    assertNull(actual);
-  }
-
-  /**
-   * The REST_GET function should throw an exception on a malformed uri.
-   * @throws IllegalArgumentException
-   * @throws IOException
-   */
-  @Test
-  public void restGetShouldHandleURISyntaxException() throws IllegalArgumentException, IOException {
-    thrown.expect(ParseException.class);
-    thrown.expectMessage("Unable to parse REST_GET('some invalid uri'): Unable to parse: REST_GET('some invalid uri') due to: Illegal character in path at index 4: some invalid uri");
-
-    run("REST_GET('some invalid uri')", context);
-  }
-
-  /**
    * The REST_GET function should handle IOExceptions and return null.
    * @throws IllegalArgumentException
    * @throws IOException
    */
   @Test
   public void restGetShouldHandleIOException() throws IllegalArgumentException, IOException {
-    RestFunctions.RestGet restGet = spy(RestFunctions.RestGet.class);
-    doThrow(new IOException("io exception")).when(restGet).doGet(any(RestConfig.class), any(HttpGet.class), any(HttpClientContext.class));
+    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
+    CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+    ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
 
-    Object result = restGet.apply(Collections.singletonList(getUri), context);
-    Assert.assertNull(result);
-  }
+    RestFunctions.setCloseableHttpClient(httpClient);
+    RestFunctions.setScheduledExecutorService(executorService);
 
-  /**
-   * The REST_GET function should throw an exception when the required uri parameter is missing.
-   */
-  @Test
-  public void restGetShouldThrownExceptionOnMissingParameter() {
-    thrown.expect(ParseException.class);
-    thrown.expectMessage("Unable to parse REST_GET(): Unable to parse: REST_GET() due to: Expected at least 1 argument(s), found 0");
+    when(httpClient.execute(any(HttpRequestBase.class), any(HttpClientContext.class))).thenThrow(new IOException("io exception"));
 
-    run("REST_GET()", context);
+    Object result = restGet.apply(Collections.singletonList("http://www.host.com:8080/some/uri"), context);
+    Assert.assertNull(result);
   }
 
   @Test
   public void restGetShouldGetPoolingConnectionManager() {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-
     RestConfig restConfig = new RestConfig();
     restConfig.put(POOLING_MAX_TOTAL, 5);
     restConfig.put(POOLING_DEFAULT_MAX_PER_RUOTE, 2);
 
-    PoolingHttpClientConnectionManager cm = restGet.getConnectionManager(restConfig);
+    PoolingHttpClientConnectionManager cm = RestFunctions.getConnectionManager(restConfig);
 
     assertEquals(5, cm.getMaxTotal());
     assertEquals(2, cm.getDefaultMaxPerRoute());
   }
 
   @Test
-  public void restGetShouldCloseHttpClient() throws Exception {
+  public void restGetShouldClose() throws Exception {
     RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+    ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
 
-    restGet.setHttpClient(httpClient);
+    RestFunctions.setCloseableHttpClient(httpClient);
+    RestFunctions.setScheduledExecutorService(executorService);
     restGet.close();
 
     verify(httpClient, times(1)).close();
+    verify(executorService, times(1)).shutdown();
+    verifyNoMoreInteractions(httpClient);
+  }
+
+  @Test
+  public void restPostShouldClose() throws Exception {
+    RestFunctions.RestPost restPost = new RestFunctions.RestPost();
+    CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+    ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
+
+    RestFunctions.setCloseableHttpClient(httpClient);
+    RestFunctions.setScheduledExecutorService(executorService);
+    restPost.close();
+
+    verify(httpClient, times(1)).close();
+    verify(executorService, times(1)).shutdown();
     verifyNoMoreInteractions(httpClient);
   }
 
   @Test
   public void restGetShouldParseResponse() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     RestConfig restConfig = new RestConfig();
     HttpGet httpGet = mock(HttpGet.class);
     HttpEntity httpEntity = mock(HttpEntity.class);
 
     // return successfully parsed response
     when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("{\"get\":\"success\"}".getBytes()));
-    Optional<Object> actual = restGet.parseResponse(restConfig, httpGet, httpEntity);
+    Optional<Object> actual = RestFunctions.parseResponse(restConfig, httpGet, httpEntity);
     assertTrue(actual.isPresent());
     assertEquals("success", ((Map<String, Object>) actual.get()).get("get"));
   }
 
   @Test
   public void restGetShouldParseResponseOnNullHttpEntity() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     RestConfig restConfig = new RestConfig();
     HttpGet httpGet = mock(HttpGet.class);
 
     // return empty on null httpEntity
-    assertEquals(Optional.empty(), restGet.parseResponse(restConfig, httpGet, null));
+    assertEquals(Optional.empty(), RestFunctions.parseResponse(restConfig, httpGet, null));
   }
 
   @Test
   public void restGetShouldParseResponseOnNullContent() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     RestConfig restConfig = new RestConfig();
     HttpGet httpGet = mock(HttpGet.class);
     HttpEntity httpEntity = mock(HttpEntity.class);
 
     // return empty on null content
     when(httpEntity.getContent()).thenReturn(null);
-    assertEquals(Optional.empty(), restGet.parseResponse(restConfig, httpGet, httpEntity));
+    assertEquals(Optional.empty(), RestFunctions.parseResponse(restConfig, httpGet, httpEntity));
   }
 
   @Test
   public void restGetShouldParseResponseOnEmptyInputStream() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     RestConfig restConfig = new RestConfig();
     HttpGet httpGet = mock(HttpGet.class);
     HttpEntity httpEntity = mock(HttpEntity.class);
 
     // return empty on empty input stream
     when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("".getBytes()));
-    assertEquals(Optional.empty(), restGet.parseResponse(restConfig, httpGet, httpEntity));
+    assertEquals(Optional.empty(), RestFunctions.parseResponse(restConfig, httpGet, httpEntity));
   }
 
   @Test
@@ -670,7 +386,6 @@ public class RestFunctionsTest {
     when(httpGet.getURI()).thenReturn(new URI("uri"));
     when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("{\"get\":\"success\"}".getBytes()));
     when(httpEntity.getContentLength()).thenReturn(-1L);
-    restGet.parseResponse(restConfig, httpGet, httpEntity);
+    RestFunctions.parseResponse(restConfig, httpGet, httpEntity);
   }
-
 }
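
The precedence order exercised by the REST_POST tests in this commit (defaults, then general Stellar REST settings, then REST POST settings, then settings passed in the function call) can be pictured as a layered map merge in which later layers win. The sketch below is illustrative only and is not the code in RestFunctions.buildRestConfig: the class name RestConfigPrecedenceSketch and its merge helper are hypothetical, and the default values are assumptions taken from the assertions in the tests above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not part of the Metron code base.
public class RestConfigPrecedenceSketch {

  /** Merges the layers in order, so a later layer overrides an earlier one on key collisions. */
  @SafeVarargs
  static Map<String, Object> merge(Map<String, Object>... layers) {
    Map<String, Object> merged = new HashMap<>();
    for (Map<String, Object> layer : layers) {
      if (layer != null) {
        merged.putAll(layer);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Assumed defaults, mirroring the values asserted in the tests.
    Map<String, Object> defaults = new HashMap<>();
    defaults.put("timeout", 1000);
    defaults.put("response.codes.allowed", Collections.singletonList(200));

    // General Stellar REST settings from the global config.
    Map<String, Object> global = new HashMap<>();
    global.put("response.codes.allowed", Arrays.asList(200, 404));
    global.put("empty.content.override", "global config override");

    // REST POST specific settings from the global config.
    Map<String, Object> post = new HashMap<>();
    post.put("empty.content.override", "post config override");

    // Settings passed directly in the function call.
    Map<String, Object> call = new HashMap<>();
    call.put("empty.content.override", "function config override");

    Map<String, Object> effective = merge(defaults, global, post, call);
    System.out.println(effective.get("empty.content.override")); // function config override
    System.out.println(effective.get("response.codes.allowed")); // [200, 404]
    System.out.println(effective.get("timeout"));                // 1000
  }
}
```

Dropping the call layer from the merge would surface "post config override", and dropping the post layer as well would surface "global config override", which matches the three precedence tests.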
