This is an automated email from the ASF dual-hosted git repository.

chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 658f2547d8 NIFI-11430 - PaginatedJsonQueryElasticsearch processors should not output empty FlowFile if hits have been found; PaginatedJsonQueryElasticsearch processors should be able to use _source and _meta only result formats when grouping by query
658f2547d8 is described below

commit 658f2547d8f3966375a629d28239ce12b9a1e333
Author: Ryan Van Den Bos <rvandenb...@outlook.com>
AuthorDate: Wed Apr 12 10:21:57 2023 +0100

    NIFI-11430 - PaginatedJsonQueryElasticsearch processors should not output empty FlowFile if hits have been found; PaginatedJsonQueryElasticsearch processors should be able to use _source and _meta only result formats when grouping by query
    
    This closes #7163
    
    Signed-off-by: Chris Sampson <chris.sampso...@gmail.com>
---
 .../AbstractJsonQueryElasticsearch.java            |   2 +-
 .../AbstractPaginatedJsonQueryElasticsearch.java   |   9 +-
 .../AbstractJsonQueryElasticsearchTest.groovy      |  39 +++---
 ...tractPaginatedJsonQueryElasticsearchTest.groovy | 150 ++++++++++++++++-----
 .../PaginatedJsonQueryElasticsearchTest.groovy     |  98 ++++++++------
 .../integration/AbstractElasticsearchITBase.java   |   2 +-
 nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml |   4 +-
 7 files changed, 199 insertions(+), 105 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index 1000779143..f12409b074 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -367,7 +367,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q 
extends JsonQueryParamete
     }
 
     @SuppressWarnings("unchecked")
-    private List<Map<String, Object>> formatHits(final List<Map<String, 
Object>> hits) {
+    List<Map<String, Object>> formatHits(final List<Map<String, Object>> hits) 
{
         final List<Map<String, Object>> formattedHits;
 
         if (hitFormat == SearchResultsFormat.METADATA_ONLY) {
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
index 01e5994c47..f26a5a3c52 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
@@ -75,6 +75,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch 
extends AbstractJs
             .build();
 
     static final List<PropertyDescriptor> paginatedPropertyDescriptors;
+
     static {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(QUERY_ATTRIBUTE);
@@ -239,7 +240,7 @@ public abstract class 
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
     private void combineHits(final List<Map<String, Object>> hits, final 
PaginatedJsonQueryParameters paginatedJsonQueryParameters,
                              final ProcessSession session, final FlowFile 
parent,
-                             final Map<String, String> attributes, final 
List<FlowFile> hitsFlowFiles) {
+                             final Map<String, String> attributes, final 
List<FlowFile> hitsFlowFiles, final boolean newQuery) {
         if (hits != null && !hits.isEmpty()) {
             final FlowFile hitFlowFile;
             final boolean append = !hitsFlowFiles.isEmpty();
@@ -251,7 +252,7 @@ public abstract class 
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
 
             
hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount()
 + hits.size(),
                     hits, session, hitFlowFile, attributes, append));
-        } else if (isOutputNoHits()) {
+        } else if (isOutputNoHits() && newQuery) {
             final FlowFile hitFlowFile = createChildFlowFile(session, parent);
             hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, 
attributes));
         }
@@ -271,7 +272,9 @@ public abstract class 
AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
         attributes.put("page.number", 
Integer.toString(paginatedJsonQueryParameters.getPageCount()));
 
         if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
-            combineHits(hits, paginatedJsonQueryParameters, session, parent, 
attributes, hitsFlowFiles);
+
+            final List<Map<String, Object>> formattedHits = formatHits(hits);
+            combineHits(formattedHits, paginatedJsonQueryParameters, session, 
parent, attributes, hitsFlowFiles, newQuery);
 
             // output results if it seems we've combined all available results 
(i.e. no hits in this page and therefore no more expected)
             if (!hitsFlowFiles.isEmpty() && (hits == null || hits.isEmpty())) {
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
index 670b9599b0..2eb2b91653 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy
@@ -42,9 +42,8 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf
 import static org.junit.jupiter.api.Assertions.assertNotNull
 import static org.junit.jupiter.api.Assertions.assertThrows
 import static org.junit.jupiter.api.Assertions.assertTrue
-
 abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryElasticsearch> {
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+    static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
 
     static final String INDEX_NAME = "messages"
 
@@ -101,8 +100,9 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
                 .stream().map(r -> r.getValue())
                 .collect(Collectors.joining(", "))
         final String expectedAllowedSplitHits = processor instanceof 
AbstractPaginatedJsonQueryElasticsearch
-            ? ResultOutputStrategy.values().collect {r -> 
r.getValue()}.join(", ")
-            : nonPaginatedResultOutputStrategies
+                ? ResultOutputStrategy.values().collect { r -> r.getValue() 
}.join(", ")
+                : 
ResultOutputStrategy.getNonPaginatedResponseOutputStrategies().stream()
+                    .map(r -> r.getValue()).collect(Collectors.joining(", "))
 
         final AssertionError assertionError = 
assertThrows(AssertionError.class, runner.&run)
         assertThat(assertionError.getMessage(), 
equalTo(String.format("Processor has 8 validation failures:\n" +
@@ -130,7 +130,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
     void testBasicQuery() throws Exception {
         // test hits (no splitting) - full hit format
         final TestRunner runner = createRunner(false)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]]])))
         
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, 
SearchResultsFormat.FULL.getValue())
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
@@ -139,7 +139,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
         assertOutputContent(hits.getContent(), 10, false)
         final List<Map<String, Object>> result = 
