I'm wondering if the Kafka architects have plans to redesign/enhance this behavior. Having to guess the idle time isn't the most satisfying solution. No matter what value I put in there, it seems possible that I will miss a join.
Respectfully,
Chad

On Fri, Nov 5, 2021 at 3:07 PM Matthias J. Sax <mj...@apache.org> wrote:

> The log clearly indicates that you hit enforced processing. We record
> the metric and log:
>
> Cf
> https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L194-L200
>
> Not sure why the metric does not report it...
>
> Hence, the solution would be to increase `max.task.idle.ms` further to
> give Kafka Streams time to fetch the data.
>
> It might help to use DEBUG log to see for which partitions the consumer
> sends fetch requests and which partitions return data, to better
> understand the underlying behavior.
>
>
> -Matthias
>
> On 11/5/21 6:58 AM, Chad Preisler wrote:
> > It seems like I have 2 options to work around this issue.
> >
> > - Keep the KTable and have another process running that puts the missed
> >   join message back on the event topic.
> > - Switch to GlobalKTable.
> >
> > Any other solutions/workarounds are welcome.
> >
> > Thanks,
> > Chad
> >
> > On Thu, Nov 4, 2021 at 11:43 AM Chad Preisler <chad.preis...@gmail.com> wrote:
> >
> >> enforced-processing-total is zero for all missed join occurrences. I
> >> logged all the metrics out at the time my stream processed the missed join,
> >> so let me know if there are any other metrics that would help.
> >>
> >> On Wed, Nov 3, 2021 at 9:21 PM Chad Preisler <chad.preis...@gmail.com> wrote:
> >>
> >>> I'm not sure. When I ran with trace logging turned on I saw a bunch of
> >>> messages like the ones below. Do those messages indicate
> >>> "enforced-processing"? It gets logged right after the call
> >>> to enforcedProcessingSensor.record.
> >>>
> >>> Continuing to process although some partitions are empty on the broker.
> >>> There may be out-of-order processing for this task as a result. Partitions
> >>> with local data: [status-5].
Partitions we gave up waiting for, with their
> >>> corresponding deadlines: {event-5=1635881287722}. Configured
> >>> max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.
> >>>
> >>> Continuing to process although some partitions are empty on the broker.
> >>> There may be out-of-order processing for this task as a result. Partitions
> >>> with local data: [event-5]. Partitions we gave up waiting for, with their
> >>> corresponding deadlines: {status-5=1635881272754}. Configured
> >>> max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.
> >>>
> >>> On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>
> >>>> Can you check if the program ever does "enforced processing", ie,
> >>>> `max.task.idle.ms` passed, and we process despite an empty input buffer.
> >>>>
> >>>> Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
> >>>>
> >>>> As long as there is input data, we should never do "enforced processing"
> >>>> and the metric should stay at zero.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 11/3/21 2:41 PM, Chad Preisler wrote:
> >>>>> Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds)
> >>>>> had no effect on this issue.
> >>>>>
> >>>>> On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <chad.preis...@gmail.com> wrote:
> >>>>>
> >>>>>> No, unfortunately it is not the case. The table record is written
> >>>>>> about 20 seconds before the stream record. I'll crank up the time
> >>>>>> tomorrow and see what happens.
> >>>>>>
> >>>>>> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hard to tell, but as it seems that you can reproduce the issue, it
> >>>>>>> might be worth a try to increase the idle time further.
> >>>>>>>
> >>>>>>> I guess one corner case for stream-table join that is not resolved yet
> >>>>>>> is when stream and table record have the same timestamp...
For this
> >>>>>>> case, the table record might not be processed first.
> >>>>>>>
> >>>>>>> Could you hit this case?
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 11/2/21 3:13 PM, Chad Preisler wrote:
> >>>>>>>> Thank you for the information. We are using the Kafka 3.0 client
> >>>>>>>> library. We are able to reliably reproduce this issue in our test
> >>>>>>>> environment now. I removed my timestamp extractor, and I set the
> >>>>>>>> max.task.idle.ms to 2000. I also turned on trace logging for package
> >>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>>>>>>
> >>>>>>>> To create the issue we stopped the application and ran enough data
> >>>>>>>> to create a lag of 400 messages. We saw 5 missed joins.
> >>>>>>>>
> >>>>>>>> From the stream-thread log messages we saw the event message, our
> >>>>>>>> stream missed the join, and then several milliseconds later we saw
> >>>>>>>> the stream-thread print out the status message. The stream-thread
> >>>>>>>> printed out our status message a total of 5 times.
> >>>>>>>>
> >>>>>>>> Given that only a few milliseconds passed between missing the join
> >>>>>>>> and the stream-thread printing the status message, would increasing
> >>>>>>>> the max.task.idle.ms help?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Chad
> >>>>>>>>
> >>>>>>>> On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Timestamp synchronization is not perfect, and as a matter of fact,
> >>>>>>>>> we fixed a few gaps in the 3.0.0 release. We actually hope that we
> >>>>>>>>> closed the last gaps in 3.0.0... *fingers-crossed* :)
> >>>>>>>>>
> >>>>>>>>>> We are using a timestamp extractor that returns 0.
> >>>>>>>>>
> >>>>>>>>> You can do this, and it effectively "disables" timestamp
> >>>>>>>>> synchronization as records on the KTable side don't have a
> >>>>>>>>> timeline any longer.
As a
> >>>>>>>>> side effect it also allows you to "bootstrap" the table, as records
> >>>>>>>>> with timestamp zero will always be processed first (as they are
> >>>>>>>>> smaller). Of course, you also don't have time synchronization for
> >>>>>>>>> "future" data and your program becomes non-deterministic if you
> >>>>>>>>> reprocess old data.
> >>>>>>>>>
> >>>>>>>>>> This seemed to be the only
> >>>>>>>>>> way to bootstrap enough records at startup to avoid the missed join.
> >>>>>>>>>
> >>>>>>>>> Using 3.0.0 and enabling timestamp synchronization via the
> >>>>>>>>> `max.task.idle.ms` config should allow you to get the correct
> >>>>>>>>> behavior without the zero-extractor (of course, your KTable data
> >>>>>>>>> must have smaller timestamps than your KStream data).
> >>>>>>>>>
> >>>>>>>>>> If I use "timestamp synchronization" do I have to remove the zero
> >>>>>>>>>> timestamp extractor? If I remove the zero timestamp extractor will
> >>>>>>>>>> timestamp synchronization take care of the missed join issue on
> >>>>>>>>>> startup?
> >>>>>>>>>
> >>>>>>>>> To be more precise: timestamp synchronization is _always_ on. The
> >>>>>>>>> question is just how strictly it is applied. By default, we do the
> >>>>>>>>> weakest form, which is only best effort.
> >>>>>>>>>
> >>>>>>>>>> I'm guessing the issue here is that occasionally the poll request
> >>>>>>>>>> is not returning the matching record for the KTable side of the
> >>>>>>>>>> join before the task goes off and starts processing records.
> >>>>>>>>>
> >>>>>>>>> Yes, because of the default best-effort approach. That is why you
> >>>>>>>>> should increase `max.task.idle.ms` to detect this case and "skip"
> >>>>>>>>> processing and let KS do another poll() to get KTable data.
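[Editor's note: as a concrete illustration of the remedy described above, here is a minimal config sketch. The application id and broker address are placeholders, and the plain string key "max.task.idle.ms" is used instead of the StreamsConfig constant so the snippet stays dependency-free; only the idle-time setting is the point.]

```java
import java.util.Properties;

public class IdleTimeConfig {
    // Builds a Streams config with the idle time under discussion.
    // Per the 3.0 semantics described in this thread, a value > 0 makes a
    // task wait up to that many ms for a lagging input partition before
    // falling back to enforced processing.
    public static Properties build(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "stream-table-join-app"); // placeholder
        props.put("bootstrap.servers", "localhost:9092");     // placeholder
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        // e.g. the 10-second value Chad tried earlier in the thread
        System.out.println(build(10000L).getProperty("max.task.idle.ms"));
    }
}
```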
>
> >>>>>>>>> 2.8 and earlier:
> >>>>>>>>>
> >>>>>>>>> max.task.idle.ms=0  -> best effort (no poll() retry)
> >>>>>>>>> max.task.idle.ms>0  -> try to do another poll() until data is
> >>>>>>>>>                        there or idle time passed
> >>>>>>>>>
> >>>>>>>>> Note: >0 might still "fail" even if there is data, because consumer
> >>>>>>>>> fetch behavior is not predictable.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 3.0:
> >>>>>>>>>
> >>>>>>>>> max.task.idle.ms=-1 -> best effort (no poll() retry)
> >>>>>>>>> max.task.idle.ms=0  -> if there is data broker side, repeat
> >>>>>>>>>                        poll() until you get the data
> >>>>>>>>> max.task.idle.ms>0  -> even if there is no data broker side, wait
> >>>>>>>>>                        until data becomes available or the idle
> >>>>>>>>>                        time passed
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Hope this helps.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 11/1/21 4:29 PM, Guozhang Wang wrote:
> >>>>>>>>>> Hello Chad,
> >>>>>>>>>>
> >>>>>>>>>> From your earlier comment, you mentioned "In my scenario the
> >>>>>>>>>> records were written to the KTable topic before the record was
> >>>>>>>>>> written to the KStream topic." So I think Matthias and others have
> >>>>>>>>>> excluded this possibility while trying to help investigate.
> >>>>>>>>>>
> >>>>>>>>>> If only the matching records from KStream are returned via a single
> >>>>>>>>>> consumer poll call but not the other records from KTable, then you
> >>>>>>>>>> would miss this matched join result.
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <chad.preis...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thank you for your response and the links to the presentations.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> *However, this seems to be orthogonal to your issue?*
> >>>>>>>>>>>
> >>>>>>>>>>> Yes.
From what I see in the code it looks like you have a single consumer
> >>>>>>>>>>> subscribed to multiple topics. Please correct me if I'm wrong.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> *By default, timestamp synchronization is disabled. Maybe
> >>>>>>>>>>> enabling it would help?*
> >>>>>>>>>>>
> >>>>>>>>>>> We are using a timestamp extractor that returns 0. We did that
> >>>>>>>>>>> because we were almost always missing joins on startup, and this
> >>>>>>>>>>> seemed to be the only way to bootstrap enough records at startup
> >>>>>>>>>>> to avoid the missed join. We found a post that said doing that
> >>>>>>>>>>> would make the KTable act like the GlobalKTable at startup. So far
> >>>>>>>>>>> this works great; we never miss a join on a startup. If I use
> >>>>>>>>>>> "timestamp synchronization" do I have to remove the zero timestamp
> >>>>>>>>>>> extractor? If I remove the zero timestamp extractor will timestamp
> >>>>>>>>>>> synchronization take care of the missed join issue on startup?
> >>>>>>>>>>>
> >>>>>>>>>>> I'm guessing the issue here is that occasionally the poll request
> >>>>>>>>>>> is not returning the matching record for the KTable side of the
> >>>>>>>>>>> join before the task goes off and starts processing records. Later
> >>>>>>>>>>> when we put the same record on the topic and the KTable has had a
> >>>>>>>>>>> chance to load more records, the join works and everything is good
> >>>>>>>>>>> to go. Because of the way our system works no new status records
> >>>>>>>>>>> have been written and so the new record joins against the correct
> >>>>>>>>>>> status.
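[Editor's note: the hypothesis stated above can be modeled in a few lines of plain Java. This is a toy sketch, not Kafka Streams internals: a HashMap stands in for the KTable state store, and the hypothetical topic names `event` and `status` mirror the trace logs earlier in the thread. It shows how a poll ordering that delivers the stream record before the matching table record yields a null-valued left join.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MissedJoinSketch {
    // Processes records in the order a poll delivered them. Table-side
    // records update the store; stream-side records do a left join.
    static List<String> run(List<String[]> polledInOrder) {
        Map<String, String> table = new HashMap<>(); // KTable store stand-in
        List<String> joined = new ArrayList<>();
        for (String[] rec : polledInOrder) {
            String topic = rec[0], key = rec[1], value = rec[2];
            if (topic.equals("status")) {
                table.put(key, value);                   // table side
            } else {
                joined.add(key + ":" + table.get(key));  // stream side: left join
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // The stream record arrives before the matching table record:
        // the first join is missed (null), the retry later succeeds.
        List<String[]> order = List.of(
            new String[]{"event", "k1", "e1"},
            new String[]{"status", "k1", "s1"},
            new String[]{"event", "k1", "e2"});
        System.out.println(run(order)); // [k1:null, k1:s1]
    }
}
```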
>
> >>>>>>>>>>> Do you agree that the poll request is returning the KStream
> >>>>>>>>>>> record but not returning the KTable record and therefore the join
> >>>>>>>>>>> is getting missed? If you don't agree, what do you think is going
> >>>>>>>>>>> on? Is there a way to prove this out?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Chad
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Yes, a StreamThread has one consumer. The number of StreamThreads
> >>>>>>>>>>>> per instance is configurable via `num.stream.threads`. Partitions
> >>>>>>>>>>>> are assigned to threads similar to a plain consumer group.
> >>>>>>>>>>>>
> >>>>>>>>>>>> It seems you run with the default of one thread per instance. As
> >>>>>>>>>>>> you spin up 12 instances, it results in 12 threads for the
> >>>>>>>>>>>> application. As you have 12 partitions, using more threads won't
> >>>>>>>>>>>> be useful as no partitions are left for them to process.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For a stream-table join, there will be one task per "partition
> >>>>>>>>>>>> pair" that computes the join for those partitions. So you get 12
> >>>>>>>>>>>> tasks, and each thread processes one task in your setup. Ie, a
> >>>>>>>>>>>> thread's consumer is reading data for both input topics.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Pausing happens on a per-partition basis: for joins there are two
> >>>>>>>>>>>> buffers per task (one for each input topic partition). It's
> >>>>>>>>>>>> possible that one partition is paused while the other is
> >>>>>>>>>>>> processed. However, this seems to be orthogonal to your issue?
>
> >>>>>>>>>>>> For a GlobalKTable, you get an additional GlobalThread that only
> >>>>>>>>>>>> reads the data from the "global topic" to update the
> >>>>>>>>>>>> GlobalKTable. Semantics of KStream-KTable and KStream-GlobalKTable
> >>>>>>>>>>>> joins are different: Cf
> >>>>>>>>>>>> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >>>>>>>>>>>>
> >>>>>>>>>>>> For the timestamp synchronization, you may check out the
> >>>>>>>>>>>> `max.task.idle.ms` config. By default, timestamp synchronization
> >>>>>>>>>>>> is disabled. Maybe enabling it would help?
> >>>>>>>>>>>>
> >>>>>>>>>>>> You may also check out slides 34-38:
> >>>>>>>>>>>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >>>>>>>>>>>>
> >>>>>>>>>>>> There is one corner case: if two records with the same timestamp
> >>>>>>>>>>>> come in, it's not defined which one will be processed first.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hope this helps.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 10/30/21 6:45 AM, Chad Preisler wrote:
> >>>>>>>>>>>>> Yes, this helped. I have some additional questions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Does StreamThread have one consumer? (Looks like it, but just
> >>>>>>>>>>>>> want to confirm)
> >>>>>>>>>>>>> Is there a separate StreamThread for each topic including the
> >>>>>>>>>>>>> KTable?
> >>>>>>>>>>>>> If a KTable is a StreamThread and there is a StreamTask for that
> >>>>>>>>>>>>> KTable, could my buffer be getting filled up, and the
> >>>>>>>>>>>>> mainConsumer for the KTable be getting paused? I see this code
> >>>>>>>>>>>>> in StreamTask#addRecords.
>
> >>>>>>>>>>>>> // if after adding these records, its partition queue's buffered
> >>>>>>>>>>>>> // size has been increased beyond the threshold, we can then
> >>>>>>>>>>>>> // pause the consumption for this partition
> >>>>>>>>>>>>> if (newQueueSize > maxBufferedSize) {
> >>>>>>>>>>>>>     mainConsumer.pause(singleton(partition));
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Is there any specific logging that I can set to debug or trace
> >>>>>>>>>>>>> that would help me troubleshoot? I'd prefer not to turn debug
> >>>>>>>>>>>>> and/or trace on for every single class.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Chad
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Chad,
> >>>>>>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams
> >>>>>>>>>>>>>>> internal code that reads records for the join?
> >>>>>>>>>>>>>> --> You can check StreamThread#pollPhase, where the stream
> >>>>>>>>>>>>>> thread (main consumer) periodically polls records. And then,
> >>>>>>>>>>>>>> it'll process each topology node with these polled records in
> >>>>>>>>>>>>>> stream tasks (StreamTask#process).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hope that helps.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>> Luke
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
> >>>>>>>>>>>>>> <gilles.philipp...@fundingcircle.com.invalid> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Chad, this talk around 24:00 clearly explains what you're
> >>>>>>>>>>>>>>> seeing:
> >>>>>>>>>>>>>>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Gilles
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have a stream application that does a KStream to KTable
> >>>>>>>>>>>>>>>> left join. We seem to be occasionally missing joins (KTable
> >>>>>>>>>>>>>>>> side is null).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams
> >>>>>>>>>>>>>>>> internal code that reads records for the join? I've poked
> >>>>>>>>>>>>>>>> around the Kafka code base, but there is a lot there. I
> >>>>>>>>>>>>>>>> imagine there is some consumer poll for each side of the
> >>>>>>>>>>>>>>>> join, and possibly a background thread for reading the
> >>>>>>>>>>>>>>>> KTable topic.
>
> >>>>>>>>>>>>>>>> I figure there are several possible causes of this issue,
> >>>>>>>>>>>>>>>> and since nothing is really jumping out in my code, I was
> >>>>>>>>>>>>>>>> going to start looking at the Kafka code to see if there is
> >>>>>>>>>>>>>>>> something I can do to fix this.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Chad
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> Funding Circle Limited is authorised and regulated by the
> >>>>>>>>>>>>>>> Financial Conduct Authority under firm registration number
> >>>>>>>>>>>>>>> 722513. Funding Circle is not covered by the Financial
> >>>>>>>>>>>>>>> Services Compensation Scheme. Registered in England (Co. No.
> >>>>>>>>>>>>>>> 06968588) with registered office at 71 Queen Victoria Street,
> >>>>>>>>>>>>>>> London EC4V 4AY.
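[Editor's note: the timestamp-synchronization behavior discussed throughout this thread can be summarized with a toy sketch. This is a deliberate simplification, not the actual PartitionGroup code: when both input buffers have data, the task processes the record with the smaller head timestamp, which is why a KTable fed by a zero-returning TimestampExtractor is always consumed first at startup. Ties are undefined in the real implementation (as Matthias notes); this sketch arbitrarily favors the table side.]

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class SyncSketch {
    // Decides which buffer to process next, by head-record timestamp.
    // The real code may instead wait (up to max.task.idle.ms) when one
    // buffer is empty; this sketch just processes whatever is available.
    static String nextToProcess(Deque<Long> streamBuf, Deque<Long> tableBuf) {
        if (streamBuf.isEmpty()) return "table";
        if (tableBuf.isEmpty()) return "stream";
        return tableBuf.peek() <= streamBuf.peek() ? "table" : "stream";
    }

    public static void main(String[] args) {
        Deque<Long> stream = new ArrayDeque<>(List.of(100L, 120L));
        Deque<Long> table = new ArrayDeque<>(List.of(0L)); // zero extractor
        // The zero-timestamp table record wins, so the table "bootstraps"
        // before any stream record is joined.
        System.out.println(nextToProcess(stream, table)); // table
    }
}
```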