MikeThomsen commented on code in PR #6848:
URL: https://github.com/apache/nifi/pull/6848#discussion_r1153850188


##########
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java:
##########
@@ -217,60 +243,90 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
         final String selectQuery = 
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
         final long queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = 
context.getProperty(OUTPUT_FORMAT).getValue();
+        final long maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final long outputBatchSize = 
context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
         final StopWatch stopWatch = new StopWatch(true);
 
-        if (fileToProcess == null) {
-            fileToProcess = session.create();
-        }
-
         try {
             // The documentation for the driver recommends the session remain 
open the entire time the processor is running
             // and states that it is thread-safe. This is why 
connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = 
connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, 
queryTimeout, TimeUnit.MILLISECONDS);
+            }else{
+                resultSet = connectionSession.execute(selectQuery);
+            }
             final AtomicLong nrOfRows = new AtomicLong(0L);
 
-            fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws 
IOException {
-                    try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
-                        logger.debug("Executing CQL query {}", new 
Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = 
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, 
out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 
queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, 
out, 0, null));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 
0, null));
+            long flowFileCount = 0;
+
+            if(fileToProcess == null) {
+                fileToProcess = session.create();
+            }
+
+            while(true) {
+
+                fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        try {
+                            logger.debug("Executing CQL query {}", new 
Object[]{selectQuery});
+                            if (queryTimeout > 0) {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+                                            out, queryTimeout, 
TimeUnit.MILLISECONDS));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+                                            out, charset, queryTimeout, 
TimeUnit.MILLISECONDS));
+                                }
+                            } else {
+                                if (AVRO_FORMAT.equals(outputFormat)) {
+                                    
nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile,
+                                            out, 0, null));
+                                } else if (JSON_FORMAT.equals(outputFormat)) {
+                                    
nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile,
+                                            out, charset, 0, null));
+                                }
                             }
+                        } catch (final TimeoutException | InterruptedException 
| ExecutionException e) {
+                            throw new ProcessException(e);
                         }
-
-                    } catch (final TimeoutException | InterruptedException | 
ExecutionException e) {
-                        throw new ProcessException(e);
                     }
-                }
-            });
+                });
 
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                // set attribute how many rows were selected
+                fileToProcess = session.putAttribute(fileToProcess, 
RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
 
-            // set mime.type based on output format
-            fileToProcess = session.putAttribute(fileToProcess, 
CoreAttributes.MIME_TYPE.key(),
-                    JSON_FORMAT.equals(outputFormat) ? "application/json" : 
"application/avro-binary");
+                // set mime.type based on output format
+                fileToProcess = session.putAttribute(fileToProcess, 
CoreAttributes.MIME_TYPE.key(),
+                        JSON_FORMAT.equals(outputFormat) ? "application/json" 
: "application/avro-binary");
 
-            logger.info("{} contains {} Avro records; transferring to 
'success'",
-                    new Object[]{fileToProcess, nrOfRows.get()});
-            session.getProvenanceReporter().modifyContent(fileToProcess, 
"Retrieved " + nrOfRows.get() + " rows",
-                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(fileToProcess, REL_SUCCESS);
+                if (logger.isDebugEnabled()) {
+                    logger.info("{} contains {} records; transferring to 
'success'",

Review Comment:
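   This should be `logger.debug` rather than `logger.info`, since the call sits inside an `isDebugEnabled()` guard, but I'll fix this myself.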



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to