Repository: nifi Updated Branches: refs/heads/master 7879a9920 -> 102a9a2b7
NIFI-2674 - Always adding flow files to result set flowfiles, catching runtime exceptions while converting to avro Signed-off-by: Matt Burgess <mattyb...@apache.org> This closes #953 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/102a9a2b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/102a9a2b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/102a9a2b Branch: refs/heads/master Commit: 102a9a2b74968ad111fb54cdaa16a2609ea73e85 Parents: 7879a99 Author: Bryan Rosander <bryanrosan...@gmail.com> Authored: Thu Aug 25 19:52:55 2016 -0400 Committer: Matt Burgess <mattyb...@apache.org> Committed: Thu Aug 25 21:16:26 2016 -0400 ---------------------------------------------------------------------- .../processors/standard/QueryDatabaseTable.java | 27 ++++++++------ .../standard/QueryDatabaseTableTest.java | 38 +++++++++++++++++++- 2 files changed, 53 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/102a9a2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index d7f4b24..cfe68b5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -241,15 +241,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final AtomicLong nrOfRows = new AtomicLong(0L); FlowFile fileToProcess = session.create(); - fileToProcess = session.write(fileToProcess, out -> { - // Max values will be updated in the state property map by the callback - final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); - try { - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile)); - } catch (SQLException e) { - throw new ProcessException("Error during database query or conversion of records to Avro.", e); - } - }); + + try { + fileToProcess = session.write(fileToProcess, out -> { + // Max values will be updated in the state property map by the callback + final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); + try { + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile)); + } catch (SQLException | RuntimeException e) { + throw new ProcessException("Error during database query or conversion of records to Avro.", e); + } + }); + } catch (ProcessException e) { + // Add flowfile to results before rethrowing so it will be removed from session in outer catch + resultSetFlowFiles.add(fileToProcess); + throw e; + } if (nrOfRows.get() > 0) { // set attribute how many rows were selected @@ -269,7 +276,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { new Object[]{fileToProcess, nrOfRows.get()}); session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - resultSetFlowFiles.add(fileToProcess); } else { // If there were no rows returned, don't send the flowfile @@ -395,7 +401,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { } catch (ParseException | SQLException e) { throw new IOException(e); } - } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/102a9a2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index e97e529..979dd38 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; @@ -70,6 +71,7 @@ public class QueryDatabaseTableTest { private TestRunner runner; private final static String DB_LOCATION = "target/db_qdt"; private DatabaseAdapter dbAdapter; + private HashMap<String, DatabaseAdapter> origDbAdapters; @BeforeClass @@ -106,8 +108,9 @@ public class QueryDatabaseTableTest { public void setup() throws InitializationException, IOException { final DBCPService dbcp = new DBCPServiceSimpleImpl(); final Map<String, String> dbcpProperties = new HashMap<>(); + origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters); dbAdapter = new GenericDatabaseAdapter(); - + QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter); processor = new MockQueryDatabaseTable(); runner = TestRunners.newTestRunner(processor); runner.addControllerService("dbcp", dbcp, dbcpProperties); @@ -120,6 +123,8 @@ public class QueryDatabaseTableTest { @After public void teardown() { runner = null; + QueryDatabaseTable.dbAdapters.clear(); + QueryDatabaseTable.dbAdapters.putAll(origDbAdapters); } @Test @@ -366,6 +371,37 @@ public class QueryDatabaseTableTest { } @Test + public void testWithRuntimeException() throws SQLException { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + // Ignore, usually due to Derby not having DROP TABLE IF EXISTS + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT"); + + QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() { + @Override + public String getName() { + throw new DataFileWriter.AppendWriteException(null); + } + }); + runner.run(); + + assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty()); + } + + @Test public void testWithSqlException() throws SQLException { // load test data to database final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();