[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702075#comment-16702075 ]
ASF GitHub Bot commented on NIFI-5642: -------------------------------------- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r237141917 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + + Map<String, String> attributes = null; + if (context.hasIncomingConnection()) { - fileToProcess = session.get(); + inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
- if (fileToProcess == null && context.hasNonLoopConnection()) { + if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + + attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); - final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); - final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); + final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); + final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); + final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); - if (fileToProcess == null) { - fileToProcess = session.create(); + if(inputFlowFile != null){ + session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); - final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); + final ResultSet resultSet; + + if (queryTimeout > 0) { + resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); + }else{ + resultSet = connectionSession.execute(selectQuery); + } + final AtomicLong nrOfRows = new AtomicLong(0L); - fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { - logger.debug("Executing CQL query {}", new Object[]{selectQuery}); - final ResultSet resultSet; - if (queryTimeout > 0) { - resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); - if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); - } else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); - } - } else { - resultSet = queryFuture.getUninterruptibly(); - if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, 0, null)); - } else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null)); + do { + fileToProcess = session.create(); + + // Assuring that if we have an input FlowFile + // the generated output inherit the attributes + if(attributes != null){ + fileToProcess = session.putAllAttributes(fileToProcess, attributes); --- End diff -- What if you wait to transfer the `inputFlowFile` to the `original` relationship after all the children have been transferred/committed? 
> QueryCassandra processor : output FlowFiles as soon fetch_size is reached > ------------------------------------------------------------------------- > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug > Affects Versions: 1.7.1 > Reporter: André Gomes Lamas Otero > Priority: Major > > When using QueryCassandra with the fetch_size parameter, I expected > that as soon as my reader reached the fetch_size, the processor would output some data > to be processed by the next processor; instead, QueryCassandra reads all the data > and only then outputs the flow files. > I'll start working on a patch for this situation; I'd appreciate any > suggestions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)