I'm not sure. When I ran with trace logging turned on I saw a bunch of
messages like the ones below. Do those messages indicate
"enforced-processing"? It gets logged right after the call
to enforcedProcessingSensor.record.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result. Partitions
with local data: [status-5]. Partitions we gave up waiting for, with their
corresponding deadlines: {event-5=1635881287722}. Configured
max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result. Partitions
with local data: [event-5]. Partitions we gave up waiting for, with their
corresponding deadlines: {status-5=1635881272754}. Configured
max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.
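
To put numbers on the two messages above, here is a minimal sketch that pulls the deadline and the wall-clock time out of each message and reports the difference. It uses only the Java standard library; the class and method names are hypothetical, and the regular expression assumes a single entry in the deadlines map, as in both messages here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EnforcedProcessingLag {
    // Matches a single-entry deadlines map such as "{event-5=1635881287722}".
    private static final Pattern DEADLINE = Pattern.compile("\\{[^=}]+=(\\d+)\\}");
    private static final Pattern WALL_CLOCK = Pattern.compile("Current wall-clock time: (\\d+)");

    /** Milliseconds between the logged deadline and the logged wall-clock time. */
    public static long millisPastDeadline(String logMessage) {
        Matcher deadline = DEADLINE.matcher(logMessage);
        Matcher wallClock = WALL_CLOCK.matcher(logMessage);
        if (!deadline.find() || !wallClock.find()) {
            throw new IllegalArgumentException("deadline or wall-clock time not found");
        }
        return Long.parseLong(wallClock.group(1)) - Long.parseLong(deadline.group(1));
    }

    public static void main(String[] args) {
        String first = "corresponding deadlines: {event-5=1635881287722}. Configured "
                + "max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.";
        String second = "corresponding deadlines: {status-5=1635881272754}. Configured "
                + "max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.";
        System.out.println(millisPastDeadline(first));  // prints 28
        System.out.println(millisPastDeadline(second)); // prints 5244
    }
}
```

So the first message was logged 28 ms after the deadline for event-5, and the second 5244 ms after the deadline for status-5.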

On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax <mj...@apache.org> wrote:

> Can you check if the program ever does "enforced processing", ie,
> `max.task.idle.ms` passed, and we process despite an empty input buffer.
>
> Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring
>
> As long as there is input data, we should never do "enforced processing"
> and the metric should stay at zero.
>
>
> -Matthias
>
> On 11/3/21 2:41 PM, Chad Preisler wrote:
> > Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had
> no
> > effect on this issue.
> >
> > On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <chad.preis...@gmail.com>
> > wrote:
> >
> >> No, unfortunately it is not the case. The table record is written about
> 20
> >> seconds before the stream record. I’ll crank up the time tomorrow and
> see
> >> what happens.
> >>
> >> On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >>
> >>> Hard to tell, but as it seems that you can reproduce the issue, it
> might
> >>> be worth a try to increase the idle time further.
> >>>
> >>> I guess one corner case for stream-table join that is not resolved yet
> >>> is when stream and table record have the same timestamp... For this
> >>> case, the table record might not be processed first.
> >>>
> >>> Could you hit this case?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 11/2/21 3:13 PM, Chad Preisler wrote:
> >>>> Thank you for the information. We are using the Kafka 3.0 client
> >>> library.
> >>>> We are able to reliably reproduce this issue in our test environment
> >>> now. I
> >>>> removed my timestamp extractor, and I set the max.task.idle.ms to
> >>> 2000. I
> >>>> also turned on trace logging for package
> >>>> org.apache.kafka.streams.processor.internals.
> >>>>
> >>>> To create the issue we stopped the application and ran enough data to
> >>>> create a lag of 400 messages. We saw 5 missed joins.
> >>>>
> >>>>   From the stream-thread log messages we saw the event message, our
> >>> stream
> >>>> missed the join, and then several milliseconds later we saw the
> >>>> stream-thread print out the status message. The stream-thread printed
> >>> out
> >>>> our status message a total of 5 times.
> >>>>
> >>>> Given that only a few milliseconds passed between missing the join and
> >>> the
> >>>> stream-thread printing the status message, would increasing the
> >>>> max.task.idle.ms help?
> >>>>
> >>>> Thanks,
> >>>> Chad
> >>>>
> >>>> On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>>
> >>>>> Timestamp synchronization is not perfect, and as a matter of fact, we
> >>>>> fixed a few gaps in the 3.0.0 release. We actually hope that we closed
> the
> >>>>> last gaps in 3.0.0... *fingers-crossed* :)
> >>>>>
> >>>>>> We are using a timestamp extractor that returns 0.
> >>>>>
> >>>>> You can do this, and it effectively "disables" timestamp
> >>> synchronization
> >>>>> as records on the KTable side don't have a timeline any longer. As a
> >>>>> side effect it also allows you to "bootstrap" the table, as records
> >>> with
> >>>>> timestamp zero will always be processed first (as they are smaller).
> Of
> >>>>> course, you also don't have time synchronization for "future" data
> and
> >>>>> your program becomes non-deterministic if you reprocess old data.
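
The effect of the zero-timestamp extractor described above can be illustrated with a deliberately simplified model. This is not the Kafka Streams implementation; it is a sketch that only assumes "pick the buffer whose head record has the smallest timestamp". The class name, method name, and the use of plain `Long` timestamps in place of records are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SmallestTimestampFirst {
    /**
     * Returns the name of the buffer whose head timestamp is smallest,
     * or null if any buffer is empty (timestamps cannot be compared then).
     * Ties go to the first buffer in iteration order, which is an arbitrary
     * choice made for this sketch only.
     */
    public static String nextPartition(Map<String, Deque<Long>> buffers) {
        String best = null;
        long bestTs = Long.MAX_VALUE;
        for (Map.Entry<String, Deque<Long>> e : buffers.entrySet()) {
            Long head = e.getValue().peekFirst();
            if (head == null) {
                return null;
            }
            if (best == null || head < bestTs) {
                bestTs = head;
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Deque<Long>> buffers = new LinkedHashMap<>();
        // Stream side keeps its real timestamp; table side was extracted as 0.
        buffers.put("event-5", new ArrayDeque<>(List.of(1635881287000L)));
        buffers.put("status-5", new ArrayDeque<>(List.of(0L, 0L)));

        System.out.println(nextPartition(buffers)); // prints status-5

        // Once the table buffer is empty, the model can no longer decide.
        buffers.get("status-5").clear();
        System.out.println(nextPartition(buffers)); // prints null
    }
}
```

In this model a table record with timestamp zero always wins while the table buffer has data, which is the "bootstrap" effect. When the table buffer is empty no comparison is possible, and that is the situation `max.task.idle.ms` governs.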
> >>>>>
> >>>>>> this seemed to be the only
> >>>>>> way to bootstrap enough records at startup to avoid the missed join.
> >>>>>
> >>>>> Using 3.0.0 and enabling timestamp synchronization via
> >>>>> `max.task.idle.ms` config, should allow you to get the correct
> >>> behavior
> >>>>> without the zero-extractor (of course, your KTable data must have
> >>>>> smaller timestamps than your KStream data).
> >>>>>
> >>>>>> If I use "timestamp synchronization" do I have to remove the zero
> >>>>>> timestamp extractor? If I remove the zero timestamp extractor will
> >>>>>> timestamp synchronization take care of the missed join issue on
> >>> startup?
> >>>>>
> >>>>> To be more precise: timestamp synchronization is _always_ on. The
> >>>>> question is just how strictly it is applied. By default, we do the
> >>>>> weakest form, which is only best effort.
> >>>>>
> >>>>>> I'm guessing the issue here is that occasionally the poll request is
> >>> not
> >>>>>> returning the matching record for the KTable side of the join before
> >>> the
> >>>>>> task goes off and starts processing records.
> >>>>>
> >>>>> Yes, because of the default best-effort approach. That is why you should
> >>>>> increase `max.task.idle.ms` to detect this case and "skip"
> processing
> >>>>> and let KS do another poll() to get KTable data.
> >>>>>
> >>>>> 2.8 and earlier:
> >>>>>
> >>>>> max.task.idle.ms=0 -> best effort (no poll() retry)
> >>>>> max.task.idle.ms>0 -> try to do another poll() until data is there
> or
> >>>>> idle time passed
> >>>>>
> >>>>> Note: >0 might still "fail" even if there is data, because consumer
> >>>>> fetch behavior is not predictable.
> >>>>>
> >>>>>
> >>>>> 3.0:
> >>>>>
> >>>>> max.task.idle.ms=-1 -> best effort (no poll() retry)
> >>>>> max.task.idle.ms=0 -> if there is data broker side, repeat to poll()
> >>>>> until you get the data
> >>>>> max.task.idle.ms>0 -> even if there is no data broker side, wait
> >>> until
> >>>>> data becomes available or the idle time passed
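
The three 3.0 settings listed above can be summarized in a small sketch. It uses only the Java standard library: the property key `max.task.idle.ms` comes from this thread, while the class name, the method name, and the choice to reject values below -1 are assumptions made for illustration.

```java
import java.util.Properties;

public class IdleModeDescriber {
    /** Describes the 3.0 behavior for a given max.task.idle.ms value, per the list above. */
    public static String describe(long maxTaskIdleMs) {
        if (maxTaskIdleMs == -1) {
            return "best effort (no poll() retry)";
        }
        if (maxTaskIdleMs == 0) {
            return "poll() again while the broker still has data";
        }
        if (maxTaskIdleMs > 0) {
            return "wait up to " + maxTaskIdleMs + " ms, even if the broker has no data yet";
        }
        // Assumption of this sketch: anything below -1 is treated as invalid.
        throw new IllegalArgumentException("unsupported value: " + maxTaskIdleMs);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("max.task.idle.ms", "2000"); // the value used in the test run

        long configured = Long.parseLong(props.getProperty("max.task.idle.ms"));
        System.out.println(describe(configured));
        System.out.println(describe(0));
        System.out.println(describe(-1));
    }
}
```

In a real application the same key would be put into the properties passed to Kafka Streams; the sketch only mirrors the semantics described in the list.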
> >>>>>
> >>>>>
> >>>>> Hope this helps.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 11/1/21 4:29 PM, Guozhang Wang wrote:
> >>>>>> Hello Chad,
> >>>>>>
> >>>>>>    From your earlier comment, you mentioned "In my scenario the
> records
> >>>>> were
> >>>>>> written to the KTable topic before the record was written to the
> >>> KStream
> >>>>>> topic." So I think Matthias and others have excluded this
> possibility
> >>>>> while
> >>>>>> trying to help investigate.
> >>>>>>
> >>>>>> If only the matching records from KStream are returned via a single
> >>>>>> consumer poll call but not the other records from KTable, then you would
> >>>>>> miss this matched join result.
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler <
> >>> chad.preis...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thank you for your response and the links to the presentations.
> >>>>>>>
> >>>>>>>
> >>>>>>> *However, this seems to be orthogonal to your issue?*
> >>>>>>>
> >>>>>>> Yes. From what I see in the code it looks like you have a single
> >>>>> consumer
> >>>>>>> subscribed to multiple topics. Please correct me if I'm wrong.
> >>>>>>>
> >>>>>>>
> >>>>>>> *By default, timestamp synchronization is disabled. Maybe enabling it
> >>>>>>> would help?*
> >>>>>>>
> >>>>>>> We are using a timestamp extractor that returns 0. We did that
> >>> because
> >>>>> we
> >>>>>>> were almost always missing joins on startup, and this seemed to be
> >>> the
> >>>>> only
> >>>>>>> way to bootstrap enough records at startup to avoid the missed
> join.
> >>> We
> >>>>>>> found a post that said doing that would make the KTable act like
> the
> >>>>>>> GlobalKTable at startup. So far this works great, we never miss a
> >>> join
> >>>>> on a
> >>>>>>> startup. If I use "timestamp synchronization" do I have to remove
> the
> >>>>> zero
> >>>>>>> timestamp extractor? If I remove the zero timestamp extractor will
> >>>>>>> timestamp synchronization take care of the missed join issue on
> >>> startup?
> >>>>>>>
> >>>>>>> I'm guessing the issue here is that occasionally the poll request
> is
> >>> not
> >>>>>>> returning the matching record for the KTable side of the join
> before
> >>> the
> >>>>>>> task goes off and starts processing records. Later when we put the
> >>> same
> >>>>>>> record on the topic and the KTable has had a chance to load more
> >>> records
> >>>>>>> the join works and everything is good to go. Because of the way our
> >>>>> system
> >>>>>>> works no new status records have been written and so the new record
> >>>>> joins
> >>>>>>> against the correct status.
> >>>>>>>
> >>>>>>> Do you agree that the poll request is returning the KStream record
> >>> but
> >>>>> not
> >>>>>>> returning the KTable record and therefore the join is getting
> >>> missed? If
> >>>>>>> you don't agree, what do you think is going on? Is there a way to
> >>> prove
> >>>>>>> this out?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Chad
> >>>>>>>
> >>>>>>> On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Yes, a StreamThread has one consumer. The number of StreamThreads
> >>> per
> >>>>>>>> instance is configurable via `num.stream.threads`. Partitions are
> >>>>>>>> assigned to threads similar to consumers in a plain consumer group.
> >>>>>>>>
> >>>>>>>> It seems you run with the default of one thread per instance. As
> you
> >>>>>>>> spin up 12 instances, it results in 12 threads for the
> application.
> >>> As
> >>>>>>>> you have 12 partitions, using more threads won't be useful as no
> >>>>>>>> partitions are left for them to process.
> >>>>>>>>
> >>>>>>>> For a stream-table join, there will be one task per "partition
> >>> pair"
> >>>>>>>> that computes the join for those partitions. So you get 12 tasks,
> >>> and
> >>>>>>>> each thread processes one task in your setup. I.e., a thread's consumer
> >>>>>>>> is reading data for both input topics.
> >>>>>>>>
> >>>>>>>> Pausing happens on a per-partition basis: for joins there are two buffers
> >>>>>>>> per task (one for each input topic partition). It's possible that
> >>> one
> >>>>>>>> partition is paused while the other is processed. However, this
> >>> seems
> >>>>> to
> >>>>>>>> be orthogonal to your issue?
> >>>>>>>>
> >>>>>>>> For a GlobalKTable, you get an additional GlobalThread that only
> >>> reads
> >>>>>>>> the data from the "global topic" to update the GlobalKTable.
> >>> Semantics
> >>>>>>>> of KStream-KTable and KStream-GlobalKTable joins are different: Cf
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
> >>>>>>>>
> >>>>>>>> For the timestamp synchronization, you may check out the `max.task.idle.ms`
> >>>>>>>> config. By default, timestamp synchronization is disabled. Maybe
> >>>>>>>> enabling it would help?
> >>>>>>>>
> >>>>>>>> You may also check out slides 34-38:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> >>>>>>>>
> >>>>>>>> There is one corner case: if two records with the same timestamp come
> >>>>>>>> in, it's not defined which one will be processed first.
> >>>>>>>>
> >>>>>>>> Hope this helps.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 10/30/21 6:45 AM, Chad Preisler wrote:
> >>>>>>>>> Yes, this helped. I have some additional questions.
> >>>>>>>>>
> >>>>>>>>> Does StreamThread have one consumer? (Looks like it, but just
> want
> >>> to
> >>>>>>>>> confirm)
> >>>>>>>>> Is there a separate StreamThread for each topic including the
> >>> KTable?
> >>>>>>>>> If a KTable is a StreamThread and there is a StreamTask for that
> >>>>>>> KTable,
> >>>>>>>>> could my buffer be getting filled up, and the mainConsumer for
> the
> >>>>>>> KTable
> >>>>>>>>> be getting paused? I see this code in StreamTask#addRecords.
> >>>>>>>>>
> >>>>>>>>> // if after adding these records, its partition queue's buffered size has been
> >>>>>>>>> // increased beyond the threshold, we can then pause the consumption for this partition
> >>>>>>>>> if (newQueueSize > maxBufferedSize) {
> >>>>>>>>>     mainConsumer.pause(singleton(partition));
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> Is there any specific logging that I can set to debug or trace
> that
> >>>>>>> would
> >>>>>>>>> help me troubleshoot? I'd prefer not to turn debug and/or trace
> on
> >>> for
> >>>>>>>>> every single class.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Chad
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com>
> >>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Chad,
> >>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams
> >>> internal
> >>>>>>>> code
> >>>>>>>>>> that reads records for the join?
> >>>>>>>>>> --> You can check StreamThread#pollPhase, where stream thread
> >>> (main
> >>>>>>>>>> consumer) periodically poll records. And then, it'll process
> each
> >>>>>>>> topology
> >>>>>>>>>> node with these polled records in stream tasks
> >>> (StreamTask#process).
> >>>>>>>>>>
> >>>>>>>>>> Hope that helps.
> >>>>>>>>>>
> >>>>>>>>>> Thanks.
> >>>>>>>>>> Luke
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
> >>>>>>>>>> <gilles.philipp...@fundingcircle.com.invalid> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Chad, this talk around 24:00 clearly explains what you’re
> >>> seeing
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> >>>>>>>>>>>
> >>>>>>>>>>> Gilles
> >>>>>>>>>>>
> >>>>>>>>>>>> On 30 Oct 2021, at 04:02, Chad Preisler <
> >>> chad.preis...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hello,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have a stream application that does a KStream to KTable left
> >>>>> join.
> >>>>>>>> We
> >>>>>>>>>>>> seem to be occasionally missing joins (KTable side is null).
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm wondering if someone can point me to the Kafka streams
> >>> internal
> >>>>>>>>>> code
> >>>>>>>>>>>> that reads records for the join? I've poked around the Kafka
> >>> code
> >>>>>>>> base,
> >>>>>>>>>>> but
> >>>>>>>>>>>> there is a lot there. I imagine there is some consumer poll
> for
> >>>>> each
> >>>>>>>>>> side
> >>>>>>>>>>>> of the join, and possibly a background thread for reading the
> >>>>> KTable
> >>>>>>>>>>> topic.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I figure there are several possible causes of this issue, and
> >>> since
> >>>>>>>>>>> nothing
> >>>>>>>>>>>> is really jumping out in my code, I was going to start looking
> >>> at
> >>>>>>> the
> >>>>>>>>>>> Kafka
> >>>>>>>>>>>> code to see if there is something I can do to fix this.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Chad
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
