[ https://issues.apache.org/jira/browse/NIFI-4325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404210#comment-16404210 ]
ASF GitHub Bot commented on NIFI-4325: -------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/2113#discussion_r175306133 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { + private ObjectMapper mapper = new ObjectMapper(); + + static final private List<PropertyDescriptor> properties; + + private RestClient client; + + private String url; + + static { + List _props = new ArrayList(); + _props.add(ElasticSearchClientService.HTTP_HOSTS); + _props.add(ElasticSearchClientService.USERNAME); + _props.add(ElasticSearchClientService.PASSWORD); + _props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); + _props.add(ElasticSearchClientService.CONNECT_TIMEOUT); + _props.add(ElasticSearchClientService.SOCKET_TIMEOUT); + _props.add(ElasticSearchClientService.RETRY_TIMEOUT); + + properties = Collections.unmodifiableList(_props); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + try { + setupClient(context); + } catch (Exception ex) { + getLogger().error("Could not initialize ElasticSearch client.", ex); + throw new InitializationException(ex); + } + } + + @OnDisabled + public void onDisabled() throws IOException { + this.client.close(); + this.url = null; + } + + private void setupClient(ConfigurationContext context) throws MalformedURLException { + final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue(); + String[] hostsSplit = hosts.split(",[\\s]*"); + this.url = hostsSplit[0]; + final SSLContextService sslService = + context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + + final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger(); + final Integer readTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger(); + final Integer retryTimeout = context.getProperty(RETRY_TIMEOUT).asInteger(); + + HttpHost[] hh = new HttpHost[hostsSplit.length]; + for (int x = 0; x < hh.length; x++) { + URL u = new URL(hostsSplit[x]); + hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol()); + } + + RestClientBuilder builder = RestClient.builder(hh) + .setHttpClientConfigCallback(httpClientBuilder -> { + if (sslService != null && sslService.isKeyStoreConfigured() && sslService.isTrustStoreConfigured()) { + try { + KeyStore keyStore = KeyStore.getInstance(sslService.getKeyStoreType()); + KeyStore trustStore = KeyStore.getInstance("JKS"); + + try (final InputStream is = new FileInputStream(sslService.getKeyStoreFile())) { + keyStore.load(is, sslService.getKeyStorePassword().toCharArray()); + } + + try (final InputStream is = new FileInputStream(sslService.getTrustStoreFile())) { + trustStore.load(is, sslService.getTrustStorePassword().toCharArray()); + } + + final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); + kmf.init(keyStore, sslService.getKeyStorePassword().toCharArray()); + final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory + .getDefaultAlgorithm()); + tmf.init(keyStore); + SSLContext context1 = SSLContext.getInstance(sslService.getSslAlgorithm()); + context1.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + httpClientBuilder = httpClientBuilder.setSSLContext(context1); + } catch (Exception e) { + getLogger().error("Error setting up SSL.", e); + } + } + + if (username != null && password != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + httpClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + + return httpClientBuilder; + }) + .setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setConnectTimeout(connectTimeout); + requestConfigBuilder.setSocketTimeout(readTimeout); + return requestConfigBuilder; + }) + .setMaxRetryTimeoutMillis(retryTimeout); + + this.client = builder.build(); + } + + private Response runQuery(String query, String index, String type) throws IOException { + StringBuilder sb = new StringBuilder() + .append("/") + .append(index); + if (type != null && !type.equals("")) { + sb.append("/") + .append(type); + } + + sb.append("/_search"); + + HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); + + return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity); + } + + @Override + public Optional<SearchResponse> search(String query, String index, String type) throws IOException { + Response response = runQuery(query, index, type); + Map<String, Object> parsed = mapper.readValue(IOUtils.toString(response.getEntity().getContent()), Map.class); --- End diff -- 2 things: 1. IOUtils.toString is deprecated, you should pass the expected charset to use. 2. In general, you should avoid doing a lot of nesting of logic into a single line. It helps a lot with readability and diagnosing errors. For example, if we hit an NPE here, it could be for any of 3 different objects. At the very least, the IOUtils call to get the response content as a string should be on another line. > Create a new ElasticSearch processor that supports the JSON DSL > --------------------------------------------------------------- > > Key: NIFI-4325 > URL: https://issues.apache.org/jira/browse/NIFI-4325 > Project: Apache NiFi > Issue Type: Improvement > Reporter: Mike Thomsen > Priority: Minor > > The existing ElasticSearch processors use the Lucene-style syntax for > querying, not the JSON DSL. A new processor is needed that can take a full > JSON query and execute it. It should also support aggregation queries in this > syntax. A user needs to be able to take a query as-is from Kibana and drop it > into NiFi and have it just run. -- This message was sent by Atlassian JIRA (v7.6.3#76005)