Hi Guozhang,

"I think ideally what you want is a windowed KStream-KTable join" <- That is
indeed what I need. But from what I gather from the Kafka/Confluent
documentation, a KStream-KTable join has no window parameter. @Matthias
said it is by design, so there may be a good reason for this. But for my
use case with a KStream-KTable join, I need to be sure that the changelog
record (KTable) for a given key/id arrives before that key/id is used in the
record stream (KStream); otherwise my joined stream will contain records
without the changelog data (null, or not up to date at join time). I'm
trying to understand whether there's a way to do something like this with
the current release of Kafka.

Regards,

On Wed, Jul 20, 2016 at 7:47 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Nicolas,
>
> For a KStream-KTable join, if a record coming from the KStream does not
> find a matching record in the materialized KTable, the join result is
> lost: as you noted, even if the changelog record arrives at the KTable
> later, it will not trigger a join, because the KStream is not
> materialized. Using earlier timestamps on the KTable has the effect of
> processing and materializing KTable records before KStream records, so it
> is likely that the matching key already exists in the KTable when the
> joining record from the KStream arrives and is processed. But again, this
> is not guaranteed, since it is best-effort.
>
> I think ideally what you want is a windowed KStream-KTable join. The
> KTable's key space is bounded and hence can be completely materialized
> (across partitions if you run multiple instances of the same code),
> whereas the KStream is unbounded, so you need to window it in order to
> materialize it. That way, if a KTable record arrives late, it may still
> find the matching record in the windowed KStream. Is that right?
>
>
> Guozhang
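To make the windowed KStream-KTable idea concrete, here is a minimal in-memory sketch in plain Java. It is a toy model under stated assumptions, not a Kafka Streams API (as discussed in this thread, there is no window parameter for a KStream-KTable join): the class `WindowedStreamTableJoin`, its methods, and the `windowMs` parameter are hypothetical. One simplification: instead of materializing every windowed KStream record, it buffers only stream records that found no match, and a late KTable record retries those that are still inside the window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model of a windowed KStream-KTable join (hypothetical, not a Kafka Streams API).
class WindowedStreamTableJoin {
    // A stream record (e.g. a click) that found no table match yet.
    static final class Pending {
        final String key;
        final String value;
        final long timestamp;

        Pending(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long windowMs;
    private final Map<String, String> table = new HashMap<>(); // materialized KTable
    private final List<Pending> buffer = new ArrayList<>();    // unmatched stream records
    final List<String> output = new ArrayList<>();             // joined results

    WindowedStreamTableJoin(long windowMs) {
        this.windowMs = windowMs;
    }

    // Stream side: join against the current table state, or buffer for a later retry.
    void onStreamRecord(String key, String value, long timestamp) {
        String tableValue = table.get(key);
        if (tableValue != null) {
            output.add(value + "+" + tableValue);
        } else {
            buffer.add(new Pending(key, value, timestamp));
        }
    }

    // Table side: update the state, then retry buffered stream records for this key.
    void onTableRecord(String key, String value, long timestamp) {
        table.put(key, value);
        Iterator<Pending> it = buffer.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (timestamp - p.timestamp > windowMs) {
                it.remove(); // outside the window: this join is lost for good
            } else if (p.key.equals(key)) {
                output.add(p.value + "+" + value); // late table record rescues the join
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        WindowedStreamTableJoin join = new WindowedStreamTableJoin(60_000L);
        join.onStreamRecord("campaign-1", "click-a", 0L);          // no campaign yet: buffered
        join.onTableRecord("campaign-1", "summer-sale", 30_000L);  // within window: joined
        join.onStreamRecord("campaign-2", "click-b", 40_000L);     // no campaign yet: buffered
        join.onTableRecord("campaign-2", "winter-sale", 200_000L); // too late: dropped
        System.out.println(join.output); // [click-a+summer-sale]
    }
}
```

With a 60-second window, `click-a` is rescued by the campaign that arrives 30 seconds later, while `click-b` is dropped because its campaign shows up 160 seconds after it.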
>
>
>
> On Wed, Jul 20, 2016 at 4:25 AM, Nicolas PHUNG <nsphung.apa...@gmail.com>
> wrote:
>
> > @Guozhang OK, I've tried it and it doesn't behave as expected. For a
> > KStream-KStream join, the issue is that I'd have to re-produce the same
> > changelog record so that it can be joined within each window. And for a
> > KStream-KTable join, an update/insert of the changelog record doesn't
> > trigger the joins that were missed in the record stream (plus, you can't
> > specify a window for a KStream-KTable join).
> >
> > On Wed, Jul 20, 2016 at 11:14 AM, Nicolas PHUNG <nsphung.apa...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Thank you for your answer, @Matthias. Indeed, I need a kind of symmetric
> > > join. However, a KStream-KStream join doesn't fit my use case: I would
> > > need to re-generate the events in the changelog (e.g., a marketing
> > > campaign with a certain id/key), because they only live for the join
> > > within a defined window. Let's say I get a click in the record stream
> > > and the campaign entity arrives later within the window: the join
> > > produces the enriched stream. But after the window, new clicks arriving
> > > in the record stream won't be able to find the campaign entity, since
> > > the window containing that information has passed. I haven't tried
> > > KTable-KTable yet, but in my opinion my clicks, for example, don't
> > > really fit a changelog stream.
> > >
> > > @Guozhang OK, so if I fake the timestamps in the record stream (KStream),
> > > say by adding 1 day, I could give late arrivals in the changelog stream
> > > (my KTable) a one-day grace period for the join to be processed. Let me
> > > try. Thanks
> > >
> > > Regards,
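The window-expiry problem with a KStream-KStream join can be modeled the same way. A rough sketch, assuming a symmetric join that matches records with the same key whose timestamps are at most `windowMs` apart; `WindowedStreamStreamJoin`, `onClick`, and `onCampaign` are hypothetical names, and every record is kept in memory (no retention or eviction) for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a symmetric, windowed KStream-KStream join (hypothetical, not Kafka Streams).
class WindowedStreamStreamJoin {
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long windowMs;
    private final List<Rec> clicks = new ArrayList<>();    // left stream
    private final List<Rec> campaigns = new ArrayList<>(); // right stream
    final List<String> output = new ArrayList<>();

    WindowedStreamStreamJoin(long windowMs) {
        this.windowMs = windowMs;
    }

    // Both sides are buffered, so a record from either side can trigger a join.
    void onClick(String key, String value, long ts) {
        Rec click = new Rec(key, value, ts);
        clicks.add(click);
        for (Rec campaign : campaigns) {
            if (inWindow(click, campaign)) output.add(click.value + "+" + campaign.value);
        }
    }

    void onCampaign(String key, String value, long ts) {
        Rec campaign = new Rec(key, value, ts);
        campaigns.add(campaign);
        for (Rec click : clicks) {
            if (inWindow(click, campaign)) output.add(click.value + "+" + campaign.value);
        }
    }

    // Same key and timestamps at most windowMs apart.
    private boolean inWindow(Rec a, Rec b) {
        return a.key.equals(b.key) && Math.abs(a.ts - b.ts) <= windowMs;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        WindowedStreamStreamJoin join = new WindowedStreamStreamJoin(hour);
        join.onClick("campaign-1", "click-a", 0L);
        join.onCampaign("campaign-1", "summer-sale", hour / 2); // late, but within window
        join.onClick("campaign-1", "click-b", 48 * hour);       // two days later: no match
        System.out.println(join.output); // [click-a+summer-sale]
    }
}
```

The campaign event arriving 30 minutes after `click-a` still joins, but `click-b`, two days later, finds nothing unless the campaign event is produced again, which is the re-production problem described in the message above.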
> > >
> > > On Tue, Jul 19, 2016 at 11:45 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > >> Hello Nicolas,
> > >>
> > >> If this missing-match issue is mainly due to the order in which these
> > >> two streams are processed (e.g., your corresponding changelog record
> > >> arrives a bit late compared with the record-stream record with the same
> > >> key), you can try to "hint" the Kafka Streams library to give the
> > >> changelog stream a head start by using a TimestampExtractor that assigns
> > >> it earlier timestamps than the record stream. Kafka Streams will then do
> > >> a best-effort "stream synchronization" to make sure these two streams
> > >> are processed at roughly the same pace based on record timestamps,
> > >> which will result in records from the changelog stream being processed
> > >> before those of the record stream.
> > >>
> > >>
> > >> Guozhang
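A simplified model of this timestamp hint in plain Java: both inputs are merged by always taking the record with the smaller (extracted) timestamp, and the hypothetical `extractTableTimestamp` plays the role of a custom TimestampExtractor by shifting changelog records back by `headStartMs`. This is an assumption-laden sketch, not Kafka Streams' actual synchronization logic, which is best-effort and cannot help if the changelog record has not yet been written to Kafka; `TimestampSyncJoin`, `Rec`, and `run` are illustrative names only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of timestamp-based stream synchronization (hypothetical, not Kafka Streams code).
class TimestampSyncJoin {
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Plays the role of a custom timestamp extractor for the changelog (table) input.
    static long extractTableTimestamp(Rec r, long headStartMs) {
        return r.ts - headStartMs;
    }

    // Always processes the input whose next record has the smaller timestamp;
    // each stream record is joined with the table state at that moment (left-join style).
    static List<String> run(List<Rec> stream, List<Rec> changelog, long headStartMs) {
        Deque<Rec> s = new ArrayDeque<>(stream);
        Deque<Rec> t = new ArrayDeque<>(changelog);
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        while (!s.isEmpty() || !t.isEmpty()) {
            boolean takeTable = !t.isEmpty()
                    && (s.isEmpty()
                        || extractTableTimestamp(t.peekFirst(), headStartMs) <= s.peekFirst().ts);
            if (takeTable) {
                Rec r = t.pollFirst();
                table.put(r.key, r.value); // materialize the KTable update
            } else {
                Rec r = s.pollFirst();
                out.add(r.value + "+" + table.get(r.key)); // "null" if no match yet
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long day = 86_400_000L;
        List<Rec> clicks = Arrays.asList(new Rec("campaign-1", "click-a", 100L));
        List<Rec> campaigns = Arrays.asList(new Rec("campaign-1", "summer-sale", 150L));
        System.out.println(run(clicks, campaigns, 0L));  // [click-a+null]
        System.out.println(run(clicks, campaigns, day)); // [click-a+summer-sale]
    }
}
```

With no head start, `click-a` (t=100) is processed before the campaign (t=150) and joins against `null`; with a one-day head start the campaign is materialized first. In this model, shifting the record stream one day forward instead, as Nicolas suggests, has the same effect.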
> > >>
> > >>
> > >> On Tue, Jul 19, 2016 at 6:10 AM, Matthias J. Sax <matth...@confluent.io>
> > >> wrote:
> > >>
> > >> > Hi Nicolas,
> > >> >
> > >> > You are right, it is currently not possible to get a result from a
> > >> > KTable update (and this is by design). The idea is that the KStream
> > >> > is enriched with the *current state* of the KTable; thus, for each
> > >> > KStream record, a lookup in the KTable is done. (In this sense, a
> > >> > KStream-KTable join is asymmetric.)
> > >> >
> > >> > If you need a symmetric join (i.e., lookups in both directions), you
> > >> > can use either a KTable-KTable or a KStream-KStream join. I'm not
> > >> > sure whether this would work for your use case.
> > >> >
> > >> > -Matthias
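The asymmetric vs. symmetric distinction can be shown with two toy joins. In this sketch (hypothetical names, not the Kafka Streams API), `StreamTableJoin` looks up the current table state for each stream record, left-join style so a miss yields `null`, and emits nothing on table updates; `TableTableJoin` recomputes an inner join whenever either side changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy models contrasting join semantics (hypothetical, not the Kafka Streams API).
class JoinSemantics {
    // KStream-KTable: only stream records trigger output (asymmetric), left-join style.
    static final class StreamTableJoin {
        final Map<String, String> table = new HashMap<>();
        final List<String> output = new ArrayList<>();

        void onStream(String key, String value) {
            output.add(value + "+" + table.get(key)); // look up the *current* table state
        }

        void onTable(String key, String value) {
            table.put(key, value); // updates state but emits nothing
        }
    }

    // KTable-KTable: an update on either side recomputes the join (symmetric), inner-join style.
    static final class TableTableJoin {
        final Map<String, String> left = new HashMap<>();
        final Map<String, String> right = new HashMap<>();
        final List<String> output = new ArrayList<>();

        void onLeft(String key, String value) {
            left.put(key, value);
            emit(key);
        }

        void onRight(String key, String value) {
            right.put(key, value);
            emit(key);
        }

        private void emit(String key) {
            String l = left.get(key);
            String r = right.get(key);
            if (l != null && r != null) {
                output.add(l + "+" + r);
            }
        }
    }

    public static void main(String[] args) {
        StreamTableJoin st = new StreamTableJoin();
        st.onStream("campaign-1", "click-a");    // campaign not there yet
        st.onTable("campaign-1", "summer-sale"); // late update: no new result
        System.out.println(st.output);           // [click-a+null]

        TableTableJoin tt = new TableTableJoin();
        tt.onLeft("campaign-1", "click-a");
        tt.onRight("campaign-1", "summer-sale"); // the update triggers the join
        System.out.println(tt.output);           // [click-a+summer-sale]
    }
}
```

Note that modeling clicks as a table keyed by campaign id keeps only the latest click per key, which is why clicks don't naturally fit a changelog stream.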
> > >> >
> > >> >
> > >> > On 07/19/2016 01:36 PM, Nicolas PHUNG wrote:
> > >> > > Hi,
> > >> > >
> > >> > > I'm using Kafka 0.10.0.0 with Confluent Platform 3.0.0.
> > >> > >
> > >> > > I've managed to join a record stream (KStream / click stream) with a
> > >> > > changelog stream (KTable / an entity such as a campaign related to a
> > >> > > click). When the entity is inserted into the KTable first (and for the
> > >> > > first time, of course) in Kafka, the record stream is processed as
> > >> > > expected, and the join produces a new enriched stream. This is good.
> > >> > >
> > >> > > My issue is when the record stream generates a record containing a
> > >> > > key/id that hasn't been inserted into the changelog stream/KTable
> > >> > > yet. My process then emits a record into the enriched stream without
> > >> > > the entity information. Would it be possible to re-run this
> > >> > > enrichment once my KTable receives the changelog record with the
> > >> > > missing id/key? From my understanding, it's not possible to do this
> > >> > > with a KStream-KTable join right now. Is there a way to do something
> > >> > > like this?
> > >> > >
> > >> > > Thanks.
> > >> > >
> > >> > > Regards,
> > >> > > Nicolas PHUNG
> > >> > >
> > >> >
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
