Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1471#discussion_r142700529
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
 ---
    @@ -196,44 +196,80 @@ public void process(InputStream in) throws 
IOException {
                 selectQuery = queryContents.toString();
             }
     
    +        int resultCount=0;
             try (final Connection con = dbcpService.getConnection();
                 final Statement st = con.createStatement()) {
                 st.setQueryTimeout(queryTimeout); // timeout in seconds
    -            final AtomicLong nrOfRows = new AtomicLong(0L);
    -            if (fileToProcess == null) {
    -                fileToProcess = session.create();
    -            }
    -            fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws 
IOException {
    -                    try {
    -                        logger.debug("Executing query {}", new 
Object[]{selectQuery});
    -                        final ResultSet resultSet = 
st.executeQuery(selectQuery);
    -                        final JdbcCommon.AvroConversionOptions options = 
JdbcCommon.AvroConversionOptions.builder()
    -                                .convertNames(convertNamesForAvro)
    -                                .useLogicalTypes(useAvroLogicalTypes)
    -                                .defaultPrecision(defaultPrecision)
    -                                .defaultScale(defaultScale)
    -                                .build();
    -                        
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
    -                    } catch (final SQLException e) {
    -                        throw new ProcessException(e);
    -                    }
    +
    +            logger.debug("Executing query {}", new Object[]{selectQuery});
    +            boolean results = st.execute(selectQuery);
    +
    +
    +            while(results){
    +                FlowFile resultSetFF;
    +                if(fileToProcess == null){
    +                    resultSetFF = session.create();
    +                } else {
    +                    resultSetFF = session.create(fileToProcess);
    +                    resultSetFF = session.putAllAttributes(resultSetFF, 
fileToProcess.getAttributes());
                     }
    -            });
     
    -            long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
    +                final AtomicLong nrOfRows = new AtomicLong(0L);
    +                resultSetFF = session.write(resultSetFF, new 
OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws 
IOException {
    +                        try {
     
    -            // set attribute how many rows were selected
    -            fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    -            fileToProcess = session.putAttribute(fileToProcess, 
RESULT_QUERY_DURATION, String.valueOf(duration));
    -            fileToProcess = session.putAttribute(fileToProcess, 
CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
    +                            final ResultSet resultSet = st.getResultSet();
    +                            final JdbcCommon.AvroConversionOptions options 
= JdbcCommon.AvroConversionOptions.builder()
    +                                    .convertNames(convertNamesForAvro)
    +                                    .useLogicalTypes(useAvroLogicalTypes)
    +                                    .defaultPrecision(defaultPrecision)
    +                                    .defaultScale(defaultScale)
    +                                    .build();
    +                            
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
    +                        } catch (final SQLException e) {
    +                            throw new ProcessException(e);
    +                        }
    +                    }
    +                });
     
    -            logger.info("{} contains {} Avro records; transferring to 
'success'",
    -                    new Object[]{fileToProcess, nrOfRows.get()});
    -            session.getProvenanceReporter().modifyContent(fileToProcess, 
"Retrieved " + nrOfRows.get() + " rows", duration);
    -            session.transfer(fileToProcess, REL_SUCCESS);
    +                long duration = 
stopWatch.getElapsed(TimeUnit.MILLISECONDS);
    +
    +                // set attribute how many rows were selected
    +                resultSetFF = session.putAttribute(resultSetFF, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    +                resultSetFF = session.putAttribute(resultSetFF, 
RESULT_QUERY_DURATION, String.valueOf(duration));
    +                resultSetFF = session.putAttribute(resultSetFF, 
CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
    +
    +                logger.info("{} contains {} Avro records; transferring to 
'success'",
    +                        new Object[]{resultSetFF, nrOfRows.get()});
    +                session.getProvenanceReporter().modifyContent(resultSetFF, 
"Retrieved " + nrOfRows.get() + " rows", duration);
    +                session.transfer(resultSetFF, REL_SUCCESS);
    +
    +                resultCount++;
    +                // are there anymore result sets?
    +                results = st.getMoreResults();
    --- End diff --
    
    This should probably be in a try/catch for SQLException, some (usually 
younger or cross-domain) JDBC drivers may not support this method and throw an 
exception here. Also I think at least one driver (can't remember which one) 
closes the result set when the last row has been retrieved, in which case this 
would generate a SQLException as well. But in any case, we can just set results 
to false if an exception occurs and proceed from there. What do you think?


---

Reply via email to