Srikanth,

Note that the same thread may be used for fetching both the "semi-static"
KTable stream and the continuous KStream stream, so sleep-on-no-match may
not work.
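[A minimal, Kafka-free sketch of a non-blocking alternative to sleep-on-no-match: park stream records whose key has no table entry yet, and replay them when the table side catches up, so the shared polling thread is never blocked. All class and method names below are made up for illustration; none of this is Kafka Streams API.]

```java
import java.util.*;

// Illustrative only: buffer-and-replay instead of sleeping, so the
// thread that also polls the table's topic is never blocked.
class BufferingJoin {
    private final Map<String, String> table = new HashMap<>();
    private final Map<String, List<String>> pending = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // A table-side (changelog) record arrived: update the table and
    // flush any stream records that were waiting for this key.
    void onTableRecord(String key, String value) {
        table.put(key, value);
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String v : waiting) {
                output.add(v + "/" + value);
            }
        }
    }

    // A stream-side record arrived: join if the key is known,
    // otherwise park it without blocking the polling thread.
    void onStreamRecord(String key, String value) {
        String tableValue = table.get(key);
        if (tableValue != null) {
            output.add(value + "/" + tableValue);
        } else {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    List<String> output() { return output; }
}
```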
I think setting timestamps for this KTable to make sure its values are
smaller than the KStream stream's will work, and there is a JIRA open for
better handling logic: https://issues.apache.org/jira/browse/KAFKA-3478

Guozhang

On Mon, May 23, 2016 at 1:58 PM, Srikanth <srikanth...@gmail.com> wrote:

> Thanks Guozhang & Matthias!
>
> For 1), it is going to be a common ask, so a DSL API will be good.
>
> For 2), the source for the KTable is currently a file. Think of it as a
> dump from a DB table.
> We are thinking of ways to stream updates from this table, but for now
> it's a new file every day or so.
> I plan to automate the file uploader to write to Kafka when it gets a
> new file.
> The file is too big to be "broadcast". The delta changes between files
> should be really small, though.
>
> Now, "current content" can be modeled based on timestamp, if I add a
> timestamp field when pushing to Kafka.
> The records themselves have no notion of time. It's just metadata that
> will be useful in the join.
>
> Another way is similar to what Matthias suggested. I can make it sleep
> if a key is not found in the KTable.
> I can treat that as a condition indicating the KTable is still being
> initialized. Of course, I need a way to break this sleep cycle if the
> key never comes.
>
> Or this can be implemented with a custom watermark assigner that knows
> when to emit a "special watermark" to indicate the current content has
> been read.
>
> Or, for such a slow stream, any poll to the Kafka broker that returns
> zero records can be treated as reaching the end of the current content.
>
>
> Matthias,
> I haven't spent enough time on the approach you outlined. Will let you
> know.
>
> Srikanth
>
>
> On Mon, May 23, 2016 at 1:40 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Hi Srikanth,
> >
> > as Guozhang mentioned, the problem is the definition of the time when
> > your table is read for joining with the stream.
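[A minimal, Kafka-free sketch of the timestamp idea Guozhang suggests above: assign records from the table's topic an artificially small timestamp so that, wherever records are processed in timestamp order, the table side sorts before the stream side. The topic name and method signature here are made up for illustration; in Kafka Streams this logic would live in a custom TimestampExtractor.]

```java
// Illustrative only: per-topic timestamp assignment so the "semi-static"
// table topic is always ordered before the continuous stream topic.
class TableFirstTimestamps {
    static final String TABLE_TOPIC = "dim-table";  // assumed topic name

    static long extract(String topic, long recordTimestamp) {
        if (TABLE_TOPIC.equals(topic)) {
            return 0L;  // table records always sort first
        }
        return recordTimestamp;  // stream records keep their own time
    }
}
```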
> >
> > Using transform() you would basically read a changelog stream within
> > your custom Transformer class and apply it via KStream.transform() to
> > your regular stream (i.e., your Transformer class has a member KTable).
> >
> > When Transformer.transform() is called, you need to decide somehow
> > whether your table is read for joining or not (and "sleep" if it is
> > not ready yet, effectively stalling your KStream).
> >
> > At some point in time (not sure how you want to decide when this point
> > in time actually is -- see the beginning of this mail) you can start
> > to process the data from the regular stream.
> >
> > Have a look into KStreamKTableLeftJoin to get an idea how this would
> > work.
> >
> >
> > -Matthias
> >
> >
> > On 05/23/2016 06:25 PM, Guozhang Wang wrote:
> > > Hi Srikanth,
> > >
> > > How do you define if the "KTable is read completely" to get to the
> > > "current content"? Since, as you said, that table is not purely
> > > static, but still has maybe-low-traffic update streams, I guess
> > > "catch up to current content" is still depending on some timestamp?
> > >
> > > BTW about 1), we are considering adding "read-only global state" as
> > > well into the DSL in the future, but like Matthias said it won't be
> > > available in 0.10.0, so you need to do it through the transform()
> > > call, where you can provide any customized processor.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, May 23, 2016 at 8:59 AM, Srikanth <srikanth...@gmail.com>
> > > wrote:
> > >
> > >> Matthias,
> > >>
> > >> For (2), how do you achieve this using transform()?
> > >>
> > >> Thanks,
> > >> Srikanth
> > >>
> > >> On Sat, May 21, 2016 at 9:10 AM, Matthias J. Sax
> > >> <matth...@confluent.io> wrote:
> > >>
> > >>> Hi Srikanth,
> > >>>
> > >>> 1) there is no support on the DSL level, but if you use the
> > >>> Processor API you can do "anything" you like.
> > >>> So yes, a map-like transform() that gets initialized with the
> > >>> "broadcast side" of the join should work.
> > >>>
> > >>> 2) Right now, there is no way to stall a stream -- a custom
> > >>> TimestampExtractor will not do the trick either, because Kafka
> > >>> Streams allows for out-of-order/late-arriving data -- thus, it
> > >>> processes what is available without "waiting" for late data...
> > >>>
> > >>> Of course, you could build a custom solution via transform() again.
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 05/21/2016 05:24 AM, Srikanth wrote:
> > >>>> Hello,
> > >>>>
> > >>>> I'm writing a workflow using Kafka Streams where an incoming
> > >>>> stream needs to be denormalized and joined with a few dimension
> > >>>> tables.
> > >>>> It will be written back to another Kafka topic. Fairly typical, I
> > >>>> believe.
> > >>>>
> > >>>> 1) Can I do a broadcast join if my dimension table is small
> > >>>> enough to be held in each processor's local state?
> > >>>> Not doing this will force the KStream to be shuffled with an
> > >>>> internal topic.
> > >>>> If not, should I write a transformer that reads the table in
> > >>>> init() and caches it? I can then do a map-like transformation in
> > >>>> transform() with a cache lookup instead of a join.
> > >>>>
> > >>>>
> > >>>> 2) When joining a KStream with a KTable for bigger tables, how do
> > >>>> I stall reading the KStream until the KTable is completely
> > >>>> materialized?
> > >>>> I know "completely read" is a very loose term in stream
> > >>>> processing :-) The KTable is going to be fairly static and needs
> > >>>> the "current content" to be read completely before it can be used
> > >>>> for joins.
> > >>>> The only option for synchronizing streams I see is
> > >>>> TimestampExtractor. I can't figure out how to use that because
> > >>>>
> > >>>> i) The join between KStream and KTable is time-agnostic and
> > >>>> doesn't really fit into a window operation.
> > >>>> ii) TimestampExtractor is set as a stream config at the global
> > >>>> level. Timestamp extraction logic, on the other hand, will be
> > >>>> specific to each stream.
> > >>>> How does one write a generic extractor?
> > >>>>
> > >>>> Thanks,
> > >>>> Srikanth
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> > >
> >

--
-- Guozhang
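[A minimal, Kafka-free sketch of the cache-lookup alternative from question 1) above: load the small dimension table once, as transform()'s init() would, then enrich each stream record with a map lookup instead of a shuffled join. All names are illustrative; in Kafka Streams this would be a Transformer registered via KStream.transform().]

```java
import java.util.*;

// Illustrative only: "broadcast join" as a map-like enrichment over a
// locally cached dimension table.
class DimensionEnricher {
    private Map<String, String> dimensions;

    // Stand-in for init(): a real Transformer might read the dimension
    // topic or a file into this local map.
    void init(Map<String, String> dimensionTable) {
        this.dimensions = new HashMap<>(dimensionTable);
    }

    // Stand-in for transform(): enrich via cache lookup instead of a join.
    String transform(String key, String value) {
        String dim = dimensions.getOrDefault(key, "UNKNOWN");
        return value + "," + dim;
    }
}
```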