Thank you for your response and the links to the presentations.

*However, this seems to be orthogonal to your issue?*

Yes. From what I see in the code it looks like you have a single consumer
subscribed to multiple topics. Please correct me if I'm wrong.


*By default, timestamp synchronization is disabled. Maybe enabling it would
help?*

We are using a timestamp extractor that returns 0. We did that because we
were almost always missing joins on startup, and this seemed to be the only
way to bootstrap enough records at startup to avoid the missed joins. We
found a post that said doing that would make the KTable act like a
GlobalKTable at startup. So far this works great; we never miss a join on
startup. If I use "timestamp synchronization", do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor, will
timestamp synchronization take care of the missed join issue on startup?
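To make the comparison concrete, here is a minimal sketch of the two configurations being weighed against each other. The class name `com.example.ZeroTimestampExtractor` and the 2000 ms value are hypothetical placeholders; only the config keys `default.timestamp.extractor` and `max.task.idle.ms` come from Kafka Streams.

```java
import java.util.Properties;

// A sketch (illustrative names and values) of the two alternatives
// discussed here for getting the table side loaded before the
// stream side is processed.
public class JoinStartupConfigSketch {

    // Option A: the zero-timestamp extractor approach we use today.
    // "com.example.ZeroTimestampExtractor" is a hypothetical class that
    // would implement TimestampExtractor and return 0 for every record.
    public static Properties zeroExtractorConfig() {
        Properties props = new Properties();
        props.setProperty("default.timestamp.extractor",
                "com.example.ZeroTimestampExtractor");
        return props;
    }

    // Option B: timestamp synchronization. A positive max.task.idle.ms
    // lets a task wait (2000 ms here is an assumed value) for data on an
    // empty input partition instead of immediately processing the other
    // side of the join.
    public static Properties idleConfig() {
        Properties props = new Properties();
        props.setProperty("max.task.idle.ms", "2000");
        return props;
    }
}
```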

I'm guessing the issue here is that occasionally the poll request is not
returning the matching record for the KTable side of the join before the
task starts processing records. Later, when we put the same record on the
topic and the KTable has had a chance to load more records, the join works
and everything is good to go. Because of the way our system works, no new
status records have been written in the meantime, so the new record joins
against the correct status.

Do you agree that the poll request is returning the KStream record but not
returning the KTable record and therefore the join is getting missed? If
you don't agree, what do you think is going on? Is there a way to prove
this out?
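One low-overhead way to try to prove it out would be targeted debug logging. This is a sketch assuming the Log4j 1.x properties format Kafka ships with; the two class names are the Kafka Streams internals mentioned in this thread, but the exact messages they emit vary by version:

```properties
# Enable DEBUG only for the classes involved in polling and pausing,
# rather than for every class.
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=DEBUG
log4j.logger.org.apache.kafka.streams.processor.internals.StreamTask=DEBUG
```

If the KTable partition's records really are arriving after processing has started, the buffering and pause activity logged by these classes at startup should show it.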

Thanks,
Chad

On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax <mj...@apache.org> wrote:

> Yes, a StreamThread has one consumer. The number of StreamThreads per
> instance is configurable via `num.stream.threads`. Partitions are
> assigned to threads similar to consumers in a plain consumer group.
>
> It seems you run with the default of one thread per instance. As you
> spin up 12 instances, it results in 12 threads for the application. As
> you have 12 partitions, using more threads won't be useful as no
> partitions are left for them to process.
>
> For a stream-table join, there will be one task per "partition pair"
> that computes the join for those partitions. So you get 12 tasks, and
> each thread processes one task in your setup. I.e., a thread's consumer
> reads data for both input topics.
>
> Pausing happens on a per-partition basis: for joins there are two buffers
> per task (one for each input topic partition). It's possible that one
> partition is paused while the other is processed. However, this seems to
> be orthogonal to your issue?
>
> For a GlobalKTable, you get an additional GlobalThread that only reads
> the data from the "global topic" to update the GlobalKTable. Semantics
> of KStream-KTable and KStream-GlobalKTable joins are different; cf.
>
> https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
>
> For timestamp synchronization, you may check out the `max.task.idle.ms`
> config. By default, timestamp synchronization is disabled. Maybe
> enabling it would help?
>
> You may also check out slides 34-38:
>
> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>
> There is one corner case: if two records with the same timestamp come
> in, it's not defined which one will be processed first.
>
> Hope this helps.
>
>
> -Matthias
>
>
> On 10/30/21 6:45 AM, Chad Preisler wrote:
> > Yes, this helped. I have some additional questions.
> >
> > Does StreamThread have one consumer? (Looks like it, but just want to
> > confirm)
> > Is there a separate StreamThread for each topic including the KTable?
> > If a KTable is a StreamThread and there is a StreamTask for that KTable,
> > could my buffer be getting filled up, and the mainConsumer for the KTable
> > be getting paused? I see this code in StreamTask#addRecords.
> >
> > // if after adding these records, its partition queue's buffered size
> > // has been increased beyond the threshold, we can then pause the
> > // consumption for this partition
> > if (newQueueSize > maxBufferedSize) {
> >     mainConsumer.pause(singleton(partition));
> > }
> >
> > Is there any specific logging that I can set to debug or trace that would
> > help me troubleshoot? I'd prefer not to turn debug and/or trace on for
> > every single class.
> >
> > Thanks,
> > Chad
> >
> >
> >
> >
> >
> > On Sat, Oct 30, 2021 at 5:20 AM Luke Chen <show...@gmail.com> wrote:
> >
> >> Hi Chad,
> >>> I'm wondering if someone can point me to the Kafka streams internal
> >>> code that reads records for the join?
> >> --> You can check StreamThread#pollPhase, where the stream thread (main
> >> consumer) periodically polls records. It then processes each topology
> >> node with these polled records in stream tasks (StreamTask#process).
> >>
> >> Hope that helps.
> >>
> >> Thanks.
> >> Luke
> >>
> >>
> >> On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart
> >> <gilles.philipp...@fundingcircle.com.invalid> wrote:
> >>
> >>> Hi Chad, this talk around 24:00 clearly explains what you’re seeing:
> >>>
> >>> https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/
> >>>
> >>> Gilles
> >>>
> >>>> On 30 Oct 2021, at 04:02, Chad Preisler <chad.preis...@gmail.com>
> >>>> wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I have a stream application that does a KStream to KTable left join.
> >>>> We seem to be occasionally missing joins (KTable side is null).
> >>>>
> >>>> I'm wondering if someone can point me to the Kafka streams internal
> >>>> code that reads records for the join? I've poked around the Kafka
> >>>> code base, but there is a lot there. I imagine there is some consumer
> >>>> poll for each side of the join, and possibly a background thread for
> >>>> reading the KTable topic.
> >>>>
> >>>> I figure there are several possible causes of this issue, and since
> >>>> nothing is really jumping out in my code, I was going to start
> >>>> looking at the Kafka code to see if there is something I can do to
> >>>> fix this.
> >>>>
> >>>> Thanks,
> >>>> Chad
> >>>
> >>>
> >>
> >
>
