[ https://issues.apache.org/jira/browse/SAMZA-2308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16912783#comment-16912783 ]
Sean McNealy commented on SAMZA-2308:
-------------------------------------

Attached a test that runs against 0.13.1.

> AsyncRunLoop and TieredPriorityChooser fail to be used together
> ---------------------------------------------------------------
>
>                 Key: SAMZA-2308
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2308
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.14.0, 0.13.1
>            Reporter: Sean McNealy
>            Priority: Major
>         Attachments: PriorityChooserTest.scala
>
>
> In the single-threaded run loop, a message envelope was replaced using the
> "tryUpdate(ssp)" function each time before calling ".choose()".
> In the AsyncRunLoop that replacement is delayed until the callback completes.
> This allows choosing more messages to schedule to tasks, which keeps threads
> busy and allows for scheduling lower priority messages when partitions are
> available. These are good things when threads are available. When a message
> is chosen for an already processing partition, it is added to the task's
> pendingEnvelopeQueue so that the run loop can choose yet more messages. But
> the TieredPriorityChooser may respond with a message from a
> SystemStreamPartition whose priority is lower than that of another nonempty
> SystemStreamPartition, since there is no way to exclude a partition or
> priority level from the ".choose()" operation.
> In fact, the Chooser object can be frequently exhausted of all messages. What
> happens is that for a partition, one message from every SystemStream that is
> not empty will be in the FIFO/ArrayDeque pendingEnvelopeQueue, which doesn't
> respect the configured priority settings, so we just devolve to a round-robin
> policy.
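The degradation described above can be illustrated with a toy model. This is
a minimal sketch, not Samza code: the names TieredChooser, try_update,
single_threaded and async_loop are hypothetical, and the model assumes a
single task, a chooser that holds at most one envelope per stream, and a FIFO
pending queue that also stands in for the in-flight envelope.

```python
from collections import deque

# Assumed priorities for illustration: higher number wins.
PRIORITY = {"high-priority": 1, "low-priority": 0}


class TieredChooser:
    """Holds at most one envelope per stream; choose() returns the
    envelope of the highest-priority stream currently buffered."""

    def __init__(self):
        self.slots = {}

    def update(self, stream, msg):
        self.slots[stream] = msg

    def choose(self):
        if not self.slots:
            return None
        stream = max(self.slots, key=PRIORITY.get)
        return stream, self.slots.pop(stream)


def make_streams(n):
    return {s: deque(f"{s}#{i}" for i in range(n)) for s in PRIORITY}


def try_update(chooser, streams, stream):
    # Refill the chooser's slot for this stream if it is empty.
    if stream not in chooser.slots and streams[stream]:
        chooser.update(stream, streams[stream].popleft())


def single_threaded(n):
    streams, chooser, order = make_streams(n), TieredChooser(), []
    for s in streams:
        try_update(chooser, streams, s)
    while True:
        chosen = chooser.choose()
        if chosen is None:
            return order
        stream, _ = chosen
        order.append(stream)
        try_update(chooser, streams, stream)  # refill before the next choose()


def async_loop(n):
    streams, chooser, order = make_streams(n), TieredChooser(), []
    for s in streams:
        try_update(chooser, streams, s)
    pending = deque()  # stands in for the task's pendingEnvelopeQueue
    while True:
        # The run loop keeps choosing until the chooser is exhausted,
        # because refills are delayed until a callback completes.
        chosen = chooser.choose()
        while chosen is not None:
            pending.append(chosen)
            chosen = chooser.choose()
        if not pending:
            return order
        stream, _ = pending.popleft()  # task processes in FIFO order
        order.append(stream)
        try_update(chooser, streams, stream)  # callback done: refill now


print(single_threaded(3))
print(async_loop(3))
```

In this model, single_threaded(3) drains high-priority completely before
touching low-priority, while async_loop(3) alternates between the two
streams, which matches the round-robin behavior reported in the issue.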
> To reproduce, run a job with the following settings:
> task.inputs=kafka.high-priority,kafka.low-priority
> job.container.thread.pool.size=2
> systems.kafka.streams.high-priority.samza.priority=1
> Expected behavior:
> Each partition will fully read the high-priority stream before reading
> messages from the low-priority stream.
> Observed behavior:
> Each partition reads from both streams as in a round-robin policy.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)