Repository: nifi
Updated Branches:
  refs/heads/master 7879a9920 -> 102a9a2b7


NIFI-2674 - Always add flow files to resultSetFlowFiles; catch runtime 
exceptions while converting to Avro

Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #953


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/102a9a2b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/102a9a2b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/102a9a2b

Branch: refs/heads/master
Commit: 102a9a2b74968ad111fb54cdaa16a2609ea73e85
Parents: 7879a99
Author: Bryan Rosander <bryanrosan...@gmail.com>
Authored: Thu Aug 25 19:52:55 2016 -0400
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Thu Aug 25 21:16:26 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/QueryDatabaseTable.java | 27 ++++++++------
 .../standard/QueryDatabaseTableTest.java        | 38 +++++++++++++++++++-
 2 files changed, 53 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/102a9a2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index d7f4b24..cfe68b5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -241,15 +241,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                     final AtomicLong nrOfRows = new AtomicLong(0L);
 
                     FlowFile fileToProcess = session.create();
-                    fileToProcess = session.write(fileToProcess, out -> {
-                        // Max values will be updated in the state property 
map by the callback
-                        final MaxValueResultSetRowCollector maxValCollector = 
new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
-                        try {
-                            
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, 
maxValCollector, maxRowsPerFlowFile));
-                        } catch (SQLException e) {
-                            throw new ProcessException("Error during database 
query or conversion of records to Avro.", e);
-                        }
-                    });
+
+                    try {
+                        fileToProcess = session.write(fileToProcess, out -> {
+                            // Max values will be updated in the state 
property map by the callback
+                            final MaxValueResultSetRowCollector 
maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, 
dbAdapter);
+                            try {
+                                
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, 
maxValCollector, maxRowsPerFlowFile));
+                            } catch (SQLException | RuntimeException e) {
+                                throw new ProcessException("Error during 
database query or conversion of records to Avro.", e);
+                            }
+                        });
+                    } catch (ProcessException e) {
+                        // Add flowfile to results before rethrowing so it 
will be removed from session in outer catch
+                        resultSetFlowFiles.add(fileToProcess);
+                        throw e;
+                    }
 
                     if (nrOfRows.get() > 0) {
                         // set attribute how many rows were selected
@@ -269,7 +276,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                                 new Object[]{fileToProcess, nrOfRows.get()});
 
                         session.getProvenanceReporter().receive(fileToProcess, 
jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-
                         resultSetFlowFiles.add(fileToProcess);
                     } else {
                         // If there were no rows returned, don't send the 
flowfile
@@ -395,7 +401,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
             } catch (ParseException | SQLException e) {
                 throw new IOException(e);
             }
-
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/102a9a2b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
index e97e529..979dd38 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
@@ -70,6 +71,7 @@ public class QueryDatabaseTableTest {
     private TestRunner runner;
     private final static String DB_LOCATION = "target/db_qdt";
     private DatabaseAdapter dbAdapter;
+    private HashMap<String, DatabaseAdapter> origDbAdapters;
 
 
     @BeforeClass
@@ -106,8 +108,9 @@ public class QueryDatabaseTableTest {
     public void setup() throws InitializationException, IOException {
         final DBCPService dbcp = new DBCPServiceSimpleImpl();
         final Map<String, String> dbcpProperties = new HashMap<>();
+        origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters);
         dbAdapter = new GenericDatabaseAdapter();
-
+        QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
         processor = new MockQueryDatabaseTable();
         runner = TestRunners.newTestRunner(processor);
         runner.addControllerService("dbcp", dbcp, dbcpProperties);
@@ -120,6 +123,8 @@ public class QueryDatabaseTableTest {
     @After
     public void teardown() {
         runner = null;
+        QueryDatabaseTable.dbAdapters.clear();
+        QueryDatabaseTable.dbAdapters.putAll(origDbAdapters);
     }
 
     @Test
@@ -366,6 +371,37 @@ public class QueryDatabaseTableTest {
     }
 
     @Test
+    public void testWithRuntimeException() throws SQLException {
+        // load test data to database
+        final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
+
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, 
NULL, 1)");
+        stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 
1)");
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
+
+        QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new 
GenericDatabaseAdapter() {
+            @Override
+            public String getName() {
+                throw new DataFileWriter.AppendWriteException(null);
+            }
+        });
+        runner.run();
+
+        
assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
+    }
+
+    @Test
     public void testWithSqlException() throws SQLException {
         // load test data to database
         final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();

Reply via email to