OBJECT_MAPPER.readValue(hits.getContent(), List.class)
         result.forEach({ hit ->
-            final Map<String, Object> h = ((Map<String, Object>)hit)
+            final Map<String, Object> h = ((Map<String, Object>) hit)
             assertFalse(h.isEmpty())
             assertTrue(h.containsKey("_source"))
             assertTrue(h.containsKey("_index"))
@@ -211,14 +211,13 @@ abstract class AbstractJsonQueryElasticsearchTest<P 
extends AbstractJsonQueryEla
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setMaxPages(0)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]]])))
         runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, 
"false")
         runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 0, 0, 0)
         assertThat(
                 runner.getProvenanceEvents().stream().filter({ pe ->
-                    pe.getEventType() == ProvenanceEventType.RECEIVE &&
-                            pe.getAttribute("uuid") == 
hits.getAttribute("uuid")
+                    pe.getEventType() == ProvenanceEventType.RECEIVE
                 }).count(),
                 is(0L)
         )
@@ -247,8 +246,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
     @Test
     void testAggregations() throws Exception {
         String query = prettyPrint(toJson([
-                query: [ match_all: [:] ],
-                aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ 
terms: [ field: "msg" ] ] ]
+                query: [match_all: [:]],
+                aggs : [term_agg: [terms: [field: "msg"]], term_agg2: [terms: 
[field: "msg"]]]
         ]))
 
         // test aggregations (no splitting) - full aggregation format
