Repository: nifi
Updated Branches:
  refs/heads/master d4168f5ff -> 0876cf12b


NIFI-3432: Handle Multiple Result Sets in ExecuteSQL

NIFI-3432

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #1471


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0876cf12
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0876cf12
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0876cf12

Branch: refs/heads/master
Commit: 0876cf12b17ca247703f30c87773d7efac947bdd
Parents: d4168f5
Author: patricker <patric...@gmail.com>
Authored: Wed Oct 4 11:30:51 2017 +0800
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Thu Oct 5 16:05:22 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ExecuteSQL.java    | 100 +++++++++++++------
 .../processors/standard/util/JdbcCommon.java    |  10 ++
 .../processors/standard/TestExecuteSQL.java     |   7 +-
 3 files changed, 86 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0876cf12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index ad79595..2104126 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -196,44 +196,84 @@ public class ExecuteSQL extends AbstractProcessor {
             selectQuery = queryContents.toString();
         }
 
+        int resultCount=0;
         try (final Connection con = dbcpService.getConnection();
             final Statement st = con.createStatement()) {
             st.setQueryTimeout(queryTimeout); // timeout in seconds
-            final AtomicLong nrOfRows = new AtomicLong(0L);
-            if (fileToProcess == null) {
-                fileToProcess = session.create();
-            }
-            fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    try {
-                        logger.debug("Executing query {}", new 
Object[]{selectQuery});
-                        final ResultSet resultSet = 
st.executeQuery(selectQuery);
-                        final JdbcCommon.AvroConversionOptions options = 
JdbcCommon.AvroConversionOptions.builder()
-                                .convertNames(convertNamesForAvro)
-                                .useLogicalTypes(useAvroLogicalTypes)
-                                .defaultPrecision(defaultPrecision)
-                                .defaultScale(defaultScale)
-                                .build();
-                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, 
out, options, null));
-                    } catch (final SQLException e) {
-                        throw new ProcessException(e);
-                    }
+
+            logger.debug("Executing query {}", new Object[]{selectQuery});
+            boolean results = st.execute(selectQuery);
+
+
+            while(results){
+                FlowFile resultSetFF;
+                if(fileToProcess == null){
+                    resultSetFF = session.create();
+                } else {
+                    resultSetFF = session.create(fileToProcess);
+                    resultSetFF = session.putAllAttributes(resultSetFF, 
fileToProcess.getAttributes());
                 }
-            });
 
-            long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+                final AtomicLong nrOfRows = new AtomicLong(0L);
+                resultSetFF = session.write(resultSetFF, new 
OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        try {
+
+                            final ResultSet resultSet = st.getResultSet();
+                            final JdbcCommon.AvroConversionOptions options = 
JdbcCommon.AvroConversionOptions.builder()
+                                    .convertNames(convertNamesForAvro)
+                                    .useLogicalTypes(useAvroLogicalTypes)
+                                    .defaultPrecision(defaultPrecision)
+                                    .defaultScale(defaultScale)
+                                    .build();
+                            
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+                        } catch (final SQLException e) {
+                            throw new ProcessException(e);
+                        }
+                    }
+                });
+
+                long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+
+                // set attribute how many rows were selected
+                resultSetFF = session.putAttribute(resultSetFF, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                resultSetFF = session.putAttribute(resultSetFF, 
RESULT_QUERY_DURATION, String.valueOf(duration));
+                resultSetFF = session.putAttribute(resultSetFF, 
CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+
+                logger.info("{} contains {} Avro records; transferring to 
'success'",
+                        new Object[]{resultSetFF, nrOfRows.get()});
+                session.getProvenanceReporter().modifyContent(resultSetFF, 
"Retrieved " + nrOfRows.get() + " rows", duration);
+                session.transfer(resultSetFF, REL_SUCCESS);
+
+                resultCount++;
+                // are there any more result sets?
+                try{
+                    results = st.getMoreResults();
+                } catch(SQLException ex){
+                    results = false;
+                }
+            }
 
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-            fileToProcess = session.putAttribute(fileToProcess, 
RESULT_QUERY_DURATION, String.valueOf(duration));
-            fileToProcess = session.putAttribute(fileToProcess, 
CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+            //If we had at least one result then it's OK to drop the original 
file, but if we had no results then
+            //  pass the original flow file down the line to trigger 
downstream processors
+            if(fileToProcess != null){
+                if(resultCount > 0){
+                    session.remove(fileToProcess);
+                } else {
+                    fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
+                        @Override
+                        public void process(OutputStream out) throws 
IOException {
+                            JdbcCommon.createEmptyAvroStream(out);
+                        }
+                    });
 
-            logger.info("{} contains {} Avro records; transferring to 
'success'",
-                    new Object[]{fileToProcess, nrOfRows.get()});
-            session.getProvenanceReporter().modifyContent(fileToProcess, 
"Retrieved " + nrOfRows.get() + " rows", duration);
-            session.transfer(fileToProcess, REL_SUCCESS);
+                    session.transfer(fileToProcess, REL_SUCCESS);
+                }
+            }
         } catch (final ProcessException | SQLException e) {
+            // An error occurred while executing the query; route the original
+            //  flow file (if there was one) to failure
             if (fileToProcess == null) {
                 // This can happen if any exceptions occur while setting up 
the connection, statement, etc.
                 logger.error("Unable to execute SQL select query {} due to {}. 
No FlowFile to route to failure",

http://git-wip-us.apache.org/repos/asf/nifi/blob/0876cf12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index bd9a74c..fd8f71e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -173,6 +173,16 @@ public class JdbcCommon {
         return convertToAvroStream(rs, outStream, options, callback);
     }
 
+    public static void createEmptyAvroStream(final OutputStream outStream) 
throws IOException {
+        final FieldAssembler<Schema> builder = 
SchemaBuilder.record("NiFi_ExecuteSQL_Record").namespace("any.data").fields();
+        final Schema schema = builder.endRecord();
+
+        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, outStream);
+        }
+    }
+
     public static class AvroConversionOptions {
         private final String recordName;
         private final int maxRows;

http://git-wip-us.apache.org/repos/asf/nifi/blob/0876cf12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 5fd1af8..69b7ae5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -222,7 +222,10 @@ public class TestExecuteSQL {
         runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM 
TEST_NO_ROWS");
         runner.run();
 
-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+        // With no incoming flow file and the query throwing an exception, no
+        // outbound flow file is produced.
+        // There should be no flow files on either relationship
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 0);
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0);
     }
 
     public void invokeOnTrigger(final Integer queryTimeout, final String 
query, final boolean incomingFlowFile, final boolean setQueryProperty)
@@ -284,6 +287,8 @@ public class TestExecuteSQL {
         }
     }
 
+
+
     /**
      * Simple implementation only for ExecuteSQL processor testing.
      *

Reply via email to