This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e51ab8c  NIFI-7036: This closes #3993. Adding 'Append to Attributes' 
to QueryElasticsearchHttp
e51ab8c is described below

commit e51ab8c9d6b678dc4e9549562d7c9f58f0445a04
Author: Joe Gresock <[email protected]>
AuthorDate: Thu Jan 16 20:30:49 2020 +0000

    NIFI-7036: This closes #3993. Adding 'Append to Attributes' to 
QueryElasticsearchHttp
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../elasticsearch/QueryElasticsearchHttp.java      | 32 +++++++++---
 .../TestQueryElasticsearchHttpNoHits.java          | 60 ++++++++++++++++++++--
 2 files changed, 82 insertions(+), 10 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 7cb1fb4..ea350ad 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -91,7 +91,8 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
     public enum QueryInfoRouteStrategy {
         NEVER,
         ALWAYS,
-        NOHIT
+        NOHIT,
+        APPEND_AS_ATTRIBUTES
     }
 
     private static final String FROM_QUERY_PARAM = "from";
@@ -103,6 +104,8 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
     static final AllowableValue ALWAYS = new 
AllowableValue(QueryInfoRouteStrategy.ALWAYS.name(), "Always", "Always route 
Query Info");
     static final AllowableValue NEVER = new 
AllowableValue(QueryInfoRouteStrategy.NEVER.name(), "Never", "Never route Query 
Info");
     static final AllowableValue NO_HITS = new 
AllowableValue(QueryInfoRouteStrategy.NOHIT.name(), "No Hits", "Route Query 
Info if the Query returns no hits");
+    static final AllowableValue APPEND_AS_ATTRIBUTES = new 
AllowableValue(QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name(), "Append as 
Attributes",
+            "Always append Query Info as attributes, using the existing 
relationships (does not add the Query Info relationship).");
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description(
@@ -221,7 +224,7 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
             .displayName("Routing Strategy for Query Info")
             .description("Specifies when to generate and route Query Info 
after a successful query")
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(ALWAYS, NEVER, NO_HITS)
+            .allowableValues(ALWAYS, NEVER, NO_HITS, APPEND_AS_ATTRIBUTES)
             .defaultValue(NEVER.getValue())
             .required(false)
             .build();
@@ -399,9 +402,9 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
             if ( (hits.size() == 0 && priorResultCount == 0 && 
queryInfoRouteStrategy == QueryInfoRouteStrategy.NOHIT)
                     || queryInfoRouteStrategy == 
QueryInfoRouteStrategy.ALWAYS) {
                 FlowFile queryInfo = flowFile == null ? session.create() : 
session.create(flowFile);
-                session.putAttribute(queryInfo, "es.query.url", 
url.toExternalForm());
-                session.putAttribute(queryInfo, "es.query.hitcount", 
String.valueOf(hits.size()));
-                session.putAttribute(queryInfo, MIME_TYPE.key(), 
"application/json");
+                queryInfo = session.putAttribute(queryInfo, "es.query.url", 
url.toExternalForm());
+                queryInfo = session.putAttribute(queryInfo, 
"es.query.hitcount", String.valueOf(hits.size()));
+                queryInfo = session.putAttribute(queryInfo, MIME_TYPE.key(), 
"application/json");
                 session.transfer(queryInfo,REL_QUERY_INFO);
             }
 
@@ -418,6 +421,10 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
                     documentFlowFile = session.create();
                 }
 
+                if (queryInfoRouteStrategy == 
QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES) {
+                    documentFlowFile = session.putAttribute(documentFlowFile, 
"es.query.hitcount", String.valueOf(hits.size()));
+                }
+
                 JsonNode source = hit.get("_source");
                 documentFlowFile = session.putAttribute(documentFlowFile, 
"es.id", retrievedId);
                 documentFlowFile = session.putAttribute(documentFlowFile, 
"es.index", retrievedIndex);
@@ -451,9 +458,20 @@ public class QueryElasticsearchHttp extends 
AbstractElasticsearchHttpProcessor {
                 }
                 page.add(documentFlowFile);
             }
-            logger.debug("Elasticsearch retrieved " + responseJson.size() + " 
documents, routing to success");
 
-            session.transfer(page, REL_SUCCESS);
+            logger.debug("Elasticsearch retrieved " + responseJson.size() + " 
documents, routing to success");
+            // If we want to append query info as attributes but there were no 
hits,
+            // pass along the original, if present.
+            if (queryInfoRouteStrategy == 
QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES && page.isEmpty()
+                    && flowFile != null) {
+                FlowFile documentFlowFile = null;
+                documentFlowFile = targetIsContent ? session.create(flowFile) 
: session.clone(flowFile);
+                documentFlowFile = session.putAttribute(documentFlowFile, 
"es.query.hitcount", String.valueOf(hits.size()));
+                documentFlowFile = session.putAttribute(documentFlowFile, 
"es.query.url", url.toExternalForm());
+                session.transfer(documentFlowFile, REL_SUCCESS);
+            } else {
+                session.transfer(page, REL_SUCCESS);
+            }
         } else {
             try {
                 // 5xx -> RETRY, but a server error might last a while, so 
yield
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
index d2113bd..b648ec8 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
@@ -189,9 +189,56 @@ public class TestQueryElasticsearchHttpNoHits {
                 runAndVerify(3,3,2,true);
         }
 
+        @Test
+        public void testQueryElasticsearchOnTrigger_Hits_AppendAsAttributes() 
throws IOException {
+                runner = TestRunners.newTestRunner(new 
QueryElasticsearchHttpTestProcessor(false));
+                runner.setValidateExpressionUsage(true);
+                runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
 
+                runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.QUERY,
+                        "source:Twitter AND identifier:\"${identifier}\"");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+                runner.assertValid();
+                
runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, 
QueryElasticsearchHttp.QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name());
+                runner.assertValid();
+
+                runner.setIncomingConnection(true);
+                runAndVerify(1,0,0,false, false);
+        }
+
+        @Test
+        public void 
testQueryElasticsearchOnTrigger_Hits_AppendAsAttributes_noHits() throws 
IOException {
+                runner = TestRunners.newTestRunner(new 
QueryElasticsearchHttpTestProcessor(true));
+                runner.setValidateExpressionUsage(true);
+                runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, 
"http://127.0.0.1:9200");
+
+                runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+                runner.assertNotValid();
+                runner.setProperty(QueryElasticsearchHttp.QUERY,
+                        "source:Twitter AND identifier:\"${identifier}\"");
+                runner.assertValid();
+                runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+                runner.assertValid();
+                
runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, 
QueryElasticsearchHttp.QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name());
+                runner.assertValid();
+
+                runner.setIncomingConnection(false);
+                runAndVerify(3,3,2,true, false);
+        }
 
         private void runAndVerify(int expectedResults,int 
expectedQueryInfoResults,int expectedHits, boolean targetIsContent) {
+            runAndVerify(expectedResults, expectedQueryInfoResults, 
expectedHits, targetIsContent, true);
+        }
+
+        private void runAndVerify(int expectedResults,int 
expectedQueryInfoResults,int expectedHits, boolean targetIsContent,
+                boolean expectHitCountOnQueryInfo) {
                 runner.enqueue("blah".getBytes(), new HashMap<String, 
String>() {
                         {
                                 put("identifier", "28039652140");
@@ -201,20 +248,27 @@ public class TestQueryElasticsearchHttpNoHits {
                 // Running once should page through the no hit doc
                 runner.run(1, true, true);
 
-                
runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO, 
expectedQueryInfoResults);
-                if (expectedQueryInfoResults > 0) {
+                if (expectHitCountOnQueryInfo) {
+                    
runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO, 
expectedQueryInfoResults);
+                    if (expectedQueryInfoResults > 0) {
                         final MockFlowFile out = 
runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_QUERY_INFO).get(0);
                         assertNotNull(out);
                         if (targetIsContent) {
+                            if (expectHitCountOnQueryInfo) {
                                 out.assertAttributeEquals("es.query.hitcount", 
String.valueOf(expectedHits));
-                                
Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2"));
+                            }
+                            
Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2"));
                         }
+                    }
                 }
 
                 runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, 
expectedResults);
                 if (expectedResults > 0) {
                         final MockFlowFile out = 
runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_SUCCESS).get(0);
                         assertNotNull(out);
+                        if (!expectHitCountOnQueryInfo) {
+                            out.assertAttributeEquals("es.query.hitcount", 
String.valueOf(expectedHits));
+                        }
                         if (targetIsContent) {
                                 out.assertAttributeEquals("filename", 
"abc-97b-ASVsZu_" + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
                         }

Reply via email to