@@ -289,7 +288,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
         agg.keySet().forEach({ aggName ->
             final List<Map<String, Object>> termAgg = agg.get(aggName) as 
List<Map<String, Object>>
             assertThat(termAgg.size(), is(5))
-            termAgg.forEach({a ->
+            termAgg.forEach({ a ->
                 assertTrue(a.containsKey("key"))
                 assertTrue(a.containsKey("doc_count"))
             })
@@ -321,8 +320,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
 
         // test using Expression Language (index, type, query)
         query = prettyPrint(toJson([
-                query: [ match_all: [:] ],
-                aggs: [ term_agg: [ terms: [ field: "\${fieldValue}" ] ], 
term_agg2: [ terms: [ field: "\${fieldValue}" ] ] ]
+                query: [match_all: [:]],
+                aggs : [term_agg: [terms: [field: "\${fieldValue}"]], 
term_agg2: [terms: [field: "\${fieldValue}"]]]
         ]))
         runner.setVariable("fieldValue", "msg")
         runner.setVariable("es.index", INDEX_NAME)
@@ -347,8 +346,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
     @Test
     void testErrorDuringSearch() throws Exception {
         String query = prettyPrint(toJson([
-                query: [ match_all: [:] ],
-                aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ 
terms: [ field: "msg" ] ] ]
+                query: [match_all: [:]],
+                aggs : [term_agg: [terms: [field: "msg"]], term_agg2: [terms: 
[field: "msg"]]]
         ]))
 
         final TestRunner runner = createRunner(true)
@@ -361,8 +360,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
     @Test
     void testQueryAttribute() throws Exception {
         String query = prettyPrint(toJson([
-                query: [ match_all: [:] ],
-                aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ 
terms: [ field: "msg" ] ] ]
+                query: [match_all: [:]],
+                aggs : [term_agg: [terms: [field: "msg"]], term_agg2: [terms: 
[field: "msg"]]]
         ]))
         final String queryAttr = "es.query"
 
@@ -384,7 +383,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
     @Test
     void testInputHandling() {
         final TestRunner runner = createRunner(false)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]]])))
 
         runner.setIncomingConnection(true)
         runner.run()
@@ -399,7 +398,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends 
AbstractJsonQueryEla
     @Test
     void testRequestParameters() {
         final TestRunner runner = createRunner(false)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]]])))
         runner.setProperty("refresh", "true")
         runner.setProperty("slices", '${slices}')
         runner.setVariable("slices", "auto")
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
index 0712a732ed..f1a64e3ca5 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy
@@ -32,7 +32,9 @@ import static groovy.json.JsonOutput.toJson
 import static org.hamcrest.CoreMatchers.equalTo
 import static org.hamcrest.CoreMatchers.is
 import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.jupiter.api.Assertions.assertFalse
 import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.assertTrue
 
 abstract class AbstractPaginatedJsonQueryElasticsearchTest extends 
AbstractJsonQueryElasticsearchTest<AbstractPaginatedJsonQueryElasticsearch> {
     abstract boolean isInput()
@@ -40,7 +42,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
     @Test
     void testInvalidPaginationProperties() {
         final TestRunner runner = createRunner(false)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]]])))
         
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE,
 "not-a-period")
         
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
"not-enum")
 
@@ -49,7 +51,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
                 "'%s' validated against 'not-enum' is invalid because Given 
value not found in allowed set '%s'\n" +
                 "'%s' validated against 'not-a-period' is invalid because Must 
be of format <duration> <TimeUnit> where <duration> " +
                 "is a non-negative integer and TimeUnit is a supported Time 
Unit, such as: nanos, millis, secs, mins, hrs, days\n",
-                
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(), 
PaginationType.values().collect {p -> p.getValue()}.join(", "),
+                
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(), 
PaginationType.values().collect { p -> p.getValue() }.join(", "),
                 
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName(),
                 
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName()
         )))
@@ -59,7 +61,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
     void testSinglePage() {
         // paged query hits (no splitting)
         final TestRunner runner = createRunner(false)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]]])))
         MockFlowFile input = runOnce(runner)
         testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
         FlowFile hits = 
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
@@ -117,13 +119,104 @@ abstract class 
AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
         assertSendEvent(runner, input)
     }
 
