[
https://issues.apache.org/jira/browse/NIFI-2417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15514167#comment-15514167
]
ASF GitHub Bot commented on NIFI-2417:
--------------------------------------
Github user gresockj commented on a diff in the pull request:
https://github.com/apache/nifi/pull/733#discussion_r80111550
--- Diff:
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
---
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.codehaus.jackson.JsonNode;
+
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@EventDriven
+@SupportsBatching
+@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
+@CapabilityDescription("Scrolls through an Elasticsearch query using the
specified connection properties. "
+ + "This processor is intended to be run on the primary node, and
is designed for scrolling through "
+ + "huge result sets, as in the case of a reindex. The state must
be cleared before another query "
+ + "can be run. Each page of results is returned, wrapped in a
JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }. "
+ + "Note that the full body of each page of documents will be read
into memory before being "
+ + "written to a Flow File for transfer.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "es.index", description = "The
Elasticsearch index containing the document"),
+ @WritesAttribute(attribute = "es.type", description = "The
Elasticsearch document type") })
+@Stateful(description = "After each successful scroll page, the latest
scroll_id is persisted in scrollId as input for the next scroll call. "
+ + "Once the entire query is complete, finishedQuery state will be
set to true, and the processor will not execute unless this is cleared.",
scopes = { Scope.LOCAL })
+public class ScrollElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
+
+ private static final String FINISHED_QUERY_STATE = "finishedQuery";
+ private static final String SCROLL_ID_STATE = "scrollId";
+ private static final String FIELD_INCLUDE_QUERY_PARAM =
"_source_include";
+ private static final String QUERY_QUERY_PARAM = "q";
+ private static final String SORT_QUERY_PARAM = "sort";
+ private static final String SCROLL_QUERY_PARAM = "scroll";
+ private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
+ private static final String SIZE_QUERY_PARAM = "size";
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description(
+ "All FlowFiles that are read from Elasticsearch are
routed to this relationship.")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description(
+ "All FlowFiles that cannot be read from Elasticsearch
are routed to this relationship. Note that only incoming "
+ + "flow files will be routed to
failure.").build();
+
+ public static final PropertyDescriptor QUERY = new
PropertyDescriptor.Builder()
+ .name("scroll-es-query").displayName("Query")
+ .description("The Lucene-style query to run against
ElasticSearch").required(true)
+
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor SCROLL_DURATION = new
PropertyDescriptor.Builder()
+ .name("scroll-es-scroll")
+ .displayName("Scroll Duration")
+ .description("The scroll duration is how long each search
context is kept in memory.")
+ .defaultValue("1m")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(
+
StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
+ .build();
+
+ public static final PropertyDescriptor INDEX = new
PropertyDescriptor.Builder()
+ .name("scroll-es-index").displayName("Index")
+ .description("The name of the index to read
from").required(true)
+
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TYPE = new
PropertyDescriptor.Builder()
+ .name("scroll-es-type")
+ .displayName("Type")
+ .description(
+ "The (optional) type of this document, used by
Elasticsearch for indexing and searching. If the property is empty or set "
+ + "to _all, the first document matching the
identifier across all types will be retrieved.")
+ .required(false).expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ public static final PropertyDescriptor FIELDS = new
PropertyDescriptor.Builder()
+ .name("scroll-es-fields")
+ .displayName("Fields")
+ .description(
+ "A comma-separated list of fields to retrieve from the
document. If the Fields property is left blank, "
+ + "then the entire document's source will be
retrieved.")
+ .required(false).expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ public static final PropertyDescriptor SORT = new
PropertyDescriptor.Builder()
+ .name("scroll-es-sort")
+ .displayName("Sort")
+ .description(
+ "A sort parameter (e.g., timestamp:asc). If the Sort
property is left blank, "
+ + "then the results will be retrieved in
document order.")
+ .required(false).expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+ public static final PropertyDescriptor PAGE_SIZE = new
PropertyDescriptor.Builder()
+ .name("scroll-es-size").displayName("Page
Size").defaultValue("20")
+ .description("Determines how many documents to return per page
during scrolling.")
+ .required(true).expressionLanguageSupported(true)
+
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ return Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ES_URL);
+ descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+ descriptors.add(USERNAME);
+ descriptors.add(PASSWORD);
+ descriptors.add(CONNECT_TIMEOUT);
+ descriptors.add(RESPONSE_TIMEOUT);
+ descriptors.add(QUERY);
+ descriptors.add(SCROLL_DURATION);
+ descriptors.add(PAGE_SIZE);
+ descriptors.add(INDEX);
+ descriptors.add(TYPE);
+ descriptors.add(FIELDS);
+ descriptors.add(SORT);
+
+ return Collections.unmodifiableList(descriptors);
+ }
+
+ @OnScheduled
+ public void setup(ProcessContext context) {
+ super.setup(context);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session)
+ throws ProcessException {
+
+ try {
+ if (isQueryFinished(context.getStateManager())) {
+ getLogger().trace(
+ "Query has been marked finished in the state
manager. "
+ + "To run another query, clear the
state.");
+ return;
+ }
+ } catch (IOException e) {
+ throw new ProcessException("Could not retrieve state", e);
+ }
+
+ OkHttpClient okHttpClient = getClient();
+
+ FlowFile flowFile = session.create();
+
+ final String index =
context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
+ .getValue();
+ final String query =
context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
+ .getValue();
+ final String docType =
context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
+ .getValue();
+ final int pageSize =
context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
+ .asInteger().intValue();
+ final String fields = context.getProperty(FIELDS).isSet() ?
context.getProperty(FIELDS)
+ .evaluateAttributeExpressions(flowFile).getValue() : null;
+ final String sort = context.getProperty(SORT).isSet() ?
context.getProperty(SORT)
+ .evaluateAttributeExpressions(flowFile).getValue() : null;
+ final String scroll = context.getProperty(SCROLL_DURATION).isSet()
? context
+
.getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue()
: null;
+
+ // Authentication
+ final String username = context.getProperty(USERNAME).getValue();
+ final String password = context.getProperty(PASSWORD).getValue();
+
+ final ComponentLog logger = getLogger();
+
+ try {
+ String scrollId = loadScrollId(context.getStateManager());
+
+ if (scrollId != null) {
+ // read the url property from the context
+ final String urlstr =
StringUtils.trimToEmpty(context.getProperty(ES_URL)
+ .getValue());
+ final URL scrollurl = buildRequestURL(urlstr, query,
index, docType, fields, sort,
+ scrollId, pageSize, scroll);
+ final long startNanos = System.nanoTime();
+
+ final Response getResponse =
sendRequestToElasticsearch(okHttpClient, scrollurl,
+ username, password, "GET", null);
+ this.getPage(getResponse, scrollurl, context, session,
flowFile, logger, startNanos);
+ } else {
+ logger.debug("Querying {}/{} from Elasticsearch: {}", new
Object[] { index,
+ docType, query });
+
+ // read the url property from the context
+ final String urlstr =
StringUtils.trimToEmpty(context.getProperty(ES_URL)
+ .getValue());
+ final URL queryUrl = buildRequestURL(urlstr, query, index,
docType, fields, sort,
+ scrollId, pageSize, scroll);
+ final long startNanos = System.nanoTime();
+
+ final Response getResponse =
sendRequestToElasticsearch(okHttpClient, queryUrl,
+ username, password, "GET", null);
+ this.getPage(getResponse, queryUrl, context, session,
flowFile, logger, startNanos);
+ }
+
+ } catch (IOException ioe) {
+ logger.error(
+ "Failed to read from Elasticsearch due to {}, this may
indicate an error in configuration "
+ + "(hosts, username/password, etc.).",
+ new Object[] { ioe.getLocalizedMessage() }, ioe);
+ session.remove(flowFile);
+ context.yield();
+
+ } catch (Exception e) {
+ logger.error("Failed to read {} from Elasticsearch due to {}",
new Object[] { flowFile,
+ e.getLocalizedMessage() }, e);
+ session.transfer(flowFile, REL_FAILURE);
+ context.yield();
+ }
+ }
+
+ private void getPage(final Response getResponse, final URL url, final
ProcessContext context,
+ final ProcessSession session, FlowFile flowFile, final
ComponentLog logger, final long startNanos)
+ throws IOException {
+ final int statusCode = getResponse.code();
+
+ if (isSuccess(statusCode)) {
+ ResponseBody body = getResponse.body();
+ final byte[] bodyBytes = body.bytes();
+ JsonNode responseJson = parseJsonResponse(new
ByteArrayInputStream(bodyBytes));
+ String scrollId = responseJson.get("_scroll_id").asText();
+
+ StringBuilder builder = new StringBuilder();
+
+ builder.append("{ \"hits\" : [");
+
+ JsonNode hits = responseJson.get("hits").get("hits");
+ if (hits.size() == 0) {
+ finishQuery(context.getStateManager());
+ session.remove(flowFile);
+ return;
+ }
+
+ for(int i = 0; i < hits.size(); i++) {
+ JsonNode hit = hits.get(i);
+ String retrievedIndex = hit.get("_index").asText();
+ String retrievedType = hit.get("_type").asText();
+
+ JsonNode source = hit.get("_source");
+ flowFile = session.putAttribute(flowFile, "es.index",
retrievedIndex);
--- End diff --
The Scroll processor always returns a full page of JSON documents per
flow file, so there isn't an easy way to include each document id in an
attribute (though I suppose we could do something like es.result.id.1=[id],
es.result.id.2=[id2], etc.).
> Implement Query and Scroll processors for ElasticSearch
> -------------------------------------------------------
>
> Key: NIFI-2417
> URL: https://issues.apache.org/jira/browse/NIFI-2417
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.0.0
> Reporter: Joseph Gresock
> Assignee: Joseph Gresock
> Priority: Minor
> Fix For: 1.1.0
>
>
> FetchElasticsearchHttp allows users to select a single document from
> Elasticsearch in NiFi, but there is no way to run a query to retrieve
> multiple documents.
> We should add a QueryElasticsearchHttp processor for running a query and
> returning a flow file per result, for small result sets. This should allow
> both input and non-input execution.
> A separate ScrollElasticsearchHttp processor would also be useful for
> scrolling through a huge result set. This should use the state manager to
> maintain the scroll_id value, and use this as input to the next scroll page.
> As a result, this processor should not allow flow file input, but should
> retrieve one page per run.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)