Repository: nifi
Updated Branches:
  refs/heads/master 3ad324351 -> ed30bb9b7


NIFI-5130 ExecuteInfluxDBQuery processor chunking support

This closes #2666

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ed30bb9b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ed30bb9b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ed30bb9b

Branch: refs/heads/master
Commit: ed30bb9b7872840b3b5348766c0e68b69617abd8
Parents: 3ad3243
Author: Michal Misiewicz <misiewicz.mich...@gmail.com>
Authored: Sun Apr 29 10:01:34 2018 +0200
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Fri May 4 17:32:10 2018 -0400

----------------------------------------------------------------------
 .../influxdb/ExecuteInfluxDBQuery.java          | 74 ++++++++++++++++++--
 .../processors/influxdb/AbstractITInfluxDB.java |  9 +++
 .../influxdb/ITExecuteInfluxDBQuery.java        | 58 ++++++++++++---
 .../influxdb/TestExecutetInfluxDBQuery.java     | 22 +++---
 4 files changed, 135 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ed30bb9b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
index ddb0972..a029f14 100644
--- 
a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
+++ 
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.nifi.processors.influxdb;
+
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -32,9 +33,11 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.InfluxDB;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 import com.google.gson.Gson;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -48,6 +51,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -64,6 +69,8 @@ public class ExecuteInfluxDBQuery extends 
AbstractInfluxDBProcessor {
 
     public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
 
+    private static final int DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE = 0;
+
     public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = 
new PropertyDescriptor.Builder()
             .name("influxdb-query-result-time-unit")
             .displayName("Query Result Time Units")
@@ -86,6 +93,20 @@ public class ExecuteInfluxDBQuery extends 
AbstractInfluxDBProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    public static final Integer MAX_CHUNK_SIZE = 10000;
+
+    public static final PropertyDescriptor INFLUX_DB_QUERY_CHUNK_SIZE = new 
PropertyDescriptor.Builder()
+            .name("influxdb-query-chunk-size")
+            .displayName("Results chunk size")
+            .description("Chunking can be used to return results in a stream 
of smaller batches "
+                + "(each has partial results up to a chunk size) rather than 
as a single response. "
+                + "Chunking queries can return an unlimited number of rows. 
Note: Chunking is enabled when the result chunk size is greater than 0")
+            .defaultValue(String.valueOf(DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.createLongValidator(0, 
MAX_CHUNK_SIZE, true))
+            .required(true)
+            .build();
+
     static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
             .description("Successful InfluxDB queries are routed to this 
relationship").build();
 
@@ -111,6 +132,7 @@ public class ExecuteInfluxDBQuery extends 
AbstractInfluxDBProcessor {
         tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT);
         tempDescriptors.add(INFLUX_DB_QUERY_RESULT_TIMEUNIT);
         tempDescriptors.add(INFLUX_DB_QUERY);
+        tempDescriptors.add(INFLUX_DB_QUERY_CHUNK_SIZE);
         tempDescriptors.add(USERNAME);
         tempDescriptors.add(PASSWORD);
         tempDescriptors.add(CHARSET);
@@ -189,9 +211,10 @@ public class ExecuteInfluxDBQuery extends 
AbstractInfluxDBProcessor {
 
         try {
             long startTimeMillis = System.currentTimeMillis();
-            QueryResult result = executeQuery(context, database, query, 
queryResultTimeunit);
+            int chunkSize = 
context.getProperty(INFLUX_DB_QUERY_CHUNK_SIZE).evaluateAttributeExpressions(outgoingFlowFile).asInteger();
+            List<QueryResult> result = executeQuery(context, database, query, 
queryResultTimeunit, chunkSize);
 
-            String json = gson.toJson(result);
+            String json = result.size() == 1 ? gson.toJson(result.get(0)) : 
gson.toJson(result);
 
             if ( getLogger().isDebugEnabled() ) {
                 getLogger().debug("Query result {} ", new Object[] {result});
@@ -203,13 +226,13 @@ public class ExecuteInfluxDBQuery extends 
AbstractInfluxDBProcessor {
 
             final long endTimeMillis = System.currentTimeMillis();
 
-            if ( ! result.hasError() ) {
+            if ( ! hasErrors(result) ) {
                 outgoingFlowFile = session.putAttribute(outgoingFlowFile, 
INFLUX_DB_EXECUTED_QUERY, String.valueOf(query));
                 session.getProvenanceReporter().send(outgoingFlowFile, 
makeProvenanceUrl(context, database),
                         (endTimeMillis - startTimeMillis));
                 session.transfer(outgoingFlowFile, REL_SUCCESS);
             } else {
-                outgoingFlowFile = populateErrorAttributes(session, 
outgoingFlowFile, query, result.getError());
+                outgoingFlowFile = populateErrorAttributes(session, 
outgoingFlowFile, query, queryErrors(result));
                 session.transfer(outgoingFlowFile, REL_FAILURE);
             }
 
@@ -242,8 +265,31 @@ public class ExecuteInfluxDBQuery extends 
AbstractInfluxDBProcessor {
             .append(database).toString();
     }
 
-    protected QueryResult executeQuery(final ProcessContext context, String 
database, String query, TimeUnit timeunit) {
-        return getInfluxDB(context).query(new Query(query, database),timeunit);
+    protected List<QueryResult> executeQuery(final ProcessContext context, 
String database, String query, TimeUnit timeunit,
+                                             int chunkSize) throws 
InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        InfluxDB influx = getInfluxDB(context);
+        Query influxQuery = new Query(query, database);
+
+        if (chunkSize > 0) {
+            List<QueryResult> results = new LinkedList<>();
+            influx.query(influxQuery, chunkSize, result -> {
+                if (isQueryDone(result.getError())) {
+                    latch.countDown();
+                } else {
+                    results.add(result);
+                }
+            });
+            latch.await();
+
+            return results;
+        } else {
+            return Collections.singletonList(influx.query(influxQuery, 
timeunit));
+        }
+    }
+
+    private boolean isQueryDone(String error) {
+        return error != null && error.equals("DONE");
     }
 
     protected FlowFile populateErrorAttributes(final ProcessSession session, 
FlowFile flowFile, String query,
@@ -255,6 +301,22 @@ public class ExecuteInfluxDBQuery extends 
AbstractInfluxDBProcessor {
         return flowFile;
     }
 
+    private Boolean hasErrors(List<QueryResult> results) {
+        for (QueryResult result: results) {
+            if (result.hasError()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private String queryErrors(List<QueryResult> results) {
+        return results.stream()
+                .filter(QueryResult::hasError)
+                .map(QueryResult::getError)
+                .collect(Collectors.joining("\n"));
+    }
+
     @OnStopped
     public void close() {
         super.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed30bb9b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java
 
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java
index 48ec149..120a92d 100644
--- 
a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java
+++ 
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 package org.apache.nifi.processors.influxdb;
+
+import com.google.gson.reflect.TypeToken;
 import org.apache.nifi.util.TestRunner;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
@@ -22,6 +24,9 @@ import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 import org.junit.After;
 
+import java.lang.reflect.Type;
+import java.util.List;
+
 /**
  * Base integration test class for InfluxDB processors
  */
@@ -34,6 +39,8 @@ public class AbstractITInfluxDB {
     protected String password = "admin";
     protected static final String DEFAULT_RETENTION_POLICY = "autogen";
 
+    protected Type QueryResultListType = new 
TypeToken<List<QueryResult>>(){}.getType();
+
     protected void initInfluxDB() throws InterruptedException, Exception {
         influxDB = InfluxDBFactory.connect(dbUrl,user,password);
         influxDB.createDatabase(dbName);
@@ -52,6 +59,8 @@ public class AbstractITInfluxDB {
             checkError(result);
             result = influxDB.query(new Query("DROP measurement testm", 
dbName));
             checkError(result);
+            result = influxDB.query(new Query("DROP measurement 
chunkedQueryTest", dbName));
+            checkError(result);
             result = influxDB.query(new Query("DROP database " + dbName, 
dbName));
             Thread.sleep(1000);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed30bb9b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
 
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
index 0a0844f..a503731 100644
--- 
a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
+++ 
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
@@ -16,28 +16,31 @@
  */
 package org.apache.nifi.processors.influxdb;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 
 import org.junit.Assert;
 
-import java.io.StringReader;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunners;
 import org.influxdb.InfluxDB;
 import org.influxdb.dto.QueryResult;
 import org.influxdb.dto.QueryResult.Series;
+
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Before;
 import org.junit.Test;
 
 import com.google.gson.Gson;
 
+
+
 /**
  * Integration test for executing InfluxDB queries. Please ensure that the 
InfluxDB is running
  * on local host with default port and has database test with table test. 
Please set user
@@ -274,12 +277,12 @@ public class ITExecuteInfluxDBQuery extends 
AbstractITInfluxDB {
     }
 
     @Test
-    public void testQueryResultHasError() {
+    public void testQueryResultHasError() throws Throwable {
         ExecuteInfluxDBQuery mockExecuteInfluxDBQuery = new 
ExecuteInfluxDBQuery() {
             @Override
-            protected QueryResult executeQuery(ProcessContext context, String 
database, String query, TimeUnit timeunit) {
-                QueryResult result = super.executeQuery(context, database, 
query, timeunit);
-                result.setError("Test Error");
+            protected List<QueryResult> executeQuery(ProcessContext context, 
String database, String query, TimeUnit timeunit, int chunkSize) throws 
InterruptedException{
+                List<QueryResult> result = super.executeQuery(context, 
database, query, timeunit, chunkSize);
+                result.get(0).setError("Test Error");
                 return result;
             }
 
@@ -326,4 +329,37 @@ public class ITExecuteInfluxDBQuery extends 
AbstractITInfluxDB {
         runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}");
         testValidTwoPoints();
     }
+
+    @Test
+    public void testChunkedQuery() {
+        String message =
+                "chunkedQueryTest,country=GB value=1 1524938495000000000" + 
System.lineSeparator() +
+                "chunkedQueryTest,country=PL value=2 1524938505000000000" + 
System.lineSeparator() +
+                "chunkedQueryTest,country=US value=3 1524938505800000000";
+
+        influxDB.write(dbName, DEFAULT_RETENTION_POLICY, 
InfluxDB.ConsistencyLevel.ONE, message);
+
+        String query = "select * from chunkedQueryTest";
+        byte [] bytes = query.getBytes();
+        runner.enqueue(bytes);
+        runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY_CHUNK_SIZE, 
"2");
+        runner.run(1,true,true);
+
+        runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 
1);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
+        assertEquals("Value should be equal", 1, flowFiles.size());
+        assertNull("Value should be null", 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
+
+        List<QueryResult> queryResult = gson.fromJson(new StringReader(new 
String(flowFiles.get(0).toByteArray())), QueryResultListType);
+
+        assertNotNull("QueryResult array should not be null", queryResult);
+        assertEquals("QueryResult array size should be equal 2", 2, 
queryResult.size());
+
+        assertEquals("First chunk should have 2 elements",2, 
chunkSize(queryResult.get(0)));
+        assertEquals("Second chunk should have 1 elements",1, 
chunkSize(queryResult.get(1)));
+    }
+
+    private int chunkSize(QueryResult queryResult) {
+        return 
queryResult.getResults().get(0).getSeries().get(0).getValues().size();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed30bb9b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java
 
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java
index dfed700..d6aad18 100644
--- 
a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java
+++ 
b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestExecutetInfluxDBQuery.java
@@ -16,14 +16,6 @@
  */
 package org.apache.nifi.processors.influxdb;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -37,6 +29,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
 public class TestExecutetInfluxDBQuery {
     private TestRunner runner;
     private ExecuteInfluxDBQuery mockExecuteInfluxDBQuery;
@@ -50,7 +50,7 @@ public class TestExecutetInfluxDBQuery {
             }
 
             @Override
-            protected QueryResult executeQuery(ProcessContext context, String 
database, String query, TimeUnit timeunit) {
+            protected List<QueryResult> executeQuery(ProcessContext context, 
String database, String query, TimeUnit timeunit, int chunkSize) {
                 return null;
             }
 
@@ -83,7 +83,7 @@ public class TestExecutetInfluxDBQuery {
             }
 
             @Override
-            protected QueryResult executeQuery(ProcessContext context, String 
database, String query, TimeUnit timeunit) {
+            protected List<QueryResult> executeQuery(ProcessContext context, 
String database, String query, TimeUnit timeunit, int chunkSize) {
                 throw new RuntimeException("runtime exception");
             }
 
@@ -115,7 +115,7 @@ public class TestExecutetInfluxDBQuery {
             }
 
             @Override
-            protected QueryResult executeQuery(ProcessContext context, String 
database, String query, TimeUnit timeunit) {
+            protected List<QueryResult> executeQuery(ProcessContext context, 
String database, String query, TimeUnit timeunit, int chunkSize) {
                 throw new RuntimeException("runtime exception", new 
SocketTimeoutException("timeout"));
             }
 

Reply via email to