+   static void assertFormattedResult(final SearchResultsFormat 
searchResultsFormat, final Map<String, Object> hit) {
+        assertFalse(hit.isEmpty())
+        switch(searchResultsFormat) {
+            case SearchResultsFormat.SOURCE_ONLY:
+                assertFalse(hit.containsKey("_source"))
+                assertFalse(hit.containsKey("_index"))
+                break
+            case SearchResultsFormat.METADATA_ONLY:
+                assertFalse(hit.containsKey("_source"))
+                assertTrue(hit.containsKey("_index"))
+                break
+            case SearchResultsFormat.FULL:
+                assertTrue(hit.containsKey("_source"))
+                assertTrue(hit.containsKey("_index"))
+                break
+            default:
+                throw new IllegalArgumentException("Unknown 
SearchResultsFormat value: " + searchResultsFormat.toString())
+        }
+    }
+
+    private void assertResultsFormat(final TestRunner runner, final 
ResultOutputStrategy resultOutputStrategy, final SearchResultsFormat 
searchResultsFormat) {
+        int flowFileCount
+        String hitsCount
+        boolean ndjson = false
+
+        switch (resultOutputStrategy) {
+            case ResultOutputStrategy.PER_QUERY:
+                flowFileCount = 1
+                hitsCount = "10"
+                ndjson = true
+                break
+            case ResultOutputStrategy.PER_HIT:
+                flowFileCount = 10
+                hitsCount = "1"
+                break
+            case ResultOutputStrategy.PER_RESPONSE:
+                flowFileCount = 1
+                hitsCount = "10"
+                break
+            default:
+                throw new IllegalArgumentException("Unknown 
ResultOutputStrategy value: " + resultOutputStrategy.toString())
+        }
+
+        // Test Relationship counts
+        testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
+
+        // Per response outputs an array of values
+        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({
 hit ->
+            hit.assertAttributeEquals("hit.count", hitsCount)
+            assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
+            if (ResultOutputStrategy.PER_RESPONSE == resultOutputStrategy) {
+                OBJECT_MAPPER.readValue(hit.getContent(), 
ArrayList.class).forEach(h -> {
+                    assertFormattedResult(searchResultsFormat, h as 
Map<String, Object>)
+                })
+            } else {
+                final Map<String, Object> h = 
OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
+                assertFormattedResult(searchResultsFormat, h)
+            }
+            assertThat(
+                    runner.getProvenanceEvents().stream().filter({ pe ->
+                        pe.getEventType() == ProvenanceEventType.RECEIVE &&
+                                pe.getAttribute("uuid") == 
hit.getAttribute("uuid")
+                    }).count(),
+                    is(1L)
+            )
+        })
+    }
+
+    @Test
+    void testResultsFormat() throws Exception {
+        for (final ResultOutputStrategy resultOutputStrategy : 
ResultOutputStrategy.values()) {
+            final TestRunner runner = createRunner(false)
+            runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: 
"asc"]]]])))
+            
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, 
resultOutputStrategy.getValue())
+
+            // Test against each results format
+            for (final SearchResultsFormat searchResultsFormat : 
SearchResultsFormat.values()) {
+                
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, 
searchResultsFormat.getValue())
+
+                // Test against each pagination type
+                for (final PaginationType paginationType : 
PaginationType.values()) {
+                    
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
paginationType.getValue())
+
+                    runOnce(runner)
+                    assertResultsFormat(runner, resultOutputStrategy, 
searchResultsFormat)
+                    reset(runner)
+                }
+            }
+        }
+    }
+
     @Test
     void testScrollError() {
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setThrowErrorInDelete(true)
         
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
PaginationType.SCROLL.getValue())
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]])))
 
         // still expect "success" output for exception during final clean-up
         runMultiple(runner, 2)
@@ -147,7 +240,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
         
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, 
SearchResultsFormat.FULL.getValue())
         
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, 
AggregationResultsFormat.FULL.getValue())
         
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
PaginationType.POINT_IN_TIME.getValue())
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]])))
 
         // still expect "success" output for exception during final clean-up
         runMultiple(runner, 2)
@@ -169,7 +262,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
         final TestElasticsearchClientService service = getService(runner)
         service.setThrowErrorInPit(true)
         
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
PaginationType.POINT_IN_TIME.getValue())
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]])))
 
         // expect "failure" output for exception during query setup
         runOnce(runner)
