[ 
https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16665154#comment-16665154
 ] 

ASF GitHub Bot commented on NIFI-5642:
--------------------------------------

Github user aglotero commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3051#discussion_r228522688
  
    --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
    @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
     
         @Override
         public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile inputFlowFile = null;
             FlowFile fileToProcess = null;
    +
    +        Map<String, String> attributes = null;
    +
             if (context.hasIncomingConnection()) {
    -            fileToProcess = session.get();
    +            inputFlowFile = session.get();
     
                 // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
                 // However, if we have no FlowFile and we have connections coming from other Processors, then
                 // we know that we should run only if we have a FlowFile.
    -            if (fileToProcess == null && context.hasNonLoopConnection()) {
    +            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                     return;
                 }
    +
    +            attributes = inputFlowFile.getAttributes();
             }
     
             final ComponentLog logger = getLogger();
    -        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
    -        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
    +        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
    +        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
             final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
    -        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
    +        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
    +        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
             final StopWatch stopWatch = new StopWatch(true);
     
    -        if (fileToProcess == null) {
    -            fileToProcess = session.create();
    +        if(inputFlowFile != null){
    +            session.transfer(inputFlowFile, REL_ORIGINAL);
             }
     
             try {
                 // The documentation for the driver recommends the session remain open the entire time the processor is running
                 // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
                 final Session connectionSession = cassandraSession.get();
    -            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
    +            final ResultSet resultSet;
    +
    +            if (queryTimeout > 0) {
    +                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
    +            }else{
    +                resultSet = connectionSession.execute(selectQuery);
    +            }
    +
                 final AtomicLong nrOfRows = new AtomicLong(0L);
     
    -            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    try {
    -                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
    -                        final ResultSet resultSet;
    -                        if (queryTimeout > 0) {
    -                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
    -                            }
    -                        } else {
    -                            resultSet = queryFuture.getUninterruptibly();
    -                            if (AVRO_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
    -                            } else if (JSON_FORMAT.equals(outputFormat)) {
    -                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
    +            do {
    +                fileToProcess = session.create();
    +
    +                // Assuring that if we have an input FlowFile
    +                // the generated output inherit the attributes
    +                if(attributes != null){
    +                    fileToProcess = session.putAllAttributes(fileToProcess, attributes);
    --- End diff --
    
    This is true, but the incremental commit forces us to set the attributes on the new flow files ourselves.
    In my first implementation I created fileToProcess as you suggest, but after getting some errors in the tests I realized that, for a child flow file to inherit attributes, both parent and child must reside in the same session.
    Once we perform a partial commit, this is no longer true.
    After the first session.commit, creating a new flow file that inherits from the input flow file causes an error on the next session.commit.
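
The workaround described above (copy the input attributes once, then apply them to each output created after a partial commit) can be sketched in plain Java. This is only a toy model under stated assumptions: `AttributeSnapshotSketch`, `Batch`, and `emitInBatches` are hypothetical names, `Batch` merely stands in for an output flow file, and nothing here calls the NiFi or Cassandra driver APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are NiFi API.
class AttributeSnapshotSketch {

    // Hypothetical stand-in for one output flow file.
    static final class Batch {
        final Map<String, String> attributes;
        final List<String> rows;

        Batch(Map<String, String> attributes, List<String> rows) {
            this.attributes = attributes;
            this.rows = rows;
        }
    }

    // Groups rows into batches of at most maxRowsPerBatch and gives every
    // batch its own copy of the input attributes.
    static List<Batch> emitInBatches(Map<String, String> inputAttributes,
                                     Iterator<String> rows,
                                     int maxRowsPerBatch) {
        if (maxRowsPerBatch <= 0) {
            throw new IllegalArgumentException("maxRowsPerBatch must be positive");
        }

        // Snapshot taken once, up front, so later batches do not need the
        // input object to still be available (it may be null: no input).
        final Map<String, String> snapshot = new HashMap<>();
        if (inputAttributes != null) {
            snapshot.putAll(inputAttributes);
        }

        final List<Batch> out = new ArrayList<>();
        List<String> current = new ArrayList<>();
        while (rows.hasNext()) {
            current.add(rows.next());
            if (current.size() == maxRowsPerBatch) {
                // In the processor, this is roughly where a partial commit would happen.
                out.add(new Batch(new HashMap<>(snapshot), current));
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            out.add(new Batch(new HashMap<>(snapshot), current));
        }
        return out;
    }
}
```

Calling `emitInBatches(attrs, rows.iterator(), 2)` on five rows yields three batches, each carrying its own copy of `attrs`, so no batch depends on the original input still being reachable.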


> QueryCassandra processor : output FlowFiles as soon fetch_size is reached
> -------------------------------------------------------------------------
>
>                 Key: NIFI-5642
>                 URL: https://issues.apache.org/jira/browse/NIFI-5642
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter: André Gomes Lamas Otero
>            Priority: Major
>
> When I use QueryCassandra together with the fetch_size parameter, I expect 
> that as soon as my reader reaches fetch_size the processor outputs some data 
> to be processed by the next processor. Instead, QueryCassandra reads all the 
> data and only then outputs the flow files.
> I'll start working on a patch for this situation; I'd appreciate any 
> suggestions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
