[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16665154#comment-16665154 ]
ASF GitHub Bot commented on NIFI-5642: -------------------------------------- Github user aglotero commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228522688 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + + Map<String, String> attributes = null; + if (context.hasIncomingConnection()) { - fileToProcess = session.get(); + inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. - if (fileToProcess == null && context.hasNonLoopConnection()) { + if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + + attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); - final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); - final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); + final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); + final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); + final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); - if (fileToProcess == null) { - fileToProcess = session.create(); + if(inputFlowFile != null){ + session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. final Session connectionSession = cassandraSession.get(); - final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); + final ResultSet resultSet; + + if (queryTimeout > 0) { + resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); + }else{ + resultSet = connectionSession.execute(selectQuery); + } + final AtomicLong nrOfRows = new AtomicLong(0L); - fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { - logger.debug("Executing CQL query {}", new Object[]{selectQuery}); - final ResultSet resultSet; - if (queryTimeout > 0) { - resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); - if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); - } else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); - } - } else { - resultSet = queryFuture.getUninterruptibly(); - if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, 0, null)); - } else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null)); + do { + fileToProcess = session.create(); + + // Assuring that if we have an input FlowFile + // the generated output inherit the attributes + if(attributes != null){ + fileToProcess = session.putAllAttributes(fileToProcess, attributes); --- End diff -- This is true, but the incremental commit force us to update attributes on the new flowfiles, on my first development I was creating the fileToProcess like your suggestion, after getting some errors on tests, I realized that to inherit the attributes both parent and child flow files must reside on the same session. When we are performing the partial commit this is not true anymore. After the first session.commit, create a new flow file inheriting from the input flow file will cause an error on the next session.commit. > QueryCassandra processor : output FlowFiles as soon fetch_size is reached > ------------------------------------------------------------------------- > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug > Affects Versions: 1.7.1 > Reporter: André Gomes Lamas Otero > Priority: Major > > When I'm using QueryCassandra alongside with fetch_size parameter I expected > that as soon my reader reaches the fetch_size the processor outputs some data > to be processed by the next processor, but QueryCassandra reads all the data, > then output the flow files. > I'll start to work on a patch for this situation, I'll appreciate any > suggestion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)