This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e51ab8c NIFI-7036: This closes #3993. Adding 'Append to Attributes'
to QueryElasticsearchHttp
e51ab8c is described below
commit e51ab8c9d6b678dc4e9549562d7c9f58f0445a04
Author: Joe Gresock <[email protected]>
AuthorDate: Thu Jan 16 20:30:49 2020 +0000
NIFI-7036: This closes #3993. Adding 'Append to Attributes' to
QueryElasticsearchHttp
Signed-off-by: Joe Witt <[email protected]>
---
.../elasticsearch/QueryElasticsearchHttp.java | 32 +++++++++---
.../TestQueryElasticsearchHttpNoHits.java | 60 ++++++++++++++++++++--
2 files changed, 82 insertions(+), 10 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 7cb1fb4..ea350ad 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -91,7 +91,8 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
public enum QueryInfoRouteStrategy {
NEVER,
ALWAYS,
- NOHIT
+ NOHIT,
+ APPEND_AS_ATTRIBUTES
}
private static final String FROM_QUERY_PARAM = "from";
@@ -103,6 +104,8 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
static final AllowableValue ALWAYS = new
AllowableValue(QueryInfoRouteStrategy.ALWAYS.name(), "Always", "Always route
Query Info");
static final AllowableValue NEVER = new
AllowableValue(QueryInfoRouteStrategy.NEVER.name(), "Never", "Never route Query
Info");
static final AllowableValue NO_HITS = new
AllowableValue(QueryInfoRouteStrategy.NOHIT.name(), "No Hits", "Route Query
Info if the Query returns no hits");
+ static final AllowableValue APPEND_AS_ATTRIBUTES = new
AllowableValue(QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name(), "Append as
Attributes",
+ "Always append Query Info as attributes, using the existing
relationships (does not add the Query Info relationship).");
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description(
@@ -221,7 +224,7 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
.displayName("Routing Strategy for Query Info")
.description("Specifies when to generate and route Query Info
after a successful query")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .allowableValues(ALWAYS, NEVER, NO_HITS)
+ .allowableValues(ALWAYS, NEVER, NO_HITS, APPEND_AS_ATTRIBUTES)
.defaultValue(NEVER.getValue())
.required(false)
.build();
@@ -399,9 +402,9 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
if ( (hits.size() == 0 && priorResultCount == 0 &&
queryInfoRouteStrategy == QueryInfoRouteStrategy.NOHIT)
|| queryInfoRouteStrategy ==
QueryInfoRouteStrategy.ALWAYS) {
FlowFile queryInfo = flowFile == null ? session.create() :
session.create(flowFile);
- session.putAttribute(queryInfo, "es.query.url",
url.toExternalForm());
- session.putAttribute(queryInfo, "es.query.hitcount",
String.valueOf(hits.size()));
- session.putAttribute(queryInfo, MIME_TYPE.key(),
"application/json");
+ queryInfo = session.putAttribute(queryInfo, "es.query.url",
url.toExternalForm());
+ queryInfo = session.putAttribute(queryInfo,
"es.query.hitcount", String.valueOf(hits.size()));
+ queryInfo = session.putAttribute(queryInfo, MIME_TYPE.key(),
"application/json");
session.transfer(queryInfo,REL_QUERY_INFO);
}
@@ -418,6 +421,10 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
documentFlowFile = session.create();
}
+ if (queryInfoRouteStrategy ==
QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES) {
+ documentFlowFile = session.putAttribute(documentFlowFile,
"es.query.hitcount", String.valueOf(hits.size()));
+ }
+
JsonNode source = hit.get("_source");
documentFlowFile = session.putAttribute(documentFlowFile,
"es.id", retrievedId);
documentFlowFile = session.putAttribute(documentFlowFile,
"es.index", retrievedIndex);
@@ -451,9 +458,20 @@ public class QueryElasticsearchHttp extends
AbstractElasticsearchHttpProcessor {
}
page.add(documentFlowFile);
}
- logger.debug("Elasticsearch retrieved " + responseJson.size() + "
documents, routing to success");
- session.transfer(page, REL_SUCCESS);
+ logger.debug("Elasticsearch retrieved " + responseJson.size() + "
documents, routing to success");
+ // If we want to append query info as attributes but there were no
hits,
+ // pass along the original, if present.
+ if (queryInfoRouteStrategy ==
QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES && page.isEmpty()
+ && flowFile != null) {
+ FlowFile documentFlowFile = null;
+ documentFlowFile = targetIsContent ? session.create(flowFile)
: session.clone(flowFile);
+ documentFlowFile = session.putAttribute(documentFlowFile,
"es.query.hitcount", String.valueOf(hits.size()));
+ documentFlowFile = session.putAttribute(documentFlowFile,
"es.query.url", url.toExternalForm());
+ session.transfer(documentFlowFile, REL_SUCCESS);
+ } else {
+ session.transfer(page, REL_SUCCESS);
+ }
} else {
try {
// 5xx -> RETRY, but a server error might last a while, so
yield
diff --git
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
index d2113bd..b648ec8 100644
---
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
+++
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java
@@ -189,9 +189,56 @@ public class TestQueryElasticsearchHttpNoHits {
runAndVerify(3,3,2,true);
}
+ @Test
+ public void testQueryElasticsearchOnTrigger_Hits_AppendAsAttributes()
throws IOException {
+ runner = TestRunners.newTestRunner(new
QueryElasticsearchHttpTestProcessor(false));
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://127.0.0.1:9200");
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+
runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY,
QueryElasticsearchHttp.QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name());
+ runner.assertValid();
+
+ runner.setIncomingConnection(true);
+ runAndVerify(1,0,0,false, false);
+ }
+
+ @Test
+ public void
testQueryElasticsearchOnTrigger_Hits_AppendAsAttributes_noHits() throws
IOException {
+ runner = TestRunners.newTestRunner(new
QueryElasticsearchHttpTestProcessor(true));
+ runner.setValidateExpressionUsage(true);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://127.0.0.1:9200");
+
+ runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.QUERY,
+ "source:Twitter AND identifier:\"${identifier}\"");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
+ runner.assertValid();
+
runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY,
QueryElasticsearchHttp.QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name());
+ runner.assertValid();
+
+ runner.setIncomingConnection(false);
+ runAndVerify(3,3,2,true, false);
+ }
private void runAndVerify(int expectedResults,int
expectedQueryInfoResults,int expectedHits, boolean targetIsContent) {
+ runAndVerify(expectedResults, expectedQueryInfoResults,
expectedHits, targetIsContent, true);
+ }
+
+ private void runAndVerify(int expectedResults,int
expectedQueryInfoResults,int expectedHits, boolean targetIsContent,
+ boolean expectHitCountOnQueryInfo) {
runner.enqueue("blah".getBytes(), new HashMap<String,
String>() {
{
put("identifier", "28039652140");
@@ -201,20 +248,27 @@ public class TestQueryElasticsearchHttpNoHits {
// Running once should page through the no hit doc
runner.run(1, true, true);
-
runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO,
expectedQueryInfoResults);
- if (expectedQueryInfoResults > 0) {
+ if (expectHitCountOnQueryInfo) {
+
runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO,
expectedQueryInfoResults);
+ if (expectedQueryInfoResults > 0) {
final MockFlowFile out =
runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_QUERY_INFO).get(0);
assertNotNull(out);
if (targetIsContent) {
+ if (expectHitCountOnQueryInfo) {
out.assertAttributeEquals("es.query.hitcount",
String.valueOf(expectedHits));
-
Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2"));
+ }
+
Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2"));
}
+ }
}
runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS,
expectedResults);
if (expectedResults > 0) {
final MockFlowFile out =
runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
+ if (!expectHitCountOnQueryInfo) {
+ out.assertAttributeEquals("es.query.hitcount",
String.valueOf(expectedHits));
+ }
if (targetIsContent) {
out.assertAttributeEquals("filename",
"abc-97b-ASVsZu_" + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
}