[ https://issues.apache.org/jira/browse/METRON-1850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673078#comment-16673078 ]
ASF GitHub Bot commented on METRON-1850: ---------------------------------------- Github user merrimanr commented on a diff in the pull request: https://github.com/apache/metron/pull/1250#discussion_r230366857 --- Diff: metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java --- @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.stellar.dsl.functions; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.stellar.common.utils.JSONUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.ParseException; +import org.apache.metron.stellar.dsl.Stellar; +import org.apache.metron.stellar.dsl.StellarFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG; +import static org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS; + +/** + * Defines functions that enable REST requests with proper 
result and error handling. Depends on an + * Apache HttpComponents client being supplied as a Stellar HTTP_CLIENT capability. Exposes various Http settings + * including authentication, proxy and timeouts through the global config with the option to override any settings + * through a config object supplied in the expression. + */ +public class RestFunctions { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Retrieves the CloseableHttpClient from the execution context. + * + * @param context The execution context. + * @return A CloseableHttpClient, if one exists. Otherwise, an exception is thrown. + */ + private static CloseableHttpClient getHttpClient(Context context) { + Optional<Object> clientOpt = context.getCapability(Context.Capabilities.HTTP_CLIENT); + if(clientOpt.isPresent()) { + return (CloseableHttpClient) clientOpt.get(); + } else { + throw new IllegalStateException("Missing HTTP_CLIENT; http connection required"); + } + } + + /** + * Get an argument from a list of arguments. + * + * @param index The index within the list of arguments. + * @param clazz The type expected. + * @param args All of the arguments. + * @param <T> The type of the argument expected. + */ + public static <T> T getArg(int index, Class<T> clazz, List<Object> args) { + + if(index >= args.size()) { + throw new IllegalArgumentException(format("Expected at least %d argument(s), found %d", index+1, args.size())); + } + + return ConversionUtils.convert(args.get(index), clazz); + } + + @Stellar( + namespace = "REST", + name = "GET", + description = "Performs a REST GET request and parses the JSON results into a map.", + params = { + "url - URI to the REST service", + "rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " + + "of the same name. Default is the empty Map, meaning no overrides." 
+ }, + returns = "JSON results as a Map") + public static class RestGet implements StellarFunction { + + /** + * Whether the function has been initialized. + */ + private boolean initialized = false; + + /** + * The CloseableHttpClient. + */ + private CloseableHttpClient httpClient; + + /** + * Executor used to impose a hard request timeout. + */ + private ScheduledExecutorService scheduledExecutorService; + + /** + * Apply the function. + * @param args The function arguments including uri and rest config. + * @param context Stellar context + */ + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + RestConfig restConfig = new RestConfig(); + try { + URI uri = new URI(getArg(0, String.class, args)); + Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, false); + + Map<String, Object> globalConfig = (Map<String, Object>) globalCapability.get(); + + restConfig = getRestConfig(args, globalConfig); + + HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme()); + Optional<HttpHost> proxy = getProxy(restConfig); + HttpClientContext httpClientContext = getHttpClientContext(restConfig, target, proxy); + + HttpGet httpGet = new HttpGet(uri); + httpGet.addHeader("Accept", "application/json"); + httpGet.setConfig(getRequestConfig(restConfig, proxy)); + + return doGet(restConfig, httpGet, httpClientContext); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage(), e); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return restConfig.getErrorValueOverride(); + } + } + + /** + * Perform the HttpClient get and handle the results. A configurable list of status codes is accepted and the + * response content (expected to be json) is parsed into a Map. Values returned on errors and when response content + * is empty are also configurable. The rest config "timeout" setting is imposed in this method and will abort the get request + * if exceeded. 
+ * + * @param restConfig + * @param httpGet + * @param httpClientContext + * @return + * @throws IOException + */ + private Object doGet(RestConfig restConfig, HttpGet httpGet, HttpClientContext httpClientContext) throws IOException { + + // Schedule a command to abort the httpGet request if the timeout is exceeded --- End diff -- Did you see the REST section in the Stellar README? I can add more content to make that clearer. > Stellar REST function > --------------------- > > Key: METRON-1850 > URL: https://issues.apache.org/jira/browse/METRON-1850 > Project: Metron > Issue Type: New Feature > Reporter: Ryan Merriman > Priority: Major > > It would be useful to be able to enrich messages with Stellar using 3rd party > (or internal) REST services. At a minimum this function would: > * Stellar function available to GET from an HTTP API > * Optional parameters for basic auth (user/password) which generate correct > Authorization header > * Function returns null value for errors, connection failures etc and logs > error > * Function must provide and use pooled connection objects at the process > level > * Function must send Accept: application/json header > * A global setting must be available to set a proxy for all API calls, and > if present the proxy must be used. > * Proxy authentication must also be supported using basic auth. -- This message was sent by Atlassian JIRA (v7.6.3#76005)