I'm not sure. When I ran with trace logging turned on, I saw a bunch of messages like the ones below. Do those messages indicate "enforced processing"? They get logged right after the call to enforcedProcessingSensor.record.

Continuing to process although some partitions are empty on the broker. There may be out-of-order processing for this task as a result. Partitions with local data: [status-5]. Partitions we gave up waiting for, with their corresponding deadlines: {event-5=1635881287722}. Configured max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.

Continuing to process although some partitions are empty on the broker. There may be out-of-order processing for this task as a result. Partitions with local data: [event-5]. Partitions we gave up waiting for, with their corresponding deadlines: {status-5=1635881272754}. Configured max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.
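In case it is useful to anyone else, this is roughly how I am checking the metric itself in addition to watching the logs. The group and metric names are the ones from the task-monitoring docs Matthias linked; the aggregation is just my own sketch, and depending on the version the task-level metrics may require metrics.recording.level to be turned up:

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public class EnforcedProcessingCheck {
        // Sums "enforced-processing-total" across all tasks; anything above zero
        // means a task gave up waiting on an empty input buffer and processed anyway.
        static double enforcedProcessingTotal(KafkaStreams streams) {
            double total = 0.0;
            for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
                MetricName name = entry.getKey();
                if ("stream-task-metrics".equals(name.group())
                        && "enforced-processing-total".equals(name.name())) {
                    Object value = entry.getValue().metricValue();
                    if (value instanceof Number) {
                        total += ((Number) value).doubleValue();
                    }
                }
            }
            return total;
        }
    }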
On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax <mj...@apache.org> wrote:

> Can you check if the program ever does "enforced processing", ie, `max.task.idle.ms` passed, and we process despite an empty input buffer.
>
> Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>
> As long as there is input data, we should never do "enforced processing" and the metric should stay at zero.
>
> -Matthias
>
> On 11/3/21 2:41 PM, Chad Preisler wrote:
> > Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no effect on this issue.
> >
> > On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <chad.preis...@gmail.com> wrote:
> >
> >> No, unfortunately it is not the case. The table record is written about 20 seconds before the stream record. I'll crank up the time tomorrow and see what happens.
> >>
> >> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>
> >>> Hard to tell, but as it seems that you can reproduce the issue, it might be worth a try to increase the idle time further.
> >>>
> >>> I guess one corner case for stream-table join that is not resolved yet is when stream and table record have the same timestamp... For this case, the table record might not be processed first.
> >>>
> >>> Could you hit this case?
> >>>
> >>> -Matthias
> >>>
> >>> On 11/2/21 3:13 PM, Chad Preisler wrote:
> >>>> Thank you for the information. We are using the Kafka 3.0 client library. We are able to reliably reproduce this issue in our test environment now. I removed my timestamp extractor, and I set max.task.idle.ms to 2000. I also turned on trace logging for the package org.apache.kafka.streams.processor.internals.
> >>>>
> >>>> To create the issue we stopped the application and ran enough data to create a lag of 400 messages. We saw 5 missed joins.
> >>>>
> >>>> From the stream-thread log messages we saw the event message, our stream missed the join, and then several milliseconds later we saw the stream-thread print out the status message. The stream-thread printed out our status message a total of 5 times.
> >>>>
> >>>> Given that only a few milliseconds passed between missing the join and the stream-thread printing the status message, would increasing max.task.idle.ms help?
> >>>>
> >>>> Thanks,
> >>>> Chad
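(For reference, the trace logging mentioned above was enabled with a plain log4j.properties entry along these lines; this assumes a log4j 1.x setup, so adjust for logback or log4j2 as needed:)

    # Enable TRACE only for the Streams internals package, leaving the root logger alone.
    log4j.logger.org.apache.kafka.streams.processor.internals=TRACE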
> >>>> On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>
> >>>>> Timestamp synchronization is not perfect, and as a matter of fact, we fixed a few gaps in the 3.0.0 release. We actually hope that we closed the last gaps in 3.0.0... *fingers-crossed* :)
> >>>>>
> >>>>>> We are using a timestamp extractor that returns 0.
> >>>>>
> >>>>> You can do this, and it effectively "disables" timestamp synchronization, as records on the KTable side don't have a timeline any longer. As a side effect it also allows you to "bootstrap" the table, as records with timestamp zero will always be processed first (as they are smaller). Of course, you also don't have time synchronization for "future" data, and your program becomes non-deterministic if you reprocess old data.
> >>>>>
> >>>>>> this seemed to be the only way to bootstrap enough records at startup to avoid the missed join.
> >>>>>
> >>>>> Using 3.0.0 and enabling timestamp synchronization via the `max.task.idle.ms` config should allow you to get the correct behavior without the zero-extractor (of course, your KTable data must have smaller timestamps than your KStream data).
> >>>>>
> >>>>>> If I use "timestamp synchronization" do I have to remove the zero timestamp extractor? If I remove the zero timestamp extractor will timestamp synchronization take care of the missed join issue on startup?
> >>>>>
> >>>>> To be more precise: timestamp synchronization is _always_ on. The question is just how strictly it is applied. By default, we do the weakest form, which is only best effort.
> >>>>>
> >>>>>> I'm guessing the issue here is that occasionally the poll request is not returning the matching record for the KTable side of the join before the task goes off and starts processing records.
> >>>>>
> >>>>> Yes, because of the default best-effort approach. That is why you should increase `max.task.idle.ms` to detect this case, "skip" processing, and let KS do another poll() to get KTable data.
> >>>>>
> >>>>> 2.8 and earlier:
> >>>>>
> >>>>> max.task.idle.ms=0 -> best effort (no poll() retry)
> >>>>> max.task.idle.ms>0 -> try to do another poll() until data is there or the idle time passed
> >>>>>
> >>>>> Note: >0 might still "fail" even if there is data, because consumer fetch behavior is not predictable.
> >>>>>
> >>>>> 3.0:
> >>>>>
> >>>>> max.task.idle.ms=-1 -> best effort (no poll() retry)
> >>>>> max.task.idle.ms=0 -> if there is data broker side, repeat to poll() until you get the data
> >>>>> max.task.idle.ms>0 -> even if there is no data broker side, wait until data becomes available or the idle time passed
> >>>>>
> >>>>> Hope this helps.
> >>>>>
> >>>>> -Matthias
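As an aside for anyone reading this in the archive, here is roughly what the two approaches look like side by side. This is my own sketch rather than code posted on the thread; the idle-time value is just the one from our tests:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class JoinTimestampOptions {

        // The workaround we use today: pin every KTable-side record to timestamp 0
        // so it always sorts first and the table "bootstraps" like a GlobalKTable.
        public static class ZeroTimestampExtractor implements TimestampExtractor {
            @Override
            public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
                return 0L;
            }
        }

        // The alternative Matthias describes: keep real timestamps and rely on
        // timestamp synchronization instead (3.0 semantics; the value is illustrative).
        public static Properties timestampSyncProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 2000L);
            return props;
        }
    }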
> >>>>> On 11/1/21 4:29 PM, Guozhang Wang wrote:
> >>>>>> Hello Chad,
> >>>>>>
> >>>>>> From your earlier comment, you mentioned "In my scenario the records were written to the KTable topic before the record was written to the KStream topic." So I think Matthias and others have excluded this possibility while trying to help investigate.
> >>>>>>
> >>>>>> If only the matching records from the KStream are returned via a single consumer poll call, but not the other records from the KTable, then you would miss this matched join result.
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <chad.preis...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thank you for your response and the links to the presentations.
> >>>>>>>
> >>>>>>> *However, this seems to be orthogonal to your issue?*
> >>>>>>>
> >>>>>>> Yes. From what I see in the code it looks like you have a single consumer subscribed to multiple topics. Please correct me if I'm wrong.
> >>>>>>>
> >>>>>>> *By default, timestamp synchronization is disabled. Maybe enabling it would help?*
> >>>>>>>
> >>>>>>> We are using a timestamp extractor that returns 0. We did that because we were almost always missing joins on startup, and this seemed to be the only way to bootstrap enough records at startup to avoid the missed join. We found a post that said doing that would make the KTable act like a GlobalKTable at startup. So far this works great; we never miss a join on startup. If I use "timestamp synchronization" do I have to remove the zero timestamp extractor? If I remove the zero timestamp extractor will timestamp synchronization take care of the missed join issue on startup?
> >>>>>>>
> >>>>>>> I'm guessing the issue here is that occasionally the poll request is not returning the matching record for the KTable side of the join before the task goes off and starts processing records. Later, when we put the same record on the topic and the KTable has had a chance to load more records, the join works and everything is good to go. Because of the way our system works no new status records have been written, and so the new record joins against the correct status.
> >>>>>>>
> >>>>>>> Do you agree that the poll request is returning the KStream record but not returning the KTable record, and therefore the join is getting missed? If you don't agree, what do you think is going on? Is there a way to prove this out?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Chad
> >>>>>>>
> >>>>>>> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Yes, a StreamThread has one consumer. The number of StreamThreads per instance is configurable via `num.stream.threads`. Partitions are assigned to threads similar to consumers in a plain consumer group.
> >>>>>>>>
> >>>>>>>> It seems you run with the default of one thread per instance. As you spin up 12 instances, it results in 12 threads for the application. As you have 12 partitions, using more threads won't be useful as no partitions are left for them to process.
> >>>>>>>>
> >>>>>>>> For a stream-table join, there will be one task per "partition pair" that computes the join for those partitions. So you get 12 tasks, and each thread processes one task in your setup. Ie, a thread's consumer is reading data for both input topics.
> >>>>>>>>
> >>>>>>>> Pausing happens on a per-partition basis: for joins there are two buffers per task (one for each input topic partition). It's possible that one partition is paused while the other is processed. However, this seems to be orthogonal to your issue?
> >>>>>>>>
> >>>>>>>> For a GlobalKTable, you get an additional GlobalThread that only reads the data from the "global topic" to update the GlobalKTable. Semantics of KStream-KTable and KStream-GlobalKTable joins are different: Cf
> >>>>>>>> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >>>>>>>>
> >>>>>>>> For the timestamp synchronization, you may check out the `max.task.idle.ms` config. By default, timestamp synchronization is disabled. Maybe enabling it would help?
> >>>>>>>>
> >>>>>>>> You may also check out slides 34-38:
> >>>>>>>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >>>>>>>>
> >>>>>>>> There is one corner case: if two records with the same timestamp come in, it's not defined which one will be processed first.
> >>>>>>>>
> >>>>>>>> Hope this helps.
> >>>>>>>>
> >>>>>>>> -Matthias
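To make the threading discussion above concrete, the application under discussion is essentially the following stream-table left join. A minimal sketch: the topic names are taken from the partition names in the logs at the top of this message, while the value types and the output topic are placeholders:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    public class JoinTopologySketch {
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> events = builder.stream("event");
            KTable<String, String> statuses = builder.table("status");
            // With 12 partitions per topic this yields 12 tasks, one per partition pair.
            events.leftJoin(statuses, (event, status) -> event + "|" + status)
                  .to("joined-output");
            return builder.build();
        }
    }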
> >>>>>>>> On 10/30/21 6:45 AM, Chad Preisler wrote:
> >>>>>>>>> Yes, this helped. I have some additional questions.
> >>>>>>>>>
> >>>>>>>>> Does StreamThread have one consumer? (Looks like it, but just want to confirm.)
> >>>>>>>>> Is there a separate StreamThread for each topic, including the KTable?
> >>>>>>>>> If a KTable is a StreamThread and there is a StreamTask for that KTable, could my buffer be getting filled up, and the mainConsumer for the KTable be getting paused? I see this code in StreamTask#addRecords.
> >>>>>>>>>
> >>>>>>>>> // if after adding these records, its partition queue's buffered size has been
> >>>>>>>>> // increased beyond the threshold, we can then pause the consumption for this partition
> >>>>>>>>> if (newQueueSize > maxBufferedSize) {
> >>>>>>>>>     mainConsumer.pause(singleton(partition));
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> Is there any specific logging that I can set to debug or trace that would help me troubleshoot? I'd prefer not to turn debug and/or trace on for every single class.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Chad
> >>>>>>>>>
> >>>>>>>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Chad,
> >>>>>>>>>>
> >>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams internal code that reads records for the join?
> >>>>>>>>>>
> >>>>>>>>>> --> You can check StreamThread#pollPhase, where the stream thread (main consumer) periodically polls records. It'll then process each topology node with these polled records in stream tasks (StreamTask#process).
> >>>>>>>>>>
> >>>>>>>>>> Hope that helps.
> >>>>>>>>>>
> >>>>>>>>>> Thanks.
> >>>>>>>>>> Luke
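(One more aside on the StreamTask#addRecords snippet above: if I read the code correctly, maxBufferedSize is driven by the buffered.records.per.partition config, so raising it should delay the per-partition pause. That is my reading of the code, not something confirmed on the list; a minimal sketch:)

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class BufferTuning {
        public static Properties bufferProps() {
            Properties props = new Properties();
            // Default is 1000 records per partition; the value here is illustrative.
            props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 2000);
            return props;
        }
    }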
> >>>>>>>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart <gilles.philipp...@fundingcircle.com.invalid> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Chad, this talk around 24:00 clearly explains what you're seeing:
> >>>>>>>>>>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> >>>>>>>>>>>
> >>>>>>>>>>> Gilles
> >>>>>>>>>>>
> >>>>>>>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hello,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have a stream application that does a KStream to KTable left join. We seem to be occasionally missing joins (KTable side is null).
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams internal code that reads records for the join? I've poked around the Kafka code base, but there is a lot there. I imagine there is some consumer poll for each side of the join, and possibly a background thread for reading the KTable topic.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I figure there are several possible causes of this issue, and since nothing is really jumping out in my code, I was going to start looking at the Kafka code to see if there is something I can do to fix this.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Chad