Srikanth,

Note that the same thread maybe used for fetching both the "semi-static"
KTable stream as well as the continuous KStream stream, so
sleep-on-no-match may not work.

I think setting timestamps for this KTable to make sure its values is
smaller than the KStream stream will work, and there is a JIRA open for
better handling logics:

https://issues.apache.org/jira/browse/KAFKA-3478


Guozhang


On Mon, May 23, 2016 at 1:58 PM, Srikanth <srikanth...@gmail.com> wrote:

> Thanks Guozhang & Matthias!
>
> For 1), it is going to be a common ask. So a DSL API will be good.
>
> For 2), source for the KTable currently is a file. Think about it as a dump
> from a DB table.
> We are thinking of ways to stream updates from this table. But for now its
> a new file every day or so.
> I plan to automate file uploader to write to kafka when it gets a new file.
> File is too big to be "broadcasted". The delta changes between files should
> be really small though.
>
> Now, "current content" can be modeled based on timestamp. If I add a
> timestamp field when pushing to kafka.
> The records themselves have no notion of time. Its just metadata that will
> be useful in join.
>
> Another way is similar to what Matthias suggested. I can make it sleep if a
> key is not found in KTable.
> I can treat it as a condition to indicated KTable is still being
> initialized. Of course, I need a way to break this sleep cycle if key never
> comes.
>
> Or this can be implemented with a custom watermark assigner that knows when
> to emit a "special watermark" to indicate current content is read.
>
> Or for such a slow stream, any poll to kafka broker that returns zero
> records can be treated as reaching end of current content.
>
>
> Matthias,
> I haven't spent enough time on the approach you outlined. Will let you
> know.
>
> Srikanth
>
>
>
> On Mon, May 23, 2016 at 1:40 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Hi Srikanth,
> >
> > as Guozhang mentioned, the problem is the definition of the time, when
> > your table is read for joining with the stream.
> >
> > Using transform() you would basically read a changlog-stream within your
> > custom Transformer class and apply it via KStream.transform() to your
> > regular stream. (ie, your Transformer class has a member KTable).
> >
> > If Transformer.transform() is called you need to decide somehow, if you
> > table is read for joining or not (and "sleep" if it is not ready yet,
> > effectively stalling your KStream).
> >
> > As some point in time (not sure how you wanna decide when this point in
> > time actually is -- see beginning of this mail) you can start to process
> > the data from the regular stream.
> >
> > Have a look into KStreamKTableLeftJoin to get an idea how this would
> work.
> >
> >
> > -Matthias
> >
> >
> > On 05/23/2016 06:25 PM, Guozhang Wang wrote:
> > > Hi Srikanth,
> > >
> > > How do you define if the "KTable is read completely" to get to the
> > "current
> > > content"? Since as you said that table is not purely static, but still
> > with
> > > maybe-low-traffic update streams, I guess "catch up to current content"
> > is
> > > still depending on some timestamp?
> > >
> > > BTW about 1), we are consider adding "read-only global state" as well
> > into
> > > the DSL in the future, but like Matthias said it won't be available in
> > > 0.10.0, so you need to do it through the transform() call where you can
> > > provide any customized processor.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, May 23, 2016 at 8:59 AM, Srikanth <srikanth...@gmail.com>
> wrote:
> > >
> > >> Matthias,
> > >>
> > >> For (2), how do you achieve this using transform()?
> > >>
> > >> Thanks,
> > >> Srikanth
> > >>
> > >> On Sat, May 21, 2016 at 9:10 AM, Matthias J. Sax <
> matth...@confluent.io
> > >
> > >> wrote:
> > >>
> > >>> Hi Srikanth,
> > >>>
> > >>> 1) there is no support on DSL level, but if you use Processor API you
> > >>> can do "anything" you like. So yes, a map-like transform() that gets
> > >>> initialized with the "broadcast-side" of the join should work.
> > >>>
> > >>> 2) Right now, there is no way to stall a stream -- a custom
> > >>> TimestampExtractor will not do the tricker either, because Kafka
> > Streams
> > >>> allows for out-of-order/late arriving data -- thus, it processes what
> > is
> > >>> available without "waiting" for late data...
> > >>>
> > >>> Of course, you could build a custom solution via transfrom() again.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 05/21/2016 05:24 AM, Srikanth wrote:
> > >>>> Hello,
> > >>>>
> > >>>> I'm writing a workflow using kafka streams where an incoming stream
> > >> needs
> > >>>> to be denormalized and joined with a few dimension table.
> > >>>> It will be written back to another kafka topic. Fairly typical I
> > >> believe.
> > >>>>
> > >>>> 1) Can I do broadcast join if my dimension table is small enough to
> be
> > >>> held
> > >>>> in each processor's local state?
> > >>>> Not doing this will force KStream to be shuffled with an internal
> > >> topic.
> > >>>> If not should I write a transformer that reads the table in init()
> and
> > >>>> cache's it. I can then do a map like transformation in transform()
> > with
> > >>>> cache lookup instead of a join.
> > >>>>
> > >>>>
> > >>>> 2) When joining a KStream withe KTable for bigger tables, how do I
> > >> stall
> > >>>> reading KStream until KTable is completely materialized?
> > >>>> I know completely read is a very loose term in stream processing :-)
> > >> The
> > >>>> KTable is going to be fairly static and needs the "current content"
> to
> > >> be
> > >>>> read completely before it can be used for joins.
> > >>>> The only option for synchronizing streams I see is
> TimestampExtractor.
> > >> I
> > >>>> can't figure out how to use that because
> > >>>>
> > >>>>   i) The join between KStream and KTable is time agnostic and
> doesn't
> > >>>> really fit into a window operation.
> > >>>>   ii) TimestampExtractor is set as a stream config at global level.
> > >>>> Timestamp extraction logic on the other hand will be specific to
> each
> > >>>> stream.
> > >>>>       How does one write a generic extractor?
> > >>>>
> > >>>> Thanks,
> > >>>> Srikanth
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> > >
> > >
> >
> >
>



-- 
-- Guozhang

Reply via email to