This is an automated email from the ASF dual-hosted git repository. mthomsen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 07131a6 NIFI-9715 add option to output empty FlowFile from Elasticsearch REST API Json Query processors when there are no hits from query 07131a6 is described below commit 07131a66ea39155f4523b974912b6820058cad5b Author: Chris Sampson <chris.sampso...@gmail.com> AuthorDate: Mon Feb 21 21:12:24 2022 +0000 NIFI-9715 add option to output empty FlowFile from Elasticsearch REST API Json Query processors when there are no hits from query This closes #5786 Signed-off-by: Mike Thomsen <mthom...@apache.org> --- .../AbstractJsonQueryElasticsearch.java | 22 +++++++++- .../AbstractPaginatedJsonQueryElasticsearch.java | 5 ++- .../AbstractJsonQueryElasticsearchTest.groovy | 49 +++++++++++++++++++++- 3 files changed, 70 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java index 8af8770..25c9417 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java @@ -86,6 +86,17 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete .required(true) .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); + public static final PropertyDescriptor OUTPUT_NO_HITS = new PropertyDescriptor.Builder() + .name("el-rest-output-no-hits") + .displayName("Output No Hits") + .description("Output a \"" + REL_HITS.getName() + "\" flowfile even if no hits found for query. " + + "If true, an empty \"" + REL_HITS.getName() + "\" flowfile will be output even if \"" + + REL_AGGREGATIONS.getName() + "\" are output.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); private static final Set<Relationship> relationships; private static final List<PropertyDescriptor> propertyDescriptors; @@ -93,6 +104,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete AtomicReference<ElasticSearchClientService> clientService; String splitUpHits; private String splitUpAggregations; + private boolean outputNoHits; final ObjectMapper mapper = new ObjectMapper(); @@ -112,6 +124,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete descriptors.add(CLIENT_SERVICE); descriptors.add(SEARCH_RESULTS_SPLIT); descriptors.add(AGGREGATION_RESULTS_SPLIT); + descriptors.add(OUTPUT_NO_HITS); propertyDescriptors = Collections.unmodifiableList(descriptors); } @@ -143,6 +156,8 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue(); splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue(); + + outputNoHits = context.getProperty(OUTPUT_NO_HITS).asBoolean(); } @OnStopped @@ -271,7 +286,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete * for paginated queries, the List could contain one (or more) FlowFiles, to which further hits may be appended when the next * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries. */ - List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session, + List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final Q queryJsonParameters, final ProcessSession session, final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException { if (hits != null && !hits.isEmpty()) { @@ -286,6 +301,9 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete final String json = mapper.writeValueAsString(hits); hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes)); } + } else if (newQuery && outputNoHits) { + final FlowFile hitFlowFile = createChildFlowFile(session, parent); + hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes)); } transferResultFlowFiles(session, hitsFlowFiles, transitUri, stopWatch); @@ -319,7 +337,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete handleAggregations(response.getAggregations(), session, input, attributes, transitUri, stopWatch); } - final List<FlowFile> resultFlowFiles = handleHits(response.getHits(), queryJsonParameters, session, input, + final List<FlowFile> resultFlowFiles = handleHits(response.getHits(), newQuery, queryJsonParameters, session, input, attributes, hitsFlowFiles, transitUri, stopWatch); queryJsonParameters.addHitCount(response.getHits().size()); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java index e5b47ee..2e8eab6 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java @@ -109,6 +109,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs descriptors.add(AGGREGATION_RESULTS_SPLIT); descriptors.add(PAGINATION_TYPE); descriptors.add(PAGINATION_KEEP_ALIVE); + descriptors.add(OUTPUT_NO_HITS); paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors); } @@ -282,7 +283,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs * SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries. */ @Override - List<FlowFile> handleHits(final List<Map<String, Object>> hits, final PaginatedJsonQueryParameters paginatedJsonQueryParameters, + List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final ProcessSession session, final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException { paginatedJsonQueryParameters.incrementPageCount(); @@ -298,7 +299,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs hitsFlowFiles.clear(); } } else { - super.handleHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, transitUri, stopWatch); + super.handleHits(hits, newQuery, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, transitUri, stopWatch); } return hitsFlowFiles; diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy index 3ae81a2..b33f944 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy @@ -52,6 +52,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla runner.removeProperty(AbstractJsonQueryElasticsearch.QUERY_ATTRIBUTE) runner.removeProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT) runner.removeProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT) + runner.removeProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS) final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run) if (processor instanceof SearchElasticsearch) { @@ -82,13 +83,14 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, "not-json") runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, "not-enum") runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, "not-enum2") + runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "not-boolean") final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch ? [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY].join(", ") : [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", ") final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run) - assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 7 validation failures:\n" + + assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 8 validation failures:\n" + "'%s' validated against 'not-json' is invalid because %s is not a valid JSON representation due to Unrecognized token 'not': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n" + " at [Source: (String)\"not-json\"; line: 1, column: 4]\n" + "'%s' validated against '' is invalid because %s cannot be empty\n" + @@ -96,6 +98,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla "'%s' validated against 'not-a-service' is invalid because Property references a Controller Service that does not exist\n" + "'%s' validated against 'not-enum2' is invalid because Given value not found in allowed set '%s'\n" + "'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" + + "'%s' validated against 'not-boolean' is invalid because Given value not found in allowed set 'true, false'\n" + "'%s' validated against 'not-a-service' is invalid because Invalid Controller Service: not-a-service is not a valid Controller Service Identifier\n", AbstractJsonQueryElasticsearch.QUERY.getName(), AbstractJsonQueryElasticsearch.QUERY.getName(), AbstractJsonQueryElasticsearch.INDEX.getName(), AbstractJsonQueryElasticsearch.INDEX.getName(), @@ -103,6 +106,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(), AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.getName(), expectedAllowedSplitHits, AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", "), + AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS.getName(), AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName() ))) } @@ -147,6 +151,45 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla } @Test + void testNoHits() throws Exception { + // test no hits (no output) + final TestRunner runner = createRunner(false) + final TestElasticsearchClientService service = getService(runner) + service.setMaxPages(0) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "false") + runOnce(runner) + testCounts(runner, isInput() ? 1 : 0, 0, 0, 0) + assertThat( + runner.getProvenanceEvents().stream().filter({ pe -> + pe.getEventType() == ProvenanceEventType.RECEIVE && + pe.getAttribute("uuid") == hits.getAttribute("uuid") + }).count(), + is(0L) + ) + reset(runner) + + + // test not hits (with output) + runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "true") + runOnce(runner) + testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach( + { hit -> + hit.assertAttributeEquals("hit.count", "0") + assertOutputContent(hit.getContent(), 0, false) + assertThat( + runner.getProvenanceEvents().stream().filter({ pe -> + pe.getEventType() == ProvenanceEventType.RECEIVE && + pe.getAttribute("uuid") == hit.getAttribute("uuid") + }).count(), + is(1L) + ) + } + ) + } + + @Test void testAggregations() throws Exception { String query = prettyPrint(toJson([ query: [ match_all: [:] ], @@ -302,7 +345,9 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla if (ndjson) { assertThat(content.split("\n").length, is(count)) } else { - if (count == 1) { + if (count == 0) { + assertThat(content, is("")) + } else if (count == 1) { assertThat(content.startsWith("{") && content.endsWith("}"), is(true)) } else { assertThat(content.startsWith("[") && content.endsWith("]"), is(true))