Guozhang,

I guess you are referring to a scenario where noOfThreads < totalNoOfTasks.
We could have a KTable task and a KStream task running on the same thread,
in which case sleeping would be counterproductive?
On this note, will a Processor always run on the same thread? Are process()
and punctuate() guaranteed to never run in parallel?
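
To make the shared-thread concern concrete, here is a toy model in plain Java. It is not Kafka Streams code: the class and method names are made up for illustration, and the "yielding" variant is hypothetical rather than something the library offers. It only shows that a stream task waiting in place on one thread keeps the table task on that thread from making progress, and that a retry cap is needed to break the cycle.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model only (not Kafka Streams code): a single thread owns both a
// table task and a stream task, so only one of them can run at a time.
final class SharedThreadModel {
    private final Deque<String> pendingTableUpdates = new ArrayDeque<>();
    private final Set<String> table = new HashSet<>();

    // Simulates a table update that has arrived but is not yet applied.
    void enqueueTableUpdate(String key) {
        pendingTableUpdates.add(key);
    }

    // Table task: applies one pending update each time the thread runs it.
    void runTableTaskOnce() {
        String key = pendingTableUpdates.poll();
        if (key != null) {
            table.add(key);
        }
    }

    // Stream task that waits in place for a missing key. While this loop
    // runs, the same thread cannot run the table task, so the key never
    // shows up. The retry cap is what breaks the cycle.
    boolean lookupBlocking(String key, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (table.contains(key)) {
                return true;
            }
            // A real implementation would call Thread.sleep(...) here.
        }
        return false;
    }

    // Hypothetical alternative: give the table task a turn between retries.
    boolean lookupYielding(String key, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            if (table.contains(key)) {
                return true;
            }
            runTableTaskOnce();
        }
        return table.contains(key);
    }
}
```

With one pending update for a key, lookupBlocking gives up without ever seeing it, while lookupYielding finds it on the second attempt.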

The JIRA you linked seems to be along the same lines.
Can you comment on my question regarding TimestampExtractor?
We set one TimestampExtractor as a stream config at the global level.
Timestamp extraction logic, on the other hand, will be specific to each stream.
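
One way I can imagine bridging this is a single extractor that dispatches on the topic name. Below is a minimal sketch in plain Java with no Kafka dependency. TopicRecord is a hypothetical stand-in for ConsumerRecord (which does expose the topic name), and the class, method, and topic names are made up for illustration. In a real application the extract() logic would sit inside a TimestampExtractor implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for a consumed record; in Kafka Streams this role
// is played by ConsumerRecord, which also carries the topic name.
final class TopicRecord {
    final String topic;
    final Object value;

    TopicRecord(String topic, Object value) {
        this.topic = topic;
        this.value = value;
    }
}

// One "global" extractor that delegates to per-topic timestamp logic.
final class PerTopicTimestampDispatcher {
    private final Map<String, ToLongFunction<Object>> byTopic = new HashMap<>();
    private final ToLongFunction<Object> fallback;

    // The fallback is used for topics without registered logic.
    PerTopicTimestampDispatcher(ToLongFunction<Object> fallback) {
        this.fallback = fallback;
    }

    // Registers stream-specific extraction logic for one topic.
    PerTopicTimestampDispatcher register(String topic, ToLongFunction<Object> logic) {
        byTopic.put(topic, logic);
        return this;
    }

    // Picks the logic by topic name and applies it to the record value.
    long extract(TopicRecord record) {
        return byTopic.getOrDefault(record.topic, fallback).applyAsLong(record.value);
    }
}
```

For example, registering "dimension-table" with `v -> 0L` would give every table record a timestamp smaller than any event timestamp, in the spirit of the suggestion quoted below, while "events" could read the timestamp from its own payload.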

Srikanth


On Mon, May 23, 2016 at 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Srikanth,
>
> Note that the same thread may be used for fetching both the "semi-static"
> KTable stream as well as the continuous KStream stream, so
> sleep-on-no-match may not work.
>
> I think setting timestamps for this KTable to make sure its values are
> smaller than those of the KStream stream will work, and there is a JIRA
> open for better handling logic:
>
> https://issues.apache.org/jira/browse/KAFKA-3478
>
>
> Guozhang
>
>
> On Mon, May 23, 2016 at 1:58 PM, Srikanth <srikanth...@gmail.com> wrote:
>
> > Thanks Guozhang & Matthias!
> >
> > For 1), it is going to be a common ask. So a DSL API will be good.
> >
> > For 2), source for the KTable currently is a file. Think about it as a
> > dump from a DB table.
> > We are thinking of ways to stream updates from this table. But for now
> > it's a new file every day or so.
> > I plan to automate a file uploader to write to kafka when it gets a new
> > file.
> > File is too big to be "broadcasted". The delta changes between files
> > should be really small though.
> >
> > Now, "current content" can be modeled based on timestamp, if I add a
> > timestamp field when pushing to kafka.
> > The records themselves have no notion of time. It's just metadata that
> > will be useful in join.
> >
> > Another way is similar to what Matthias suggested. I can make it sleep
> > if a key is not found in KTable.
> > I can treat it as a condition to indicate KTable is still being
> > initialized. Of course, I need a way to break this sleep cycle if key
> > never comes.
> >
> > Or this can be implemented with a custom watermark assigner that knows
> > when to emit a "special watermark" to indicate current content is read.
> >
> > Or for such a slow stream, any poll to kafka broker that returns zero
> > records can be treated as reaching end of current content.
> >
> >
> > Matthias,
> > I haven't spent enough time on the approach you outlined. Will let you
> > know.
> >
> > Srikanth
> >
> >
> >
> > On Mon, May 23, 2016 at 1:40 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Hi Srikanth,
> > >
> > > as Guozhang mentioned, the problem is the definition of the time, when
> > > your table is read for joining with the stream.
> > >
> > > Using transform() you would basically read a changelog stream within
> > > your custom Transformer class and apply it via KStream.transform() to
> > > your regular stream (i.e., your Transformer class has a member KTable).
> > >
> > > If Transformer.transform() is called you need to decide somehow if your
> > > table is ready for joining or not (and "sleep" if it is not ready yet,
> > > effectively stalling your KStream).
> > >
> > > At some point in time (not sure how you wanna decide when this point in
> > > time actually is -- see beginning of this mail) you can start to
> > > process the data from the regular stream.
> > >
> > > Have a look into KStreamKTableLeftJoin to get an idea how this would
> > > work.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 05/23/2016 06:25 PM, Guozhang Wang wrote:
> > > > Hi Srikanth,
> > > >
> > > > How do you define if the "KTable is read completely" to get to the
> > > > "current content"? Since as you said that table is not purely static,
> > > > but still with maybe-low-traffic update streams, I guess "catch up to
> > > > current content" is still depending on some timestamp?
> > > >
> > > > BTW about 1), we are considering adding "read-only global state" as
> > > > well into the DSL in the future, but like Matthias said it won't be
> > > > available in 0.10.0, so you need to do it through the transform() call
> > > > where you can provide any customized processor.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, May 23, 2016 at 8:59 AM, Srikanth <srikanth...@gmail.com>
> > > > wrote:
> > > >
> > > >> Matthias,
> > > >>
> > > >> For (2), how do you achieve this using transform()?
> > > >>
> > > >> Thanks,
> > > >> Srikanth
> > > >>
> > > > >> On Sat, May 21, 2016 at 9:10 AM, Matthias J. Sax <matth...@confluent.io>
> > > > >> wrote:
> > > >>
> > > >>> Hi Srikanth,
> > > >>>
> > > > >>> 1) there is no support on DSL level, but if you use Processor API
> > > > >>> you can do "anything" you like. So yes, a map-like transform() that
> > > > >>> gets initialized with the "broadcast-side" of the join should work.
> > > >>>
> > > > >>> 2) Right now, there is no way to stall a stream -- a custom
> > > > >>> TimestampExtractor will not do the trick either, because Kafka
> > > > >>> Streams allows for out-of-order/late arriving data -- thus, it
> > > > >>> processes what is available without "waiting" for late data...
> > > >>>
> > > > >>> Of course, you could build a custom solution via transform() again.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 05/21/2016 05:24 AM, Srikanth wrote:
> > > >>>> Hello,
> > > >>>>
> > > > >>>> I'm writing a workflow using kafka streams where an incoming
> > > > >>>> stream needs to be denormalized and joined with a few dimension
> > > > >>>> tables.
> > > > >>>> It will be written back to another kafka topic. Fairly typical I
> > > > >>>> believe.
> > > >>>>
> > > > >>>> 1) Can I do a broadcast join if my dimension table is small enough
> > > > >>>> to be held in each processor's local state?
> > > > >>>> Not doing this will force KStream to be shuffled with an internal
> > > > >>>> topic.
> > > > >>>> If not, should I write a transformer that reads the table in init()
> > > > >>>> and caches it? I can then do a map-like transformation in
> > > > >>>> transform() with cache lookup instead of a join.
> > > >>>>
> > > >>>>
> > > > >>>> 2) When joining a KStream with a KTable for bigger tables, how do I
> > > > >>>> stall reading KStream until KTable is completely materialized?
> > > > >>>> I know completely read is a very loose term in stream processing
> > > > >>>> :-) The KTable is going to be fairly static and needs the "current
> > > > >>>> content" to be read completely before it can be used for joins.
> > > > >>>> The only option for synchronizing streams I see is
> > > > >>>> TimestampExtractor. I can't figure out how to use that because
> > > >>>>
> > > > >>>>   i) The join between KStream and KTable is time agnostic and
> > > > >>>> doesn't really fit into a window operation.
> > > > >>>>   ii) TimestampExtractor is set as a stream config at global
> > > > >>>> level. Timestamp extraction logic on the other hand will be
> > > > >>>> specific to each stream.
> > > > >>>>       How does one write a generic extractor?
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Srikanth
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > > >
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
