[ https://issues.apache.org/jira/browse/KAFKA-10091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138832#comment-17138832 ]

Guozhang Wang commented on KAFKA-10091:
---------------------------------------

We had a quick offline sync about the known scenarios we want to support, so 
that users can trade off latency against timestamp synchronization 
(consistency) for a task with multiple input partitions:

1) I do not care about which records get processed first, and I just want to 
process any records that are fetched and available for processing.
2) For any partition that still has data available to fetch (which can be 
determined by consumption lag > 0, for example), I would wait until that data 
is fetched before deciding which record to process next.
3) For any partition, even if we have fetched and processed all of its records 
(consumption lag == 0, for example), I would still like to wait up to the 
configured time before moving on to process other partitions without knowing 
whether there are late-arriving records from this partition (a.k.a. "enforced 
processing").

Whether we are bootstrapping an application or processing in real time, 
people may prefer 2); however, the current `max.task.idle.ms` does not work 
well for either case 2) or case 3). So we propose to change the value 
semantics of the config as follows:

* -1: indicate case 1).
* 0: indicate case 2).
* positive value: indicate case 3), and the wait time is bounded by the 
configured value. Note that the timer would only start ticking when the 
consumer lag drops to zero AND the partition buffer is empty.
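A minimal Java sketch of the readiness check implied by the proposed values. 
`TaskIdlingSketch`, `PartitionState`, `readyToProcess`, and the `idleSinceMs` 
field are hypothetical names for illustration only, not Kafka Streams APIs. 
The sketch assumes the caller already knows each partition's buffered record 
count and consumer lag, and records when the idle timer started (lag at zero 
with an empty buffer). It returns `true` when the task may go ahead and pick 
from its non-empty buffers.

```java
import java.util.List;

public class TaskIdlingSketch {

    /**
     * Hypothetical snapshot of one input partition of a task.
     * idleSinceMs: when lag reached zero while the buffer was empty (idle timer start).
     */
    record PartitionState(int bufferedRecords, long consumerLag, long idleSinceMs) {}

    /**
     * Returns true if the task may process now, choosing among its non-empty
     * buffers instead of waiting on the empty ones.
     */
    static boolean readyToProcess(List<PartitionState> partitions, long maxTaskIdleMs, long nowMs) {
        for (PartitionState p : partitions) {
            if (p.bufferedRecords() > 0) {
                continue; // next timestamp of this input is known
            }
            if (maxTaskIdleMs < 0) {
                continue; // -1, case 1): never wait on an empty buffer
            }
            if (p.consumerLag() > 0) {
                return false; // 0 or positive: data exists on the broker, wait for the fetch
            }
            // Buffer empty and lag == 0: only a positive value keeps the task idling.
            if (maxTaskIdleMs > 0 && nowMs - p.idleSinceMs() < maxTaskIdleMs) {
                return false; // case 3): enforced wait, bounded by the configured value
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long now = 10_000L;
        // Partition B is empty, but the broker still has 3 records for it.
        List<PartitionState> lagging = List.of(
                new PartitionState(5, 0, 0), new PartitionState(0, 3, 0));
        // Partition B is empty and caught up; its idle timer started at 9_500 ms.
        List<PartitionState> caughtUp = List.of(
                new PartitionState(5, 0, 0), new PartitionState(0, 0, 9_500));

        System.out.println(readyToProcess(lagging, -1, now));     // true:  case 1)
        System.out.println(readyToProcess(lagging, 0, now));      // false: case 2), wait for fetch
        System.out.println(readyToProcess(caughtUp, 0, now));     // true:  case 2), nothing to fetch
        System.out.println(readyToProcess(caughtUp, 1_000, now)); // false: case 3), 500 ms < 1000 ms
        System.out.println(readyToProcess(caughtUp, 400, now));   // true:  case 3), bound exceeded
    }
}
```

The early `return false` reflects that a single partition worth waiting for is 
enough to hold back the whole task, which is the latency cost of cases 2) and 3).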

cc [~vvcephei][~mjsax][~ableegoldman][~cadonna][~bchen225242]

> Improve task idling
> -------------------
>
>                 Key: KAFKA-10091
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10091
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> When Streams is processing a task with multiple inputs, each time it is ready 
> to process a record, it has to choose which input to process next. It always 
> takes from the input for which the next record has the least timestamp. The 
> result of this is that Streams processes data in timestamp order. However, if 
> the buffer for one of the inputs is empty, Streams doesn't know what 
> timestamp the next record for that input will be.
> Streams introduced a configuration "max.task.idle.ms" in KIP-353 to address 
> this issue.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> The config allows Streams to wait some amount of time for data to arrive on 
> the empty input, so that it can make a timestamp-ordered decision about which 
> input to pull from next.
> However, this config can be hard to use reliably and efficiently, since what 
> we're really waiting for is the next poll that _would_ return data from the 
> empty input's partition, and this guarantee is a function of the poll 
> interval, the max poll interval, and the internal logic that governs when 
> Streams will poll again.
> Ideally, you would be able to guarantee, at a minimum, that _any_ amount of 
> idling results in polling data from the empty partition if there is data to 
> fetch.
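The selection rule described in the issue (always take from the input whose 
next record has the least timestamp) can be sketched as follows. This is a 
simplified illustration, not the actual Streams code: `TimestampOrderSketch` 
and `chooseNext` are hypothetical, and each input is modeled as a deque of 
record timestamps keyed by partition name. An empty buffer makes a 
timestamp-ordered choice impossible, which is the gap `max.task.idle.ms` is 
meant to cover.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class TimestampOrderSketch {

    /**
     * Picks the input whose head record has the least timestamp.
     * Returns empty if any buffer is empty, because that input's next
     * timestamp is unknown.
     */
    static Optional<String> chooseNext(Map<String, Deque<Long>> buffers) {
        String best = null;
        long bestTs = Long.MAX_VALUE;
        for (Map.Entry<String, Deque<Long>> e : buffers.entrySet()) {
            Long head = e.getValue().peekFirst();
            if (head == null) {
                return Optional.empty(); // unknown next timestamp: idle, or give up ordering
            }
            if (head < bestTs) {
                bestTs = head;
                best = e.getKey();
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        Map<String, Deque<Long>> buffers = new LinkedHashMap<>();
        buffers.put("left-0", new ArrayDeque<>(List.of(100L, 300L)));
        buffers.put("right-0", new ArrayDeque<>(List.of(200L)));

        System.out.println(chooseNext(buffers)); // Optional[left-0]
        buffers.get("left-0").pollFirst();       // process the record with timestamp 100
        System.out.println(chooseNext(buffers)); // Optional[right-0]
        buffers.get("right-0").pollFirst();      // process the record with timestamp 200
        System.out.println(chooseNext(buffers)); // Optional.empty
    }
}
```

In the last call `left-0` still holds a record (timestamp 300), but 
`right-0` is empty, so the sketch refuses to choose: it cannot tell whether 
the next `right-0` record will be older than 300.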



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
