MikeThomsen commented on code in PR #6848:
URL: https://github.com/apache/nifi/pull/6848#discussion_r1153850188
##########
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java:
##########
@@ -217,60 +243,90 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
+        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final long outputBatchSize = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
         final StopWatch stopWatch = new StopWatch(true);
 
-        if (fileToProcess == null) {
-            fileToProcess = session.create();
-        }
-
         try {
             // The documentation for the driver recommends the session remain open the entire time the processor is running
             // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+            }else{
+                resultSet = connectionSession.execute(selectQuery);
+            }
             final AtomicLong nrOfRows = new AtomicLong(0L);
 
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws IOException {
-                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 0, null));
+            long flowFileCount = 0;
+
+            if(fileToProcess == null) {
+                fileToProcess = session.create();
+            }
+
+            while(true) {
+
+                fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        try {
+                            logger.debug("Executing CQL query {}", new Object[]{selectQuery});
+                            if (queryTimeout > 0) {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+                                            out, queryTimeout, TimeUnit.MILLISECONDS));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+                                            out, charset, queryTimeout, TimeUnit.MILLISECONDS));
+                                }
+                            } else {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+                                            out, 0, null));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+                                            out, charset, 0, null));
+                                }
                             }
+                        } catch (final TimeoutException | InterruptedException | ExecutionException e) {
+                            throw new ProcessException(e);
                         }
-
-                    } catch (final TimeoutException | InterruptedException | ExecutionException e) {
-                        throw new ProcessException(e);
                     }
-                }
-            });
+                });
 
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                // set attribute how many rows were selected
+                fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
 
-            // set mime.type based on output format
-            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
-                    JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
+                // set mime.type based on output format
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(),
+                        JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
 
-            logger.info("{} contains {} Avro records; transferring to 'success'",
-                    new Object[]{fileToProcess, nrOfRows.get()});
-            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
-                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(fileToProcess, REL_SUCCESS);
+                if (logger.isDebugEnabled()) {
+                    logger.info("{} contains {} records; transferring to 'success'",

Review Comment:
   Should be `debug`, but I'll fix this
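For reference, a minimal sketch of the fix the comment is asking for (hypothetical, not the committed change): the call inside the `isDebugEnabled()` guard should log at DEBUG so the guard and the log level agree; as written, the message is emitted at INFO whenever debug logging is enabled. The `Object[]` varargs style follows the `logger.debug(...)` call already used earlier in this diff.

```java
// Sketch of the suggested fix: match the log level to the guard.
if (logger.isDebugEnabled()) {
    logger.debug("{} contains {} records; transferring to 'success'",
            new Object[]{fileToProcess, nrOfRows.get()});
}
```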