[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16705170#comment-16705170 ]
ASF GitHub Bot commented on NIFI-5642: -------------------------------------- Github user aglotero commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r237965549 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + + Map<String, String> attributes = null; + if (context.hasIncomingConnection()) { - fileToProcess = session.get(); + inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
- if (fileToProcess == null && context.hasNonLoopConnection()) { + if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + + attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); - final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); - final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); + final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); + final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); + final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); - if (fileToProcess == null) { - fileToProcess = session.create(); + if(inputFlowFile != null){ + session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); - final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); + final ResultSet resultSet; + + if (queryTimeout > 0) { + resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); + }else{ + resultSet = connectionSession.execute(selectQuery); + } + final AtomicLong nrOfRows = new AtomicLong(0L); - fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { - logger.debug("Executing CQL query {}", new Object[]{selectQuery}); - final ResultSet resultSet; - if (queryTimeout > 0) { - resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); - if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); - } else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); - } - } else { - resultSet = queryFuture.getUninterruptibly(); - if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, 0, null)); - } else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null)); + do { + fileToProcess = session.create(); + + // Assuring that if we have an input FlowFile + // the generated output inherit the attributes + if(attributes != null){ + fileToProcess = session.putAllAttributes(fileToProcess, attributes); + } + + fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { + logger.debug("Executing CQL query {}", new Object[]{selectQuery}); + if (queryTimeout > 0) { + if (AVRO_FORMAT.equals(outputFormat)) { + nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile, + out, queryTimeout, TimeUnit.MILLISECONDS)); 
+ } else if (JSON_FORMAT.equals(outputFormat)) { + nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile, + out, charset, queryTimeout, TimeUnit.MILLISECONDS)); + } + } else { + if (AVRO_FORMAT.equals(outputFormat)) { + nrOfRows.set(convertToAvroStream(resultSet, maxRowsPerFlowFile, + out, 0, null)); + } else if (JSON_FORMAT.equals(outputFormat)) { + nrOfRows.set(convertToJsonStream(resultSet, maxRowsPerFlowFile, + out, charset, 0, null)); + } } - } - } catch (final TimeoutException | InterruptedException | ExecutionException e) { - throw new ProcessException(e); + } catch (final TimeoutException | InterruptedException | ExecutionException e) { + throw new ProcessException(e); + } } - } - }); + }); + - // set attribute how many rows were selected - fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + // set attribute how many rows were selected + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - // set mime.type based on output format - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), - JSON_FORMAT.equals(outputFormat) ? 
"application/json" : "application/avro-binary"); - logger.info("{} contains {} Avro records; transferring to 'success'", - new Object[]{fileToProcess, nrOfRows.get()}); - session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", - stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(fileToProcess, REL_SUCCESS); + logger.info("{} contains {} records; transferring to 'success'", + new Object[]{fileToProcess, nrOfRows.get()}); + session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", + stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(fileToProcess, REL_SUCCESS); + session.commit(); --- End diff -- I think I finally understand your point, @mattyb149. Thanks for the clarifications! > QueryCassandra processor : output FlowFiles as soon fetch_size is reached > ------------------------------------------------------------------------- > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug > Affects Versions: 1.7.1 > Reporter: André Gomes Lamas Otero > Priority: Major > > When I use QueryCassandra with the fetch_size parameter, I expect > that as soon as the reader reaches fetch_size the processor outputs some data > to be processed by the next processor, but QueryCassandra reads all the data > and only then outputs the flow files. > I'll start working on a patch for this; I'd appreciate any > suggestions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)