[ 
https://issues.apache.org/jira/browse/NIFI-4475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290898#comment-16290898
 ] 

ASF GitHub Bot commented on NIFI-4475:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2337#discussion_r156955954
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---
    @@ -1533,13 +1533,11 @@ public FlowFile get() {
                 return Collections.emptyList();
             }
     
    -        final Connection connection = connections.get(context.getNextIncomingConnectionIndex() % connections.size());
    -
    -        return get(connection, new ConnectionPoller() {
    +        return get(new ConnectionPoller() {
                 @Override
                 public List<FlowFileRecord> poll(final Connection connection, final Set<FlowFileRecord> expiredRecords) {
                     return connection.poll(new FlowFileFilter() {
    -                    int polled = 0;
    +                    volatile int polled = 0;
    --- End diff --
    
    We don't need this to be volatile. The variable is not shared between
    multiple threads, and volatile would not help if it were: we update it
    with ++polled, and an increment of a volatile variable is still not atomic.
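
To illustrate the point about ++ on a volatile field, here is a minimal standalone sketch. It is not NiFi code, and the class and method names are hypothetical. It increments a volatile int and an AtomicInteger from several threads; the volatile counter can lose updates because ++ is a separate read, add, and write, while the atomic counter cannot.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of NiFi.
public class VolatileIncrementSketch {
    // volatile guarantees visibility of writes, not atomicity of read-modify-write.
    static volatile int volatileCounter = 0;
    static final AtomicInteger atomicCounter = new AtomicInteger();

    // Returns { final volatile counter value, final atomic counter value }.
    public static int[] run(int threads, int incrementsPerThread) {
        volatileCounter = 0;
        atomicCounter.set(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    ++volatileCounter;               // read, add, write: updates can be lost
                    atomicCounter.incrementAndGet(); // one atomic operation
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { volatileCounter, atomicCounter.get() };
    }

    public static void main(String[] args) {
        int[] result = run(4, 100_000);
        System.out.println("volatile counter: " + result[0] + " (may be less than 400000)");
        System.out.println("atomic counter:   " + result[1]);
    }
}
```

In the diff above the counter lives inside a filter that is used by a single thread, so a plain int is sufficient and neither volatile nor an atomic type is needed.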


> Processors that use session.get(batchsize) will yield if multiple inbound 
> connections exist where at least one connection is empty.
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-4475
>                 URL: https://issues.apache.org/jira/browse/NIFI-4475
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 1.3.0
>            Reporter: Matthew Clarke
>            Assignee: Joseph Percivall
>              Labels: nifi
>
> There is a difference between how the NiFi framework handles batches of 
> incoming data (session.get(batchsize)) versus 1 FlowFile (session.get()) at 
> a time.
> For example, PutSyslog processes batches and PutUDP processes 1 FlowFile at a time.
> With the batch method, a thread is used to poll connection 1 and requests a 
> batch of FlowFiles.  If it gets at least 1 FlowFile, it sends the 
> FlowFile(s) and the thread ends.  On the next thread, it round-robins to the 
> next connection (a looped failure relationship, for example) and requests a 
> batch again.  If that connection is empty, the framework assumes there is no 
> work to do and yields the processor for the configured "yield duration".  So 
> regardless of run schedule, this processor will not run again for the 
> configured yield duration.
> With processors that only work on 1 FlowFile at a time, the thread will 
> round-robin all the inbound connections until it finds a FlowFile.  If it 
> does not find a FlowFile in any connection, the framework will yield the 
> processor for the configured yield duration.
> The intent of yield duration is to keep processors with the default run 
> schedule of 0 sec from using excessive CPU doing nothing; however, in the 
> case of batches it will yield even if FlowFiles exist on another connection.  
> This can have a huge impact on the throughput of processors that use 
> session.get(batchsize).
> There are two possible work-arounds to this issue:
> 1. You should see improved performance when multiple inbound connections 
> exist (where any connection may be normally empty) by reducing the configured 
> yield duration. The result is better throughput but at the expense of more 
> CPU usage when all connections are truly empty.
> 2. Only have one inbound connection to processors that work on batches. This 
> can be accomplished by using a funnel.
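
The difference described in the issue can be sketched with a small standalone model. This is not the actual StandardProcessSession code: the class BatchPollSketch, its method names, and the use of String queues in place of connections and FlowFiles are all assumptions made for illustration. One method polls a single connection per invocation, as the issue describes for session.get(batchsize); the other round-robins over all connections until one returns data, as described for session.get().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model: each Deque stands in for an inbound connection,
// each String for a FlowFile.
public class BatchPollSketch {
    private final List<Deque<String>> connections;
    private int nextIndex = 0;

    public BatchPollSketch(List<Deque<String>> connections) {
        this.connections = connections;
    }

    // Behavior described in the issue: poll exactly one connection per call.
    // An empty result here would cause the processor to yield.
    public List<String> getBatchSingleConnection(int maxResults) {
        if (connections.isEmpty()) {
            return new ArrayList<>();
        }
        Deque<String> connection = connections.get(nextIndex++ % connections.size());
        return drain(connection, maxResults);
    }

    // Alternative: try each connection once, returning the first non-empty batch.
    // An empty result now means every connection was empty.
    public List<String> getBatchRoundRobin(int maxResults) {
        for (int i = 0; i < connections.size(); i++) {
            Deque<String> connection = connections.get(nextIndex++ % connections.size());
            List<String> batch = drain(connection, maxResults);
            if (!batch.isEmpty()) {
                return batch;
            }
        }
        return new ArrayList<>();
    }

    // Removes up to maxResults items from the head of the queue.
    private static List<String> drain(Deque<String> queue, int maxResults) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxResults && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Deque<String>> conns = new ArrayList<>();
        conns.add(new ArrayDeque<>());                              // empty connection
        conns.add(new ArrayDeque<>(Arrays.asList("a", "b", "c"))); // connection with data
        System.out.println(new BatchPollSketch(conns).getBatchSingleConnection(2));
        System.out.println(new BatchPollSketch(conns).getBatchRoundRobin(2));
    }
}
```

With one empty connection and one holding data, the single-connection variant returns nothing on its first call even though work is available, which is the situation that triggers the unnecessary yield.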



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
