[ https://issues.apache.org/jira/browse/SAMZA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dengpan Yin updated SAMZA-2308:
-------------------------------
    Affects Version/s: 1.2
                       1.1
                       1.0

> AsyncRunLoop and TieredPriorityChooser fail to be used together
> ---------------------------------------------------------------
>
>                 Key: SAMZA-2308
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2308
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.14.0, 0.13.1, 1.0, 1.1, 1.2
>            Reporter: Sean McNealy
>            Assignee: Dengpan Yin
>            Priority: Major
>         Attachments: PriorityChooserTest.scala
>
>
> In the single-threaded run loop, a message envelope was replaced using the
> "tryUpdate(ssp)" function each time before calling ".choose()".
> In the AsyncRunLoop, that replacement is delayed until a worker is ready and
> the envelope begins processing. This lets the run loop choose more messages
> to schedule to tasks, which keeps threads busy and allows lower-priority
> messages to be scheduled when partitions are available. That is a good thing
> when threads are available. When a message is chosen for a partition that is
> already being processed, it is added to the task's pendingEnvelopeQueue so
> that the run loop can choose yet more messages. But the TieredPriorityChooser
> may then return a message from a lower-priority SystemStreamPartition even
> though a higher-priority SystemStreamPartition still has messages, since
> there is no way to exclude a partition or priority level from the
> ".choose()" operation.
>
> In fact, the chooser can frequently be exhausted of all messages. What
> happens is that, for a partition, one message from every nonempty
> SystemStream ends up in the FIFO (ArrayDeque) pendingEnvelopeQueue, which
> does not respect the configured priorities, so the policy devolves into
> round robin.
>
> To reproduce, run a job with the following settings:
> task.inputs=kafka.high-priority,kafka.low-priority
> job.container.thread.pool.size=2
> systems.kafka.streams.high-priority.samza.priority=1
>
> Expected behavior:
> Each partition fully reads the high-priority stream before reading
> messages from the low-priority stream.
>
> Observed behavior:
> Each partition reads from both streams, as in a round-robin policy.
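The difference between the two run loops can be illustrated with a toy simulation. This is a simplified sketch under stated assumptions, not Samza code: the class PriorityChooserSim, its methods, and the one-partition, one-busy-task setup are hypothetical. It only mimics the behavior described in the report: a chooser that buffers at most one envelope per SystemStreamPartition and is refilled via tryUpdate(ssp), and a FIFO pendingEnvelopeQueue that receives every chosen envelope while the task is busy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model (not Samza's API): one partition reading a "high"
// (priority 1) and a "low" (priority 0) stream through a chooser that buffers
// at most one envelope per stream.
class PriorityChooserSim {

  static final class Envelope {
    final String stream;
    final String message;
    Envelope(String stream, String message) {
      this.stream = stream;
      this.message = message;
    }
  }

  private final Map<String, Deque<String>> backlog = new HashMap<>();  // not yet given to the chooser
  private final Map<String, Envelope> buffered = new HashMap<>();      // one envelope per stream
  private final Map<String, Integer> priority = new HashMap<>();

  PriorityChooserSim(List<String> high, List<String> low) {
    backlog.put("high", new ArrayDeque<>(high));
    backlog.put("low", new ArrayDeque<>(low));
    priority.put("high", 1);
    priority.put("low", 0);
    tryUpdate("high");  // initial fill: one envelope per stream
    tryUpdate("low");
  }

  // Hands the next message of a stream to the chooser, if the stream has one.
  private void tryUpdate(String stream) {
    String next = backlog.get(stream).poll();
    if (next != null) {
      buffered.put(stream, new Envelope(stream, next));
    }
  }

  // Removes and returns the buffered envelope of the highest-priority stream, or null.
  private Envelope choose() {
    String best = null;
    for (String stream : buffered.keySet()) {
      if (best == null || priority.get(stream) > priority.get(best)) {
        best = stream;
      }
    }
    return best == null ? null : buffered.remove(best);
  }

  // Single-threaded style: the chosen stream is refilled before the next choose().
  static List<String> singleThreaded(List<String> high, List<String> low) {
    PriorityChooserSim sim = new PriorityChooserSim(high, low);
    List<String> processed = new ArrayList<>();
    Envelope env;
    while ((env = sim.choose()) != null) {
      processed.add(env.message);  // processed synchronously
      sim.tryUpdate(env.stream);   // refill before choosing again
    }
    return processed;
  }

  // Async style: the task is busy, so chosen envelopes queue up FIFO, and a
  // stream is refilled only when its envelope begins processing.
  static List<String> asyncStyle(List<String> high, List<String> low) {
    PriorityChooserSim sim = new PriorityChooserSim(high, low);
    Deque<Envelope> pendingEnvelopeQueue = new ArrayDeque<>();
    List<String> processed = new ArrayList<>();
    while (true) {
      Envelope env;
      while ((env = sim.choose()) != null) {  // drains the chooser, low priority included
        pendingEnvelopeQueue.addLast(env);
      }
      Envelope next = pendingEnvelopeQueue.pollFirst();
      if (next == null) {
        return processed;  // chooser and pending queue are both empty
      }
      processed.add(next.message);  // envelope begins processing
      sim.tryUpdate(next.stream);   // only now is its stream refilled
    }
  }

  public static void main(String[] args) {
    List<String> high = List.of("h0", "h1", "h2");
    List<String> low = List.of("l0", "l1", "l2");
    System.out.println("single-threaded: " + singleThreaded(high, low));
    System.out.println("async-style:     " + asyncStyle(high, low));
  }
}
```

Running main prints [h0, h1, h2, l0, l1, l2] for the single-threaded model and [h0, l0, h1, l1, h2, l2] for the async-style model, matching the expected and observed behaviors in the report: once the high-priority envelope sits in the FIFO queue, the chooser has nothing better than the low-priority envelope to offer.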



--
This message was sent by Atlassian Jira
(v8.3.2#803003)