@@ -190,7 +283,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest 
extends AbstractJsonQ
         // test PiT without sort
         final TestRunner runner = createRunner(false)
         
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
PaginationType.POINT_IN_TIME.getValue())
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [ match_all: [:] ]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]]])))
 
         // expect "failure" output for exception during query setup
         runOnce(runner)
@@ -265,46 +358,35 @@ abstract class 
AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
                     is(1L)
             )
         } else {
-            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> 
pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
+            assertThat(runner.getProvenanceEvents().stream().filter({ pe -> 
pe.getEventType() == ProvenanceEventType.SEND }).count(), is(0L))
         }
     }
 
     @Test
-    void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
+    void testEmptyHitsFlowFileIsProducedForEachResultSplitSetup() {
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]]])))
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: 
"asc"]]]])))
         
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, 
"true")
         service.setMaxPages(0)
 
-        // test that an empty flow file is produced for a per query setup
-        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
 ResultOutputStrategy.PER_QUERY.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
 "0")
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
 "1")
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
 == 0
-        reset(runner)
+        for (final PaginationType paginationType : PaginationType.values()) {
+            
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
paginationType.getValue())
 
-        // test that an empty flow file is produced for a per hit setup
-        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
 ResultOutputStrategy.PER_HIT.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
+            for (final ResultOutputStrategy resultOutputStrategy : 
ResultOutputStrategy.values()) {
+                // test that an empty flow file is produced for a per query 
setup
+                
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
 resultOutputStrategy.getValue())
+                runOnce(runner)
+                testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
 "0")
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
 "1")
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
 == 0
-        reset(runner)
+                
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
 "0")
+                
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
 "1")
+                
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
 == 0
+                reset(runner)
+            }
+        }
 
-        // test that an empty flow file is produced for a per response setup
-        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
 ResultOutputStrategy.PER_RESPONSE.getValue())
-        runOnce(runner)
-        testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
 
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
 "0")
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
 "1")
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize()
 == 0
-        reset(runner)
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
index 992b548b94..30c6eea6a3 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.processors.elasticsearch
 
-
 import org.apache.nifi.processors.elasticsearch.api.PaginationType
 import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
 import org.apache.nifi.util.TestRunner
@@ -40,54 +39,65 @@ class PaginatedJsonQueryElasticsearchTest extends 
AbstractPaginatedJsonQueryElas
         return true
     }
 
+    static void validatePagination(final TestRunner runner, final 
ResultOutputStrategy resultOutputStrategy) {
+        switch (resultOutputStrategy) {
+            case ResultOutputStrategy.PER_RESPONSE:
+                testCounts(runner, 1, 2, 0, 0)
+                int page = 1
+                
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
+                        { hit ->
+                            hit.assertAttributeEquals("hit.count", "10")
+                            hit.assertAttributeEquals("page.number", 
Integer.toString(page++))
+                        }
+                )
+                break
+            case ResultOutputStrategy.PER_QUERY:
+                testCounts(runner, 1, 1, 0, 0)
+                
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
 "20")
+                // the "last" page.number is used, so 2 here because there 
were 2 pages of hits
+                
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
 "2")
+                assertThat(
+                        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length,
+                        is(20)
+                )
+                break
+            case ResultOutputStrategy.PER_HIT:
+                testCounts(runner, 1, 20, 0, 0)
+                int count = 0
+                
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
+                        { hit ->
+                            hit.assertAttributeEquals("hit.count", "1")
+                            // 10 hits per page, so first 10 flow files should 
be page.number 1, the rest page.number 2
+                            hit.assertAttributeEquals("page.number", 
Integer.toString(Math.ceil(++count / 10) as int))
+                        }
+                )
+                break
+            default:
+                throw new IllegalArgumentException("Unknown 
ResultOutputStrategy value: " + resultOutputStrategy)
+        }
+    }
+
     void testPagination(final PaginationType paginationType) {
-        // test flowfile per page
         final TestRunner runner = createRunner(false)
         final TestElasticsearchClientService service = getService(runner)
         service.setMaxPages(2)
-        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, 
paginationType.getValue())
-        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] 
]])))
-
-        runOnce(runner)
-        testCounts(runner, 1, 2, 0, 0)
-        int page = 1
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
-                { hit ->
-                    hit.assertAttributeEquals("hit.count", "10")
-                    hit.assertAttributeEquals("page.number", 
Integer.toString(page++))
-                }
-        )
-        runner.getStateManager().assertStateNotSet()
-        reset(runner)
-
+        runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, 
prettyPrint(toJson([size: 10, sort: [msg: "desc"], query: [match_all: [:]]])))
 
