[ https://issues.apache.org/jira/browse/NIFI-5214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505959#comment-16505959 ]
ASF GitHub Bot commented on NIFI-5214: -------------------------------------- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2723#discussion_r194039043 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java --- @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import com.burgstaller.okhttp.AuthenticationCacheInterceptor; +import com.burgstaller.okhttp.CachingAuthenticatorDecorator; +import com.burgstaller.okhttp.digest.CachingAuthenticator; +import com.burgstaller.okhttp.digest.DigestAuthenticator; +import okhttp3.Credentials; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.apache.nifi.annotation.behavior.DynamicProperties; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.attribute.expression.language.PreparedQuery; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxyConfigurationService; +import org.apache.nifi.proxy.ProxySpec; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.validation.RecordPathValidator; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.StringUtils; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.net.Proxy; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + +@Tags({ "rest", "lookup", "json", "xml", "http" }) +@CapabilityDescription("Use a REST service to enrich records.") +@DynamicProperties({ + @DynamicProperty(name = "*", value = "*", description = "All dynamic properties are added as HTTP headers with the name " + + "as the header name and the value as the header value.") +}) +public class RestLookupService extends AbstractControllerService implements LookupService<Record> { + static final PropertyDescriptor URL = new PropertyDescriptor.Builder() + .name("rest-lookup-url") + .displayName("URL") + .description("The URL for the REST endpoint. Expression language is evaluated against the lookup key/value pairs, " + + "not flowfile attributes or variable registry.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("rest-lookup-record-reader") + .displayName("Record Reader") + .description("The record reader to use for loading the payload and handling it as a record set.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_PATH = new PropertyDescriptor.Builder() + .name("rest-lookup-record-path") + .displayName("Record Path") + .description("An optional record path that can be used to define where in a record to get the real data to merge " + + "into the record set to be enriched. See documentation for examples of when this might be useful.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(new RecordPathValidator()) + .required(false) + .build(); + + static final PropertyDescriptor RECORD_PATH_PROPERTY_NAME = new PropertyDescriptor.Builder() + .name("rest-lookup-record-path-name") + .displayName("Record Path Property Name") + .description("An optional name for the property loaded by the record path. This will be the key used for the value " + + "when the loaded record is merged into record to be enriched. See docs for more details.") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(Validator.VALID) + .required(false) + .build(); + + static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("rest-lookup-ssl-context-service") + .displayName("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder() + .name("rest-lookup-basic-auth-username") + .displayName("Basic Authentication Username") + .description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).") + .required(false) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))) + .build(); + + public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder() + .name("rest-lookup-basic-auth-password") + .displayName("Basic Authentication Password") + .description("The password to be used by the client to authenticate against the Remote URL.") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))) + .build(); + public static final PropertyDescriptor PROP_DIGEST_AUTH = new PropertyDescriptor.Builder() + .name("rest-lookup-digest-auth") + .displayName("Use Digest Authentication") + .description("Whether to communicate with the website using Digest Authentication. 'Basic Authentication Username' and 'Basic Authentication Password' are used " + + "for authentication.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + + private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS}; + public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE + = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS); + + static final String MIME_TYPE_KEY = "mime.type"; + static final String BODY_KEY = "request.body"; + static final String METHOD_KEY = "request.method"; + + static final List<PropertyDescriptor> DESCRIPTORS; + static final Set<String> KEYS; + + static { + DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + URL, + RECORD_READER, + RECORD_PATH, + RECORD_PATH_PROPERTY_NAME, + SSL_CONTEXT_SERVICE, + PROXY_CONFIGURATION_SERVICE, + PROP_BASIC_AUTH_USERNAME, + PROP_BASIC_AUTH_PASSWORD, + PROP_DIGEST_AUTH + )); + Set<String> _keys = new HashSet<>(); + _keys.add(MIME_TYPE_KEY); + _keys.add(METHOD_KEY); + KEYS = Collections.unmodifiableSet(_keys); + } + + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + private ProxyConfigurationService proxyConfigurationService; + private RecordReaderFactory readerFactory; + private RecordPath recordPath; + private OkHttpClient client; + private Map<String, String> headers; + private volatile PreparedQuery compiledQuery; + private volatile String recordPathName; + private volatile String basicUser; + private volatile String basicPass; + private volatile boolean isDigest; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE) + .asControllerService(ProxyConfigurationService.class); + + OkHttpClient.Builder builder = new OkHttpClient.Builder(); + + setAuthenticator(builder, context); + + if (proxyConfigurationService != null) { + setProxy(builder); + } + + final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(SSLContextService.ClientAuth.WANT); + if (sslService != null) { + builder.sslSocketFactory(sslContext.getSocketFactory()); + } + + client = builder.build(); + + String path = context.getProperty(RECORD_PATH).isSet() ? context.getProperty(RECORD_PATH).getValue() : null; + if (!StringUtils.isBlank(path)) { + recordPath = RecordPath.compile(path); + } + + recordPathName = context.getProperty(RECORD_PATH_PROPERTY_NAME).isSet() + ? context.getProperty(RECORD_PATH_PROPERTY_NAME).evaluateAttributeExpressions().getValue() + : null; + + getHeaders(context); + + compiledQuery = Query.prepare(context.getProperty(URL).getValue()); + } + + @OnDisabled + public void onDisabled() { + this.compiledQuery = null; + } + + private void getHeaders(ConfigurationContext context) { + headers = new HashMap<>(); + for (PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + headers.put( + descriptor.getDisplayName(), + context.getProperty(descriptor).evaluateAttributeExpressions().getValue() + ); + } + } + } + + private void setProxy(OkHttpClient.Builder builder) { + ProxyConfiguration config = proxyConfigurationService.getConfiguration(); + if (!config.getProxyType().equals(Proxy.Type.DIRECT)) { + final Proxy proxy = config.createProxy(); + builder.proxy(proxy); + + if (config.hasCredential()){ + builder.proxyAuthenticator((route, response) -> { + final String credential= Credentials.basic(config.getProxyUserName(), config.getProxyUserPassword()); + return response.request().newBuilder() + .header("Proxy-Authorization", credential) + .build(); + }); + } + } + } + + @Override + public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException { + final String endpoint = determineEndpoint(coordinates); + final String mimeType = (String)coordinates.get(MIME_TYPE_KEY); + final String method = (String)coordinates.get(METHOD_KEY); --- End diff -- A value is required because it's a required key, so I don't think that's necessary. > Add a REST lookup service > ------------------------- > > Key: NIFI-5214 > URL: https://issues.apache.org/jira/browse/NIFI-5214 > Project: Apache NiFi > Issue Type: New Feature > Reporter: Mike Thomsen > Assignee: Mike Thomsen > Priority: Major > > * Should have reader API support > * Should be able to drill down through complex XML and JSON responses to a > nested record. -- This message was sent by Atlassian JIRA (v7.6.3#76005)