gresockj commented on code in PR #7671:
URL: https://github.com/apache/nifi/pull/7671#discussion_r1350356828


##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java:
##########
@@ -109,19 +210,92 @@ public interface ElasticsearchRestProcessor extends 
VerifiableProcessor {
             .description("All flowfiles that fail due to server/cluster 
availability go to this relationship.")
             .build();
 
+    String DEFAULT_QUERY_JSON = "{}";

Review Comment:
   Seems like this should be a constant.



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java:
##########
@@ -460,7 +462,7 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
                 stopWatch.getDuration(TimeUnit.MILLISECONDS)
         );
 
-        input = session.putAllAttributes(input, new HashMap<>() {{
+        input = session.putAllAttributes(input, new HashMap<String, String>() 
{{

Review Comment:
   Curious about this -- does it get rid of a warning or something?



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+        @WritesAttribute(attribute = "page.number", description = "The number 
of the page (request) in which the results were returned that are in the output 
flowfile"),

Review Comment:
   Perhaps indicate whether 0-based or 1-based



##########
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = 
"application/json"),
+        @WritesAttribute(attribute = "page.number", description = "The number 
of the page (request) in which the results were returned that are in the output 
flowfile"),
+        @WritesAttribute(attribute = "hit.count", description = "The number of 
hits that are in the output flowfile"),
+        @WritesAttribute(attribute = "elasticsearch.query.error", description 
= "The error message provided by Elasticsearch if there is an error querying 
the index.")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@PrimaryNodeOnly
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", 
"elasticsearch8", "query", "scroll", "page", "search", "json"})
+@CapabilityDescription("A processor that repeatedly runs a paginated query 
against a field using a Range query to consume new Documents from an 
Elasticsearch index/query. " +
+        "The processor will retrieve multiple pages of results until either no 
more results are available or the Pagination Keep Alive expiration is reached, 
" +
+        "after which the Range query will automatically update the field 
constraint based on the last retrieved Document value.")
+@SeeAlso({SearchElasticsearch.class, PaginatedJsonQueryElasticsearch.class})
+@DynamicProperty(
+        name = "The name of a URL query parameter to add",
+        value = "The value of the URL query parameter",
+        expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT,
+        description = "Adds the specified property name/value as a query 
parameter in the Elasticsearch URL used for processing. " +
+                "These parameters will override any matching parameters in the 
query request body. " +
+                "For SCROLL type queries, these parameters are only used in 
the initial (first page) query as the " +
+                "Elasticsearch Scroll API does not support the same query 
parameters for subsequent pages of data.")
+@Stateful(scopes = Scope.CLUSTER, description = "The pagination state 
(scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp, 
trackingRangeValue) " +
+        "is retained in between invocations of this processor until the 
Scroll/PiT has expired " +
+        "(when the current time is later than the last query execution plus 
the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"Care should be taken on the size of each page because each response " +
+        "from Elasticsearch will be loaded into memory all at once and 
converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends SearchElasticsearch {
+    static final String STATE_RANGE_VALUE = "trackingRangeValue";
+
+    public static final PropertyDescriptor RANGE_FIELD = new 
PropertyDescriptor.Builder()
+            .name("es-rest-range-field")
+            .displayName("Range Query Field")
+            .description("Field to be tracked as part of an Elasticsearch 
Range query using a \"gt\" bound match. " +
+                    "This field must exist within the Elasticsearch document 
for it to be retrieved.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RANGE_FIELD_SORT_ORDER = new 
PropertyDescriptor.Builder()
+            .name("es-rest-sort-order")
+            .displayName("Sort Order")
+            .description("The order in which to sort the \"" + 
RANGE_FIELD.getDisplayName() + "\". " +
+                    "A \"sort\" clause for the \"" + 
RANGE_FIELD.getDisplayName() +
+                    "\" field will be prepended to any provided \"" + 
SORT.getDisplayName() + "\" clauses. " +
+                    "If a \"sort\" clause already exists for the \"" + 
RANGE_FIELD.getDisplayName() +
+                    "\" field, it will not be updated.")
+            .allowableValues("asc", "desc")
+            .defaultValue("asc")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RANGE_INITIAL_VALUE = new 
PropertyDescriptor.Builder()
+            .name("es-rest-range-initial-value")
+            .displayName("Initial Value")
+            .description("The initial value to use for the query if the 
processor has not run previously. " +
+                    "If the processor has run previously and stored a value in 
its state, this property will be ignored. " +
+                    "If no value is provided, and the processor has not 
previously run, no Range query bounds will be used, " +
+                    "i.e. all documents will be retrieved in the specified \"" 
+ RANGE_FIELD_SORT_ORDER.getDisplayName() + "\".")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_DATE_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("es-rest-range-format")
+            .displayName(RANGE_INITIAL_VALUE.getDisplayName() + " Date Format")
+            .description("If the \"" + RANGE_FIELD.getDisplayName() + "\" is a 
Date field, convert the \"" + RANGE_INITIAL_VALUE.getDisplayName() + "\" to a 
date with this format. " +
+                    "If not specified, Elasticsearch will use the date format 
provided by the \"" + RANGE_FIELD.getDisplayName() + "\"'s mapping. " +
+                    "For valid syntax, see 
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html";)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(RANGE_INITIAL_VALUE)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RANGE_TIME_ZONE = new 
PropertyDescriptor.Builder()
+            .name("es-rest-range-time-zone")
+            .displayName(RANGE_INITIAL_VALUE.getDisplayName() + " Date Time 
Zone")
+            .description("If the \"" + RANGE_FIELD.getDisplayName() + "\" is a 
Date field, convert the \"" + RANGE_INITIAL_VALUE.getDisplayName() + "\" to UTC 
with this time zone. " +
+                    "Valid values are ISO 8601 UTC offsets, such as \"+01:00\" 
or \"-08:00\", and IANA time zone IDs, such as \"Europe/London\".")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(RANGE_INITIAL_VALUE)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor ADDITIONAL_FILTERS = new 
PropertyDescriptor.Builder()
+            .name("es-rest-additional-filters")
+            .displayName("Additional Filters")
+            .description("One or more query filters in JSON syntax, not Lucene 
syntax. " +
+                    "Ex: [{\"match\":{\"somefield\":\"somevalue\"}}, 
{\"match\":{\"anotherfield\":\"anothervalue\"}}]. " +
+                    "These filters wil be used as part of a Bool query's 
filter.")
+            .addValidator(JsonValidator.INSTANCE)
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(RANGE_FIELD);
+        descriptors.add(RANGE_FIELD_SORT_ORDER);
+        descriptors.add(RANGE_INITIAL_VALUE);
+        descriptors.add(RANGE_DATE_FORMAT);
+        descriptors.add(RANGE_TIME_ZONE);
+        descriptors.add(ADDITIONAL_FILTERS);
+        descriptors.addAll(scrollPropertyDescriptors.stream()
+                .filter(pd -> !QUERY.equals(pd) && !QUERY_CLAUSE.equals(pd) && 
!QUERY_DEFINITION_STYLE.equals(pd))
+                .collect(Collectors.toList()));
+
+        propertyDescriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    protected String trackingRangeField;
+    protected String trackingSortOrder;
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    Scope getStateScope() {
+        return Scope.CLUSTER;
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
+        trackingRangeField = context.getProperty(RANGE_FIELD).getValue();
+        trackingSortOrder = 
context.getProperty(RANGE_FIELD_SORT_ORDER).getValue();
+    }
+
+    @Override
+    PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile 
input, final ProcessContext context, final ProcessSession session) throws 
IOException {
+        final PaginatedJsonQueryParameters paginatedQueryJsonParameters = 
super.buildJsonQueryParameters(input, context, session);
+        
paginatedQueryJsonParameters.setTrackingRangeValue(getTrackingRangeValueOrDefault(context));
+        return paginatedQueryJsonParameters;
+    }
+
+    @Override
+    public void addQueryClause(final Map<String, Object> query, final 
Map<String, String> attributes, final ProcessContext context) throws 
IOException {
+        final List<Map<String, Object>> filters = new ArrayList<>(10);
+
+        // only retrieve documents with values greater than the last queried 
value (if present)
+        final String trackingRangeValue = 
getTrackingRangeValueOrDefault(context);

Review Comment:
   When this is called from verification, the state map appears to be null, 
causing a NPE:
   ```
   2023-10-09 10:42:57,879 ERROR [Verify Processor Config Thread-1] 
o.a.n.controller.StandardProcessorNode Failed to perform verification of 
processor's configuration for 
ConsumeElasticsearch[id=14e04756-018b-1000-e816-b1a31e987ad9]
   java.lang.NullPointerException: Cannot invoke 
"org.apache.nifi.components.state.StateMap.get(String)" because "stateMap" is 
null
        at 
org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch.getTrackingRangeValueOrDefault(ConsumeElasticsearch.java:280)
        at 
org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch.addQueryClause(ConsumeElasticsearch.java:202)
        at 
org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor.getQuery(ElasticsearchRestProcessor.java:241)
        at 
org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor.verifyAfterIndex(ElasticsearchRestProcessor.java:376)
        at 
org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor.verify(ElasticsearchRestProcessor.java:359)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to