-        // test hits splitting
-        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
 ResultOutputStrategy.PER_HIT.getValue())
-        runOnce(runner)
-        testCounts(runner, 1, 20, 0, 0)
-        int count = 0
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
-                { hit ->
-                    hit.assertAttributeEquals("hit.count", "1")
-                    // 10 hits per page, so first 10 flowfiles should be 
page.number 1, the rest page.number 2
-                    hit.assertAttributeEquals("page.number", 
Integer.toString(Math.ceil(++count / 10) as int))
-                }
-        )
-        runner.getStateManager().assertStateNotSet()
-        reset(runner)
+        for (final ResultOutputStrategy resultOutputStrategy : 
ResultOutputStrategy.values()) {
+            
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
 resultOutputStrategy.getValue())
 
+            runOnce(runner)
+            validatePagination(runner, resultOutputStrategy)
+            runner.getStateManager().assertStateNotSet()
+            reset(runner)
 
-        // test hits combined
-        
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT,
 ResultOutputStrategy.PER_QUERY.getValue())
-        runOnce(runner)
-        testCounts(runner, 1, 1, 0, 0)
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count",
 "20")
-        // the "last" page.number is used, so 2 here because there were 2 
pages of hits
-        
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number",
 "2")
-        assertThat(
-                
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length,
-                is(20)
-        )
-        runner.getStateManager().assertStateNotSet()
+            // Check that OUTPUT_NO_HITS true doesn't have any adverse effects 
on pagination
+            runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, 
"true")
+            runOnce(runner)
+            validatePagination(runner, resultOutputStrategy)
+            // Unset OUTPUT_NO_HITS
+            runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, 
"false")
+            reset(runner)
+        }
     }
-}
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 5c2b56ba94..3b23e3c729 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -51,7 +51,7 @@ import static org.apache.http.auth.AuthScope.ANY;
 public abstract class AbstractElasticsearchITBase {
     // default Elasticsearch version should (ideally) match that in the 
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
     protected static final DockerImageName IMAGE = DockerImageName
-            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.7.0"));
+            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.7.1"));
     protected static final String ELASTIC_USER_PASSWORD = 
System.getProperty("elasticsearch.elastic_user.password", 
RandomStringUtils.randomAlphanumeric(10, 20));
     private static final int PORT = 9200;
     protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = 
new ElasticsearchContainer(IMAGE)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index 5364ec52a1..571a1fbb3d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -101,7 +101,7 @@ language governing permissions and limitations under the 
License. -->
             </activation>
             <properties>
                 <!-- also update the default Elasticsearch version in 
nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
-                <elasticsearch_docker_image>8.7.0</elasticsearch_docker_image>
+                <elasticsearch_docker_image>8.7.1</elasticsearch_docker_image>
                 
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
             </properties>
             <build>
@@ -132,7 +132,7 @@ language governing permissions and limitations under the 
License. -->
         <profile>
             <id>elasticsearch7</id>
             <properties>
-                <elasticsearch_docker_image>7.17.9</elasticsearch_docker_image>
+                
<elasticsearch_docker_image>7.17.10</elasticsearch_docker_image>
             </properties>
         </profile>
     </profiles>


Reply via email to