Github user patricker commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1471#discussion_r101090170
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---
    @@ -192,31 +192,57 @@ public void process(InputStream in) throws IOException {
             try (final Connection con = dbcpService.getConnection();
                 final Statement st = con.createStatement()) {
                 st.setQueryTimeout(queryTimeout); // timeout in seconds
    -            final AtomicLong nrOfRows = new AtomicLong(0L);
    -            if (fileToProcess == null) {
    -                fileToProcess = session.create();
    -            }
    -            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    try {
    -                        logger.debug("Executing query {}", new Object[]{selectQuery});
    -                        final ResultSet resultSet = st.executeQuery(selectQuery);
    -                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
    -                    } catch (final SQLException e) {
    -                        throw new ProcessException(e);
    -                    }
    +
    +
    +            logger.debug("Executing query {}", new Object[]{selectQuery});
    +            boolean results = st.execute(selectQuery);
    +            int resultCount = 0;
    +            while(results){
    +                FlowFile resultSetFF;
    +                if(fileToProcess==null)
    +                    resultSetFF = session.create();
    +                else {
    +                    resultSetFF = session.create(fileToProcess);
    +                    resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
                     }
    -            });
     
    -            // set attribute how many rows were selected
    -            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    +                final AtomicLong nrOfRows = new AtomicLong(0L);
    +
    +                resultSetFF = session.write(resultSetFF, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        try {
    +                            ResultSet resultSet = st.getResultSet();
    +                            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
    +                        } catch (final SQLException e) {
    +                            throw new ProcessException(e);
    +                        }
    +                    }
    +                });
    +
    +                // set attribute how many rows were selected
    +                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    +
    +                logger.info("{} contains {} Avro records; transferring to 'success'",
    +                        new Object[]{resultSetFF, nrOfRows.get()});
    +                session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows",
    +                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    +                session.transfer(resultSetFF, REL_SUCCESS);
    +                resultCount++;
    +
    +                // are there anymore result sets?
    +                results = st.getMoreResults();
    +            }
     
    -            logger.info("{} contains {} Avro records; transferring to 'success'",
    -                    new Object[]{fileToProcess, nrOfRows.get()});
    -            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
    -                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    -            session.transfer(fileToProcess, REL_SUCCESS);
    +            //If we had at least one result then it's OK to drop the original file, but if we had no results then
    +            //  pass the original flow file down the line to trigger downstream processors
    +            if(fileToProcess != null) {
    +                if (resultCount > 0) {
    +                    session.remove(fileToProcess);
    +                } else {
    +                    session.transfer(fileToProcess, REL_SUCCESS);
    --- End diff --
    
    @mattyb149, thanks for the review. In the original code, `fileToProcess` was overwritten with the results generated by the query and always transferred downstream. In my original PR I always dropped the original FlowFile, since it served no purpose. Then I happened to execute a SQL statement that returned no ResultSet; in that scenario the original FlowFile was dropped and no FlowFiles were output at all. That made me realize I needed to keep the original FlowFile around when there are no result sets. The logic above removes the original FlowFile under normal circumstances (`if (resultCount > 0)`) and keeps it only when the SQL returns no result sets. I don't think my explanation matches up with your observations, though; please let me know if this helps, or if there is still something wrong with the code that I've missed.
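    To illustrate the edge case driving the `resultCount` check: `java.sql.Statement.execute()` returns `false` when the statement produces no result set (e.g. a DDL or update statement), so the `while(results)` loop body never runs and nothing would be transferred. Here is a minimal Python/sqlite3 sketch of that same distinction (a stand-in for the Java/JDBC code, not the NiFi implementation itself):

    ```python
    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE t (id INTEGER)")
    con.execute("INSERT INTO t VALUES (1), (2)")

    def has_result_set(sql):
        """Mimic java.sql.Statement.execute(): True iff the
        statement produced a result set."""
        cur = con.execute(sql)
        # cursor.description is None when no result set was produced
        return cur.description is not None

    print(has_result_set("SELECT * FROM t"))      # True: a result set exists
    print(has_result_set("UPDATE t SET id = 0"))  # False: no result set at all
    ```

    Note that a SELECT returning zero rows still yields a result set; only statements like UPDATE or DDL yield none, which is exactly when the original FlowFile must be passed through instead of removed.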
    