The timestamp is not only used for windowing specs but also for flow
control (i.e. it is used a way of "message chooser" among multiple input
topic partitions), see this section for details:

http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps


Guozhang

On Fri, May 27, 2016 at 1:52 PM, Srikanth <srikanth...@gmail.com> wrote:

> Guozhang,
>
> Timestamp extraction seems more like a stream level API. I guess its a
> better fit as a global options when using WallclockTimestampExtractor
> or ConsumerRecordTimestampExtractor.
>
> w.r.t your statement -- "I think setting timestamps for this KTable to make
> sure its values is smaller than the KStream stream will work"
> Its still not clear to me how setting timestamp will help in this use case.
> Below is a sample code where KStream is set to current time and KTable is
> set to (currenttime-1day).
> The join itself is time agnostic and doesn't really have a window. How will
> setting earlier timestamp make KTable to be materialized before KStream?
>
>     val rawKStream = kStreamBuilder.stream(Serdes.ByteArray(),
> rawLogSerde(), rawLogTopicName)
>                                                      .filter((k,v) => v !=
> null)
>                                                      .map((k,v) => (v.id),
> v))
>
>     val metadataKTable = kStreamBuilder.table(longSerde, longSerde,
> metadataTopicName)
>
>     val joinedKStream = rawKStream.leftJoin( metadataKTable, (r:rawLog,
> clientId:JLong) => (clientId, r) )
>                                                     .map((k,v) => new
> KeyValue(v._1, v._2))
>
>
>   override def extract(record: ConsumerRecord[Object, Object]): Long = {
>     record.topic match {
>       case rawLogTopicName => System.currentTimeMillis
>       case metadataTopicName => System.currentTimeMillis - 86400000
>       case _ => System.currentTimeMillis
>     }
>   }
>
> Srikanth
>
> On Thu, May 26, 2016 at 1:01 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > A processor is guaranteed to be executed on the same thread at any given
> > time, its process() and punctuate() will always be triggered to run in a
> > single thread.
> >
> > Currently TimestampExtractor is set globally, but you can definitely
> define
> > different logics depending on the topic name (which is included in the
> > input ConsumerRecord).
> >
> > Guozhang
> >
> >
> > On Mon, May 23, 2016 at 6:20 PM, Srikanth <srikanth...@gmail.com> wrote:
> >
> > > Guozhang,
> > >
> > > I guess you are referring to a scenario where noOfThreads <
> > totalNoOfTasks.
> > > We could have KTable task and KStream task running on the same thread
> and
> > > sleep will be counter productive?
> > > On this note, will a Processor always run on the same thread? Are
> > process()
> > > and punctuate() guaranteed to never run in parallel?
> > >
> > > The jira you gave seems to be on the same lines.
> > > Can you comment on my question regarding TimestampExtractor?
> > > We set one TimestampExtractor as a stream config at global level.
> > Timestamp
> > > extraction logic on the other hand will be specific to each stream.
> > >
> > > Srikanth
> > >
> > >
> > > On Mon, May 23, 2016 at 5:55 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Srikanth,
> > > >
> > > > Note that the same thread maybe used for fetching both the
> > "semi-static"
> > > > KTable stream as well as the continuous KStream stream, so
> > > > sleep-on-no-match may not work.
> > > >
> > > > I think setting timestamps for this KTable to make sure its values is
> > > > smaller than the KStream stream will work, and there is a JIRA open
> for
> > > > better handling logics:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3478
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, May 23, 2016 at 1:58 PM, Srikanth <srikanth...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks Guozhang & Matthias!
> > > > >
> > > > > For 1), it is going to be a common ask. So a DSL API will be good.
> > > > >
> > > > > For 2), source for the KTable currently is a file. Think about it
> as
> > a
> > > > dump
> > > > > from a DB table.
> > > > > We are thinking of ways to stream updates from this table. But for
> > now
> > > > its
> > > > > a new file every day or so.
> > > > > I plan to automate file uploader to write to kafka when it gets a
> new
> > > > file.
> > > > > File is too big to be "broadcasted". The delta changes between
> files
> > > > should
> > > > > be really small though.
> > > > >
> > > > > Now, "current content" can be modeled based on timestamp. If I add
> a
> > > > > timestamp field when pushing to kafka.
> > > > > The records themselves have no notion of time. Its just metadata
> that
> > > > will
> > > > > be useful in join.
> > > > >
> > > > > Another way is similar to what Matthias suggested. I can make it
> > sleep
> > > > if a
> > > > > key is not found in KTable.
> > > > > I can treat it as a condition to indicated KTable is still being
> > > > > initialized. Of course, I need a way to break this sleep cycle if
> key
> > > > never
> > > > > comes.
> > > > >
> > > > > Or this can be implemented with a custom watermark assigner that
> > knows
> > > > when
> > > > > to emit a "special watermark" to indicate current content is read.
> > > > >
> > > > > Or for such a slow stream, any poll to kafka broker that returns
> zero
> > > > > records can be treated as reaching end of current content.
> > > > >
> > > > >
> > > > > Matthias,
> > > > > I haven't spent enough time on the approach you outlined. Will let
> > you
> > > > > know.
> > > > >
> > > > > Srikanth
> > > > >
> > > > >
> > > > >
> > > > > On Mon, May 23, 2016 at 1:40 PM, Matthias J. Sax <
> > > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi Srikanth,
> > > > > >
> > > > > > as Guozhang mentioned, the problem is the definition of the time,
> > > when
> > > > > > your table is read for joining with the stream.
> > > > > >
> > > > > > Using transform() you would basically read a changlog-stream
> within
> > > > your
> > > > > > custom Transformer class and apply it via KStream.transform() to
> > your
> > > > > > regular stream. (ie, your Transformer class has a member KTable).
> > > > > >
> > > > > > If Transformer.transform() is called you need to decide somehow,
> if
> > > you
> > > > > > table is read for joining or not (and "sleep" if it is not ready
> > yet,
> > > > > > effectively stalling your KStream).
> > > > > >
> > > > > > As some point in time (not sure how you wanna decide when this
> > point
> > > in
> > > > > > time actually is -- see beginning of this mail) you can start to
> > > > process
> > > > > > the data from the regular stream.
> > > > > >
> > > > > > Have a look into KStreamKTableLeftJoin to get an idea how this
> > would
> > > > > work.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > >
> > > > > > On 05/23/2016 06:25 PM, Guozhang Wang wrote:
> > > > > > > Hi Srikanth,
> > > > > > >
> > > > > > > How do you define if the "KTable is read completely" to get to
> > the
> > > > > > "current
> > > > > > > content"? Since as you said that table is not purely static,
> but
> > > > still
> > > > > > with
> > > > > > > maybe-low-traffic update streams, I guess "catch up to current
> > > > content"
> > > > > > is
> > > > > > > still depending on some timestamp?
> > > > > > >
> > > > > > > BTW about 1), we are consider adding "read-only global state"
> as
> > > well
> > > > > > into
> > > > > > > the DSL in the future, but like Matthias said it won't be
> > available
> > > > in
> > > > > > > 0.10.0, so you need to do it through the transform() call where
> > you
> > > > can
> > > > > > > provide any customized processor.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Mon, May 23, 2016 at 8:59 AM, Srikanth <
> srikanth...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Matthias,
> > > > > > >>
> > > > > > >> For (2), how do you achieve this using transform()?
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Srikanth
> > > > > > >>
> > > > > > >> On Sat, May 21, 2016 at 9:10 AM, Matthias J. Sax <
> > > > > matth...@confluent.io
> > > > > > >
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> Hi Srikanth,
> > > > > > >>>
> > > > > > >>> 1) there is no support on DSL level, but if you use Processor
> > API
> > > > you
> > > > > > >>> can do "anything" you like. So yes, a map-like transform()
> that
> > > > gets
> > > > > > >>> initialized with the "broadcast-side" of the join should
> work.
> > > > > > >>>
> > > > > > >>> 2) Right now, there is no way to stall a stream -- a custom
> > > > > > >>> TimestampExtractor will not do the tricker either, because
> > Kafka
> > > > > > Streams
> > > > > > >>> allows for out-of-order/late arriving data -- thus, it
> > processes
> > > > what
> > > > > > is
> > > > > > >>> available without "waiting" for late data...
> > > > > > >>>
> > > > > > >>> Of course, you could build a custom solution via transfrom()
> > > again.
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> -Matthias
> > > > > > >>>
> > > > > > >>> On 05/21/2016 05:24 AM, Srikanth wrote:
> > > > > > >>>> Hello,
> > > > > > >>>>
> > > > > > >>>> I'm writing a workflow using kafka streams where an incoming
> > > > stream
> > > > > > >> needs
> > > > > > >>>> to be denormalized and joined with a few dimension table.
> > > > > > >>>> It will be written back to another kafka topic. Fairly
> > typical I
> > > > > > >> believe.
> > > > > > >>>>
> > > > > > >>>> 1) Can I do broadcast join if my dimension table is small
> > enough
> > > > to
> > > > > be
> > > > > > >>> held
> > > > > > >>>> in each processor's local state?
> > > > > > >>>> Not doing this will force KStream to be shuffled with an
> > > internal
> > > > > > >> topic.
> > > > > > >>>> If not should I write a transformer that reads the table in
> > > init()
> > > > > and
> > > > > > >>>> cache's it. I can then do a map like transformation in
> > > transform()
> > > > > > with
> > > > > > >>>> cache lookup instead of a join.
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> 2) When joining a KStream withe KTable for bigger tables,
> how
> > > do I
> > > > > > >> stall
> > > > > > >>>> reading KStream until KTable is completely materialized?
> > > > > > >>>> I know completely read is a very loose term in stream
> > processing
> > > > :-)
> > > > > > >> The
> > > > > > >>>> KTable is going to be fairly static and needs the "current
> > > > content"
> > > > > to
> > > > > > >> be
> > > > > > >>>> read completely before it can be used for joins.
> > > > > > >>>> The only option for synchronizing streams I see is
> > > > > TimestampExtractor.
> > > > > > >> I
> > > > > > >>>> can't figure out how to use that because
> > > > > > >>>>
> > > > > > >>>>   i) The join between KStream and KTable is time agnostic
> and
> > > > > doesn't
> > > > > > >>>> really fit into a window operation.
> > > > > > >>>>   ii) TimestampExtractor is set as a stream config at global
> > > > level.
> > > > > > >>>> Timestamp extraction logic on the other hand will be
> specific
> > to
> > > > > each
> > > > > > >>>> stream.
> > > > > > >>>>       How does one write a generic extractor?
> > > > > > >>>>
> > > > > > >>>> Thanks,
> > > > > > >>>> Srikanth
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to