Hello Josh,

We are aware that the Transformer / Processor can be improved; for example,
the punctuate() function should be able to return a value of the same type R
as transform() does for a Transformer.

For now, in your case you can return a sentinel value from transform() and
add a filter() right after it that removes the sentinel values.
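A minimal stand-alone model of the sentinel-plus-filter workaround, using plain java.util.stream in place of KStream so it runs without Kafka on the classpath. The NO_OUTPUT sentinel and the ":end" marker are hypothetical; in a real topology the sentinel would be a KeyValue returned from transform() and dropped by a KStream.filter() placed right after it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Stand-alone model: java.util.stream stands in for KStream; nothing here is Kafka API.
class SentinelFilterDemo {
    // Hypothetical sentinel; a distinct instance so the identity comparison below is safe.
    static final String NO_OUTPUT = new String("<no-output>");

    // Stand-in for transform(): emits only when an input marks the end of a session.
    static String transform(String event) {
        return event.endsWith(":end") ? "session-closed:" + event : NO_OUTPUT;
    }

    static List<String> run(List<String> events) {
        return events.stream()
                .map(SentinelFilterDemo::transform)
                .filter(v -> v != NO_OUTPUT) // the filter placed right after transform
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a:start", "a:click", "a:end")));
    }
}
```

Only the non-sentinel result survives the filter, so downstream operators never see the placeholder values.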

Guozhang


On Wed, Mar 23, 2016 at 7:02 PM, josh gruenberg <jos...@gmail.com> wrote:

> Thank you, Guozhang! In my exploration, I did overlook the "transform"
> method; this looks promising.
>
> I could still use a little more help: I'm confused because for this
> sessionization use-case, an invocation of the 'transform' method usually
> suggests that a session is still active, so I'll have nothing to emit from
> 'transform'. Instead, I'm guessing I'll need to produce my results from the
> 'punctuate' callback. So my questions are:
>
> 1. what should I return from 'transform' to indicate that I have no output
> at this time? From my reading of 'KStreamTransformProcessor.process', it
> appears that "null" won't fly. Should I return a dummy KeyValue, and then
> filter that out downstream? Seems a little cumbersome, but perhaps not
> terrible as an interim solution... Is there a better way?
> 2. To emit completed aggregations in response to 'punctuate', can I just
> send them via 'context.forward'? (I'll note that this doesn't appear to
> enforce any type-safety, which could lead to maintainability issues.)
>
> Finally, I'll add that this pattern feels like it's abusing the Transformer
> SPI. The interface assumes that transformation is always 1:1, which is
> artificially limiting. I imagine some sort of generalization of this part
> of the system could improve usability. For example, both 'transform' and
> 'punctuate' might be reframed as void methods that receive a type-safe
> interface for 'context.forward'. (I have this small change drafted up
> within the kafka trunk sources, and could submit a PR if the maintainers
> are interested?)
>
> Thanks,
> -josh
>
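A sketch of the void-method shape proposed in the message above, assuming transform and punctuate each receive a typed forwarder instead of returning a value. Forwarder, FlatTransformer, CountOnPunctuate and FlatTransformerDemo are hypothetical names, not Kafka Streams API; the point is that output types are checked at compile time and either method can emit zero or more records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical type-safe replacement for context.forward(); NOT part of Kafka Streams.
interface Forwarder<K, V> {
    void forward(K key, V value);
}

// Hypothetical transformer whose methods return nothing and emit through the forwarder.
interface FlatTransformer<K, V, K1, V1> {
    // May forward zero, one, or many records per input.
    void transform(K key, V value, Forwarder<K1, V1> out);

    // Called periodically; forwards with the same output types as transform().
    void punctuate(long timestamp, Forwarder<K1, V1> out);
}

// Example: counts events per key, emits nothing from transform, flushes on punctuate.
class CountOnPunctuate implements FlatTransformer<String, String, String, Integer> {
    private final Map<String, Integer> counts = new TreeMap<>(); // sorted for stable output

    @Override
    public void transform(String key, String value, Forwarder<String, Integer> out) {
        counts.merge(key, 1, Integer::sum); // nothing forwarded here
    }

    @Override
    public void punctuate(long timestamp, Forwarder<String, Integer> out) {
        counts.forEach(out::forward);
        counts.clear();
    }
}

class FlatTransformerDemo {
    // Feeds three events and optionally punctuates; returns what was forwarded.
    static List<String> run(boolean punctuate) {
        List<String> emitted = new ArrayList<>();
        Forwarder<String, Integer> out = (k, v) -> emitted.add(k + "=" + v);
        CountOnPunctuate t = new CountOnPunctuate();
        t.transform("a", "x", out);
        t.transform("a", "y", out);
        t.transform("b", "z", out);
        if (punctuate) {
            t.punctuate(1000L, out);
        }
        return emitted;
    }
}
```

Without a punctuation nothing is emitted; after one, the per-key counts come out as "a=2" and "b=1", with no sentinel or filter step needed.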
> On Wed, Mar 23, 2016 at 11:02 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Josh,
> >
> > As of now Kafka Streams does not yet support session windows as in the
> > Dataflow model, though we do have near-term plans to support them.
> >
> > For now you can still work around it with the Processor API by calling
> > the "KStream.transform()" function, which still returns a stream object.
> > In your customized "Transformer" implementation, you can attach a state
> > store of your own, access it in the "transform" function, and return
> > results only when, for example, a session has ended.
> >
> > As a concrete example, Confluent has some internal tools that already use
> > Kafka Streams for online operations where a session-windowed processor is
> > needed as well. We use the "transform" function in the Streams DSL (i.e.
> > "KStreamBuilder") as in the following sketch:
> >
> > --------------
> >
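> > builder.addStateStore(/* supplier for a store named "store-name", e.g.
> >     new RocksDBKeyValueStoreSupplier(..) */);
> >
> > KStream<K, V> stream1 = builder.stream("source-topic");
> >
> > // list "store-name" so the transformer is connected to that store
> > KStream<K1, V1> stream2 =
> >     stream1.transform(() -> new MyTransformerFunc(), "store-name");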
> >
> > --------------
> >
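> > then in MyTransformerFunc (which implements Transformer):
> >
> > @SuppressWarnings("unchecked")
> > public void init(ProcessorContext context) {
> >     // getStateStore() returns a StateStore, so cast to the concrete type
> >     this.kvStore = (KeyValueStore<K, V>) context.getStateStore("store-name");
> >     // now you can access this store in the transform function.
> > }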
> >
> > --------------
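A plain-Java model of the approach described above: per-key session state kept in a store, with transform() emitting a session only once a later event shows it has ended, plus a punctuate()-style sweep for idle sessions. A HashMap stands in for the store from context.getStateStore("store-name"); SessionizingTransformer, the gap parameter and the null-for-no-output convention are assumptions for illustration, and events are assumed to arrive in event-time order per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model; HashMap stands in for the KeyValueStore from
// context.getStateStore("store-name"). All names here are hypothetical.
class SessionizingTransformer {
    // One session for one key: first/last event time and an event count.
    static final class Session {
        final long start;
        long end;
        int count;

        Session(long start) {
            this.start = start;
            this.end = start;
            this.count = 1;
        }
    }

    private final long gapMs;
    private final Map<String, Session> store = new HashMap<>();

    SessionizingTransformer(long gapMs) {
        this.gapMs = gapMs;
    }

    // Like transform(): returns the previous session once a new event shows it
    // has ended, or null when there is nothing to emit yet. (In Kafka Streams
    // you would return a sentinel instead of null and filter it out.)
    Session transform(String key, long eventTime) {
        Session current = store.get(key);
        if (current != null && eventTime - current.end <= gapMs) {
            current.end = eventTime; // assumes in-order event times per key
            current.count++;
            return null;
        }
        store.put(key, new Session(eventTime));
        return current; // null if this is the key's first event
    }

    // Like punctuate(): removes and returns sessions idle for more than gapMs.
    List<Session> punctuate(long now) {
        List<Session> closed = new ArrayList<>();
        store.values().removeIf(s -> {
            boolean expired = now - s.end > gapMs;
            if (expired) {
                closed.add(s);
            }
            return expired;
        });
        return closed;
    }
}
```

With a 10 ms gap, events at 0 and 5 form one session that is returned when an event at 30 arrives; the session started at 30 is later flushed by the periodic sweep.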
> >
> >
> > Hope this helps.
> >
> > Guozhang
> >
> > On Tue, Mar 22, 2016 at 11:51 AM, josh gruenberg <jos...@gmail.com> wrote:
> >
> > > Hello there,
> > >
> > > I've been experimenting with the Kafka Streams preview, and I'm excited
> > > about its features and capabilities! My team is enthusiastic about the
> > > lightweight operational profile, and the support for local state is very
> > > compelling.
> > >
> > > However, I'm having trouble designing a solution with KStreams to satisfy
> > > a particular use-case: we want to "Sessionize" a stream of events, by
> > > gathering together inputs that share a common identifier and occur
> > > without a configurable interruption (gap) in event-time.
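To make the session definition concrete, a small sketch for a single identifier, assuming its event times are already sorted: a new session starts whenever the gap to the previous event exceeds gapMs. SessionSplit is a hypothetical helper, not part of any framework.

```java
import java.util.ArrayList;
import java.util.List;

// Splits one key's sorted event times into sessions separated by gaps > gapMs.
class SessionSplit {
    static List<List<Long>> sessions(List<Long> sortedTimes, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : sortedTimes) {
            // Gap exceeded: close the current session and start a new one.
            if (!current.isEmpty() && t - current.get(current.size() - 1) > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

With gapMs = 5, event times 1, 3, 20, 22, 40 split into [1, 3], [20, 22] and [40].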
> > >
> > > This is achievable with other streaming frameworks (e.g., using
> > > Beam/Dataflow's "Session" windows, or Spark Streaming's mapWithState with
> > > its "timeout" capability), but I don't see how to approach it with the
> > > current Kafka Streams API.
> > >
> > > I've investigated using the aggregateWithKey function, but this doesn't
> > > appear to support data-driven windowing. I've also considered using a
> > > custom Processor to perform the aggregation, but don't see how to take an
> > > output-stream from a Processor and continue to work with it. This area of
> > > the system is undocumented, so I'm not sure how to proceed.
> > >
> > > Am I missing something? Do you have any suggestions?
> > >
> > > -josh
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
