[ 
https://issues.apache.org/jira/browse/SAMZA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16912783#comment-16912783
 ] 

Sean McNealy commented on SAMZA-2308:
-------------------------------------

Attached a test that runs against 0.13.1.

> AsyncRunLoop and TieredPriorityChooser fail to be used together
> ---------------------------------------------------------------
>
>                 Key: SAMZA-2308
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2308
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.14.0, 0.13.1
>            Reporter: Sean McNealy
>            Priority: Major
>         Attachments: PriorityChooserTest.scala
>
>
> In the single-threaded run loop, a message envelope was replaced using the 
> "tryUpdate(ssp)" function each time before calling ".choose()".
> In the AsyncRunLoop that replacement is delayed until the callback completes. 
> This allows choosing more messages to schedule to tasks, which keeps threads 
> busy and allows for scheduling lower priority messages when partitions are 
> available. These are good things when threads are available. When a message 
> is chosen for a partition that is already processing, it is added to the 
> task's pendingEnvelopeQueue so that the run loop can choose yet more 
> messages. But the TieredPriorityChooser may then return a message from a 
> SystemStreamPartition whose priority is lower than that of another nonempty 
> SystemStreamPartition, since there is no way to exclude a partition or 
> priority level from the ".choose()" operation.
> In fact, the Chooser object can frequently be exhausted of all messages. What 
> happens is that, for a partition, one message from every SystemStream that is 
> not empty ends up in the FIFO/ArrayDeque pendingEnvelopeQueue, which doesn't 
> respect the configured priorities, so we just devolve to a round robin policy.
> To reproduce, run a job with the following settings:
> task.inputs=kafka.high-priority,kafka.low-priority
> job.container.thread.pool.size=2
> systems.kafka.streams.high-priority.samza.priority=1
> Expected behavior:
> Each partition will fully read the high-priority stream before reading 
> messages from the low-priority stream.
> Observed behavior:
> Each partition reads from both streams as in a round robin policy.
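
The difference between the expected and observed behavior can be illustrated with a small, self-contained Java model. This is only a sketch under stated assumptions, not Samza code: ToyChooser, syncOrder and asyncOrder are hypothetical names, the chooser is assumed to hold at most one envelope per stream, and a single partition with one busy task is simulated. It is not the attached PriorityChooserTest.scala.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PriorityChooserSketch {
    // Hypothetical streams for one partition; index 0 has the higher priority.
    static final String[] STREAMS = {"high", "low"};

    // Toy stand-in for a tiered chooser: at most one envelope per stream.
    static class ToyChooser {
        private final String[] slot = new String[STREAMS.length];
        private final int[] next = new int[STREAMS.length];
        private final int perStream;

        ToyChooser(int perStream) { this.perStream = perStream; }

        // Plays the role of tryUpdate(ssp): refill an empty slot if messages remain.
        void tryUpdate(int stream) {
            if (slot[stream] == null && next[stream] < perStream) {
                slot[stream] = STREAMS[stream] + "-" + next[stream]++;
            }
        }

        // Returns the envelope in the highest-priority filled slot, or null if exhausted.
        String choose() {
            for (int s = 0; s < slot.length; s++) {
                if (slot[s] != null) {
                    String env = slot[s];
                    slot[s] = null;
                    return env;
                }
            }
            return null;
        }
    }

    static int streamOf(String env) { return env.startsWith("high") ? 0 : 1; }

    // Single-threaded model: slots are refilled before every choose().
    static List<String> syncOrder(int perStream) {
        ToyChooser chooser = new ToyChooser(perStream);
        List<String> processed = new ArrayList<>();
        while (true) {
            for (int s = 0; s < STREAMS.length; s++) chooser.tryUpdate(s);
            String env = chooser.choose();
            if (env == null) break;
            processed.add(env);
        }
        return processed;
    }

    // Async model: a slot is refilled only when the callback for its envelope completes.
    static List<String> asyncOrder(int perStream) {
        ToyChooser chooser = new ToyChooser(perStream);
        for (int s = 0; s < STREAMS.length; s++) chooser.tryUpdate(s);
        Deque<String> pendingEnvelopeQueue = new ArrayDeque<>(); // FIFO, ignores priority
        List<String> processed = new ArrayList<>();
        while (true) {
            // The run loop keeps choosing while the task is busy, draining the chooser.
            String env;
            while ((env = chooser.choose()) != null) pendingEnvelopeQueue.add(env);
            String head = pendingEnvelopeQueue.poll();
            if (head == null) break;
            processed.add(head);                 // task processes the envelope
            chooser.tryUpdate(streamOf(head));   // callback completes, slot is refilled
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + syncOrder(3));
        System.out.println("async: " + asyncOrder(3));
    }
}
```

With three messages per stream, the single-threaded model processes high-0, high-1, high-2 and only then low-0, low-1, low-2, while the async model alternates high-0, low-0, high-1, low-1, high-2, low-2. In this toy model the alternation comes entirely from draining the chooser into the FIFO queue before the higher-priority slot has been refilled.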



--
This message was sent by Atlassian Jira
(v8.3.2#803003)