NIFI-2583

Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-2582

Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #877


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a0d1aae6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a0d1aae6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a0d1aae6

Branch: refs/heads/master
Commit: a0d1aae603a64adc31bc0177576c2463b7a22d36
Parents: 1470a52
Author: Peter Wicks <pwi...@micron.com>
Authored: Wed Aug 17 06:59:23 2016 -0600
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Wed Aug 17 13:20:44 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 30 +++++++++++++++++---
 .../standard/QueryDatabaseTableTest.java        | 21 ++++++++++++--
 2 files changed, 44 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a0d1aae6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 4f85282..3b68e29 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -53,7 +53,15 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.text.ParseException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -75,7 +83,11 @@ import java.util.concurrent.atomic.AtomicLong;
         @WritesAttribute(attribute = "querydbtable.row.count"),
         @WritesAttribute(attribute="fragment.identifier", description="If 'Max 
Rows Per Flow File' is set then all FlowFiles from the same query result set "
                 + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
-        @WritesAttribute(attribute="fragment.index", description="If 'Max Rows 
Per Flow File' is set then the position of this FlowFile in the list of 
outgoing FlowFiles that were all derived from the same result set FlowFile. 
This can be "
+        @WritesAttribute(attribute="fragment.count", description="If 'Max Rows 
Per Flow File' is set then this is the total number of  "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet."),
+        @WritesAttribute(attribute="fragment.index", description="If 'Max Rows 
Per Flow File' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
                 + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order  "
                 + "FlowFiles were produced")})
 @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression 
Language", supportsExpressionLanguage = false, description = "Specifies an 
initial "
@@ -152,7 +164,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
         ProcessSession session = sessionFactory.createSession();
-        final Set<FlowFile> resultSetFlowFiles = new HashSet<>();
+        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
 
         final ComponentLog logger = getLogger();
 
@@ -261,6 +273,14 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
 
                     fragmentIndex++;
                 }
+
+                //set count on all FlowFiles
+                if(maxRowsPerFlowFile > 0) {
+                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                        resultSetFlowFiles.set(i,
+                                session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                    }
+                }
             } catch (final SQLException e) {
                 throw e;
             }
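
The hunk above is the heart of the change: the total fragment count is only known after the result set has been fully consumed, so the processor first collects the FlowFiles (now in a List rather than a Set, preserving fragment.index order) and then stamps fragment.count on each one in a second pass; session.putAttribute returns an updated FlowFile, which is why its result is written back into the list. A minimal, self-contained sketch of that two-pass pattern, using a hypothetical AttributedItem stand-in for NiFi's immutable FlowFile, might look like this:

// Hedged sketch of the two-pass pattern used above, outside of NiFi.
// AttributedItem is a hypothetical stand-in for a FlowFile; only the
// attribute names fragment.index and fragment.count come from the diff.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FragmentCountStamping {

    // Immutable stand-in: putAttribute returns a modified copy, which mirrors
    // why the diff reassigns the result with resultSetFlowFiles.set(i, ...).
    static final class AttributedItem {
        final Map<String, String> attributes;

        AttributedItem(Map<String, String> attributes) {
            this.attributes = Map.copyOf(attributes);
        }

        AttributedItem putAttribute(String key, String value) {
            Map<String, String> copy = new HashMap<>(attributes);
            copy.put(key, value);
            return new AttributedItem(copy);
        }
    }

    public static void main(String[] args) {
        List<AttributedItem> fragments = new ArrayList<>();
        int fragmentIndex = 0;

        // First pass: emit fragments while reading; the final total is unknown here.
        for (int batch = 0; batch < 3; batch++) {
            fragments.add(new AttributedItem(Map.of("fragment.index", Integer.toString(fragmentIndex))));
            fragmentIndex++;
        }

        // Second pass: stamp the now-known total onto every fragment.
        for (int i = 0; i < fragments.size(); i++) {
            fragments.set(i, fragments.get(i).putAttribute("fragment.count", Integer.toString(fragmentIndex)));
        }

        fragments.forEach(f -> System.out.println(f.attributes));
    }
}
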
@@ -322,7 +342,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
             final String key = entry.getKey().getName();
 
-            if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) { continue; }
+            if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) {
+                continue;
+            }
 
             defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
         }
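
For context, the loop above filters the processor's dynamic properties down to those whose names start with the initial-max-value prefix and maps the remainder of each name (the max-value column) to the configured starting value. A hedged, standalone sketch of that parsing follows; the prefix literal "initial.maxvalue." is an assumption for illustration, standing in for the processor's INTIIAL_MAX_VALUE_PROP_START constant.

// Hedged sketch of the prefix-stripping shown in the hunk above.
// The prefix value is assumed for illustration; in the processor it is the
// INTIIAL_MAX_VALUE_PROP_START constant.
import java.util.HashMap;
import java.util.Map;

public class InitialMaxValueParsing {

    private static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";

    // Keeps only matching dynamic properties, keyed by the column name that follows the prefix.
    static Map<String, String> defaultMaxValues(Map<String, String> dynamicProperties) {
        Map<String, String> defaults = new HashMap<>();
        for (Map.Entry<String, String> entry : dynamicProperties.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(INITIAL_MAX_VALUE_PROP_START)) {
                continue;
            }
            defaults.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
        }
        return defaults;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "initial.maxvalue.ID", "100",
                "Some Other Property", "ignored");
        System.out.println(defaultMaxValues(props)); // prints {ID=100}
    }
}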

http://git-wip-us.apache.org/repos/asf/nifi/blob/a0d1aae6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index 4279ca3..40fba54 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -30,6 +30,7 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter;
 import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.util.file.FileUtils;
@@ -387,6 +388,7 @@ public class QueryDatabaseTableTest {
         final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
         Statement stmt = con.createStatement();
         InputStream in;
+        MockFlowFile mff;
 
         try {
             stmt.execute("drop table TEST_QUERY_DB_TABLE");
@@ -412,13 +414,22 @@ public class QueryDatabaseTableTest {
 
         //ensure all but the last file have 9 records each
         for(int ff=0;ff<11;ff++) {
-            in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff).toByteArray());
+            mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff);
+            in = new ByteArrayInputStream(mff.toByteArray());
             assertEquals(9, getNumberOfRecordsFromStream(in));
+
+            mff.assertAttributeExists("fragment.identifier");
+            assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
+            assertEquals("12", mff.getAttribute("fragment.count"));
         }
 
         //last file should have 1 record
-        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11).toByteArray());
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11);
+        in = new ByteArrayInputStream(mff.toByteArray());
         assertEquals(1, getNumberOfRecordsFromStream(in));
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
+        assertEquals("12", mff.getAttribute("fragment.count"));
         runner.clearTransferState();
 
         // Run again, this time no flowfiles/rows should be transferred
@@ -434,7 +445,11 @@ public class QueryDatabaseTableTest {
 
         runner.run();
         runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
-        in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
+        mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0);
+        in = new ByteArrayInputStream(mff.toByteArray());
+        mff.assertAttributeExists("fragment.identifier");
+        assertEquals(Integer.toString(0), mff.getAttribute("fragment.index"));
+        assertEquals("1", mff.getAttribute("fragment.count"));
         assertEquals(5, getNumberOfRecordsFromStream(in));
         runner.clearTransferState();
 
