[ https://issues.apache.org/jira/browse/KAFKA-10091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17127456#comment-17127456 ]
John Roesler commented on KAFKA-10091:
--------------------------------------

Sure thing, Guozhang!

I’m a little hazy on exactly what I would propose... it’s part of the scope of this ticket to consider the options and make a good proposal.

For context on what I was thinking, though: let’s assume we have one empty input I1 and one non-empty input I2 when it is time to run a task. The default assumption is that we don’t really care about collating the two inputs in time order, so we just carry on processing, and we’ll poll later and maybe get another batch of records for I1.

The existing config does a good job of handling another situation, in which we do care about collating in timestamp order and we know that I1 is subject to external delays, like maybe the producer only sends batches every 5 minutes. We can configure task idling to wait five+ minutes to account for this.

However, this ticket is about another situation, in which we want to process data in order when there is data available on the brokers, but we don’t really want to wait around for the producer to send more data. For example, running a join on a backlog of data.

There are some challenges here:
1. We don’t really know how long it will be until the next poll.
2. We don’t know exactly how long it will take until poll _returns_.
3. For a given poll response that doesn’t contain records for I1, we don’t know whether it means that there aren’t any records available, or whether the consumer simply didn’t fetch that partition on that poll.

To restate the challenge: there’s no specific amount of time we can idle that guarantees we would get the data if it’s there, or continue otherwise. You could analyze the various timeouts to try and get an upper bound, but it winds up being quite high.

In fact, this challenge also confronts the use case I said was well supported. E.g., if we know that the producer sends every five minutes, we could build in some slack and set task idling to something like 5:30.
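As a rough illustration of the "five minutes plus slack" setting just mentioned, the sketch below computes a value for the `max.task.idle.ms` config named in the ticket. The class and helper names (`TaskIdleConfigSketch`, `idleConfig`) are hypothetical, and a plain `java.util.Properties` stands in for whatever configuration object the application actually passes to Streams.

```java
import java.time.Duration;
import java.util.Properties;

public class TaskIdleConfigSketch {

    // Hypothetical helper (not part of Kafka): idle for the producer's
    // batch interval plus some slack, expressed in milliseconds.
    static Properties idleConfig(Duration producerInterval, Duration slack) {
        Properties props = new Properties();
        long idleMs = producerInterval.plus(slack).toMillis();
        // "max.task.idle.ms" is the config key named in the ticket (KIP-353).
        props.setProperty("max.task.idle.ms", Long.toString(idleMs));
        return props;
    }

    public static void main(String[] args) {
        // Producer sends every 5 minutes; add 30 seconds of slack (5:30).
        Properties props = idleConfig(Duration.ofMinutes(5), Duration.ofSeconds(30));
        System.out.println("max.task.idle.ms=" + props.getProperty("max.task.idle.ms"));
    }
}
```

The slack here is only a guess on the user's part; nothing in the config ties it to when the next poll actually happens.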
However, we have no guarantee that 30 seconds is enough slack.

In retrospect, it seems to me like the task idling timer should start after the next empty poll response for that input, to make it easier for users to reason about. The downside would be, like I said, that it might be a while before the next empty response for that input, so we might also consider some mechanism to poll that input as soon as possible.

Anyway, that’s what I meant by that statement :)

> Improve task idling
> -------------------
>
>                 Key: KAFKA-10091
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10091
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> When Streams is processing a task with multiple inputs, each time it is ready
> to process a record, it has to choose which input to process next. It always
> takes from the input for which the next record has the least timestamp. The
> result of this is that Streams processes data in timestamp order. However, if
> the buffer for one of the inputs is empty, Streams doesn't know what
> timestamp the next record for that input will be.
> Streams introduced a configuration "max.task.idle.ms" in KIP-353 to address
> this issue.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> The config allows Streams to wait some amount of time for data to arrive on
> the empty input, so that it can make a timestamp-ordered decision about which
> input to pull from next.
> However, this config can be hard to use reliably and efficiently, since what
> we're really waiting for is the next poll that _would_ return data from the
> empty input's partition, and this guarantee is a function of the poll
> interval, the max poll interval, and the internal logic that governs when
> Streams will poll again.
> The ideal case is you'd be able to guarantee at a minimum that _any_ amount
> of idling would guarantee you poll data from the empty partition if there's
> data to fetch.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
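
For readers new to the ticket, the selection rule in the quoted description can be sketched roughly as follows. This is a simplified model, not the Streams implementation: each input buffer is reduced to a queue of record timestamps, and the class and method names (`TimestampOrderedChooser`, `chooseNext`) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.OptionalInt;

public class TimestampOrderedChooser {

    /**
     * Returns the index of the input whose head record has the least
     * timestamp, or empty if any buffer is empty. An empty buffer means the
     * next timestamp for that input is unknown, so no timestamp-ordered
     * choice can be made without waiting (idling) or giving up on ordering.
     */
    static OptionalInt chooseNext(List<Deque<Long>> buffers) {
        int best = -1;
        for (int i = 0; i < buffers.size(); i++) {
            Long head = buffers.get(i).peekFirst();
            if (head == null) {
                return OptionalInt.empty();
            }
            if (best < 0 || head < buffers.get(best).peekFirst()) {
                best = i;
            }
        }
        return best < 0 ? OptionalInt.empty() : OptionalInt.of(best);
    }

    public static void main(String[] args) {
        Deque<Long> i1 = new ArrayDeque<>(List.of(10L, 30L));
        Deque<Long> i2 = new ArrayDeque<>(List.of(20L));
        List<Deque<Long>> inputs = List.of(i1, i2);

        System.out.println(chooseNext(inputs)); // both buffered: pick least head
        i1.pollFirst();
        System.out.println(chooseNext(inputs)); // heads are now 30 and 20
        i2.pollFirst();
        System.out.println(chooseNext(inputs)); // I2 is empty: no ordered choice
    }
}
```

The empty result is the point at which "max.task.idle.ms" comes into play: the task either idles in the hope that a later poll fills the empty buffer, or proceeds out of order.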