Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1471#discussion_r142700529 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java --- @@ -196,44 +196,80 @@ public void process(InputStream in) throws IOException { selectQuery = queryContents.toString(); } + int resultCount=0; try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { st.setQueryTimeout(queryTimeout); // timeout in seconds - final AtomicLong nrOfRows = new AtomicLong(0L); - if (fileToProcess == null) { - fileToProcess = session.create(); - } - fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { - logger.debug("Executing query {}", new Object[]{selectQuery}); - final ResultSet resultSet = st.executeQuery(selectQuery); - final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() - .convertNames(convertNamesForAvro) - .useLogicalTypes(useAvroLogicalTypes) - .defaultPrecision(defaultPrecision) - .defaultScale(defaultScale) - .build(); - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); - } catch (final SQLException e) { - throw new ProcessException(e); - } + + logger.debug("Executing query {}", new Object[]{selectQuery}); + boolean results = st.execute(selectQuery); + + + while(results){ + FlowFile resultSetFF; + if(fileToProcess == null){ + resultSetFF = session.create(); + } else { + resultSetFF = session.create(fileToProcess); + resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes()); } - }); - long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS); + final AtomicLong nrOfRows = new AtomicLong(0L); + resultSetFF = session.write(resultSetFF, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { - // set attribute how many rows were selected - fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - fileToProcess = session.putAttribute(fileToProcess, RESULT_QUERY_DURATION, String.valueOf(duration)); - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); + final ResultSet resultSet = st.getResultSet(); + final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() + .convertNames(convertNamesForAvro) + .useLogicalTypes(useAvroLogicalTypes) + .defaultPrecision(defaultPrecision) + .defaultScale(defaultScale) + .build(); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); + } catch (final SQLException e) { + throw new ProcessException(e); + } + } + }); - logger.info("{} contains {} Avro records; transferring to 'success'", - new Object[]{fileToProcess, nrOfRows.get()}); - session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", duration); - session.transfer(fileToProcess, REL_SUCCESS); + long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS); + + // set attribute how many rows were selected + resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration)); + resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); + + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{resultSetFF, nrOfRows.get()}); + session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration); + session.transfer(resultSetFF, REL_SUCCESS); + + resultCount++; + // are there anymore result sets? + results = st.getMoreResults(); --- End diff -- This should probably be in a try/catch for SQLException, some (usually younger or cross-domain) JDBC drivers may not support this method and throw an exception here. Also I think at least one driver (can't remember which one) closes the result set when the last row has been retrieved, in which case this would generate a SQLException as well. But in any case, we can just set results to false if an exception occurs and proceed from there. What do you think?
---