NIFI-2583 Signed-off-by: Matt Burgess <mattyb...@apache.org>
NIFI-2582 Signed-off-by: Matt Burgess <mattyb...@apache.org> NIFI-2582 Signed-off-by: Matt Burgess <mattyb...@apache.org> This closes #877 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a0d1aae6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a0d1aae6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a0d1aae6 Branch: refs/heads/master Commit: a0d1aae603a64adc31bc0177576c2463b7a22d36 Parents: 1470a52 Author: Peter Wicks <pwi...@micron.com> Authored: Wed Aug 17 06:59:23 2016 -0600 Committer: Matt Burgess <mattyb...@apache.org> Committed: Wed Aug 17 13:20:44 2016 -0400 ---------------------------------------------------------------------- .../processors/standard/QueryDatabaseTable.java | 30 +++++++++++++++++--- .../standard/QueryDatabaseTableTest.java | 21 ++++++++++++-- 2 files changed, 44 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a0d1aae6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 4f85282..3b68e29 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -53,7 +53,15 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.text.ParseException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -75,7 +83,11 @@ import java.util.concurrent.atomic.AtomicLong; @WritesAttribute(attribute = "querydbtable.row.count"), @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), - @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + @WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."), + @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + "FlowFiles were produced")}) @DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " @@ -152,7 +164,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { ProcessSession session = sessionFactory.createSession(); - final Set<FlowFile> resultSetFlowFiles = new HashSet<>(); + final List<FlowFile> resultSetFlowFiles = new ArrayList<>(); final ComponentLog logger = getLogger(); @@ -261,6 +273,14 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { fragmentIndex++; } + + //set count on all FlowFiles + if(maxRowsPerFlowFile > 0) { + for (int i = 0; i < resultSetFlowFiles.size(); i++) { + resultSetFlowFiles.set(i, + session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex))); + } + } } catch (final SQLException e) { throw e; } @@ -322,7 +342,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { final String key = entry.getKey().getName(); - if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) { continue; } + if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) { + continue; + } defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/a0d1aae6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 4279ca3..40fba54 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -30,6 +30,7 @@ import org.apache.nifi.processors.standard.db.DatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter; import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.file.FileUtils; @@ -387,6 +388,7 @@ public class QueryDatabaseTableTest { final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); Statement stmt = con.createStatement(); InputStream in; + MockFlowFile mff; try { stmt.execute("drop table TEST_QUERY_DB_TABLE"); @@ -412,13 +414,22 @@ public class QueryDatabaseTableTest { //ensure all but the last file have 9 records each for(int ff=0;ff<11;ff++) { - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff).toByteArray()); + mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(ff); + in = new ByteArrayInputStream(mff.toByteArray()); assertEquals(9, getNumberOfRecordsFromStream(in)); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); } //last file should have 1 record - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11).toByteArray()); + mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(11); + in = new ByteArrayInputStream(mff.toByteArray()); assertEquals(1, getNumberOfRecordsFromStream(in)); + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(11), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); runner.clearTransferState(); // Run again, this time no flowfiles/rows should be transferred @@ -434,7 +445,11 @@ public class QueryDatabaseTableTest { runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); - in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + mff = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + in = new ByteArrayInputStream(mff.toByteArray()); + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(0), mff.getAttribute("fragment.index")); + assertEquals("1", mff.getAttribute("fragment.count")); assertEquals(5, getNumberOfRecordsFromStream(in)); runner.clearTransferState();