Thank you, Guozhang! In my exploration, I did overlook the "transform"
method; this looks promising.

I could still use a little more help: I'm confused because for this
sessionization use-case, an invocation of the 'transform' method usually
suggests that a session is still active, so I'll have nothing to emit from
'transform'. Instead, I'm guessing I'll need to produce my results from the
'punctuate' callback. So my questions are:

1. What should I return from 'transform' to indicate that I have no output
at this time? From my reading of 'KStreamTransformProcessor.process', it
appears that "null" won't fly. Should I return a dummy KeyValue, and then
filter that out downstream? Seems a little cumbersome, but perhaps not
terrible as an interim solution... Is there a better way?
2. To emit completed aggregations in response to 'punctuate', can I just
send them via 'context.forward'? (I'll note that this doesn't appear to
enforce any type-safety, which could lead to maintainability issues.)
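
To make the pattern I'm describing concrete, here is a rough sketch in plain
Java with no Kafka Streams types at all. Everything in it (SessionizerSketch,
Session, onEvent, the in-memory map standing in for a state store) is a
hypothetical name of my own, not part of the Streams API. 'onEvent' plays the
role of 'transform' and never emits; 'punctuate' is the only place completed
sessions come out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of gap-based sessionization; all names are hypothetical.
class SessionizerSketch {
    // One session: first and last event-time seen for a key, plus an event count.
    static final class Session {
        final String key;
        final long start;
        long end;
        int count;

        Session(String key, long ts) {
            this.key = key;
            this.start = ts;
            this.end = ts;
            this.count = 1;
        }
    }

    private final long gapMs;
    // Stand-in for a state store holding the currently open session per key.
    private final Map<String, Session> open = new HashMap<>();
    // Sessions already known to be finished, waiting for the next punctuate.
    private final List<Session> closed = new ArrayList<>();

    SessionizerSketch(long gapMs) {
        this.gapMs = gapMs;
    }

    // Analogue of 'transform': only updates state, never emits.
    void onEvent(String key, long ts) {
        Session s = open.get(key);
        if (s != null && ts - s.end > gapMs) {
            closed.add(s); // gap exceeded: the old session is complete
            s = null;
        }
        if (s == null) {
            open.put(key, new Session(key, ts));
        } else {
            s.end = Math.max(s.end, ts);
            s.count++;
        }
    }

    // Analogue of 'punctuate': returns every session whose gap has elapsed by 'now'.
    List<Session> punctuate(long now) {
        List<Session> out = new ArrayList<>(closed);
        closed.clear();
        Iterator<Session> it = open.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (now - s.end > gapMs) {
                out.add(s);
                it.remove();
            }
        }
        return out;
    }
}
```

In the real thing, the list returned from 'punctuate' is what I would need to
push downstream somehow, which is what question 2 is about.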

Finally, I'll add that this pattern feels like it's abusing the Transformer
SPI. The interface assumes that transformation is always 1:1, which is
artificially limiting. I imagine some sort of generalization of this part
of the system could improve usability. For example, both 'transform' and
'punctuate' might be reframed as void methods that receive a type-safe
interface for 'context.forward'. (I have this small change drafted up
within the Kafka trunk sources, and could submit a PR if the maintainers
are interested.)
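
Roughly, the shape I have in mind looks like the sketch below. To be clear,
none of these types exist in Kafka Streams: Forwarder, EmittingTransformer and
SummingTransformer are hypothetical names I made up for illustration, and this
is not the drafted change itself. The point is only that both callbacks are
void and emit zero or more records through a typed sink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical type-safe stand-in for an untyped 'context.forward'.
interface Forwarder<K, V> {
    void forward(K key, V value);
}

// Hypothetical generalized transformer: both callbacks are void and may
// emit zero, one, or many records through the typed forwarder.
interface EmittingTransformer<K, V, K2, V2> {
    void transform(K key, V value, Forwarder<K2, V2> out);

    void punctuate(long timestamp, Forwarder<K2, V2> out);
}

// Toy implementation: accumulates in 'transform', emits only in 'punctuate'.
class SummingTransformer implements EmittingTransformer<String, Integer, String, Long> {
    private long total = 0;

    @Override
    public void transform(String key, Integer value, Forwarder<String, Long> out) {
        total += value; // nothing to emit yet, so 'out' is simply not called
    }

    @Override
    public void punctuate(long timestamp, Forwarder<String, Long> out) {
        out.forward("total@" + timestamp, total);
        total = 0;
    }
}

class ForwarderDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Forwarder<String, Long> sink = (k, v) -> seen.add(k + "=" + v);

        SummingTransformer t = new SummingTransformer();
        t.transform("x", 2, sink);
        t.transform("y", 3, sink);
        t.punctuate(10L, sink);

        System.out.println(seen);
    }
}
```

With this shape there is no need for a dummy return value from 'transform',
and the compiler checks the key and value types of everything forwarded.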

Thanks,
-josh

On Wed, Mar 23, 2016 at 11:02 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Josh,
>
> As of now, Kafka Streams does not yet support session windows as in the
> Dataflow model, though we do have near-term plans to support them.
>
> For now, you can work around this with the Processor API by calling the
> "KStream.transform()" function, which still returns a stream object.
> In your custom "Transformer" implementation, you can attach a state
> store of your own, access it in the "transform" function, and return
> results only when, for example, a session has ended.
>
> As a concrete example, Confluent has some internal tools that already use
> Kafka Streams for some online operations, where a session-window
> processor is needed as well. We use the "transform" function in the
> Streams DSL (i.e. "KStreamBuilder") as in the following sketch:
>
> --------------
>
> builder.addStateStore(/* new RocksDBKeyValueStoreSupplier(..)*/,
> "store-name");
>
> stream1 = builder.stream("source-topic");
>
> stream2 = stream1.transform(MyTransformerFunc, "store-name");
>
> --------------
>
> then in MyTransformerFunc:
>
> public void init(ProcessorContext context) {
>     this.kvStore = context.getStateStore("store-name");
>
>     // now you can access this store in the transform function.
> }
>
> --------------
>
>
> Hope this helps.
>
> Guozhang
>
> On Tue, Mar 22, 2016 at 11:51 AM, josh gruenberg <jos...@gmail.com> wrote:
>
> > Hello there,
> >
> > I've been experimenting with the Kafka Streams preview, and I'm excited
> > about its features and capabilities! My team is enthusiastic about the
> > lightweight operational profile, and the support for local state is very
> > compelling.
> >
> > However, I'm having trouble designing a solution with KStreams to satisfy
> > a particular use-case: we want to "Sessionize" a stream of events, by
> > gathering together inputs that share a common identifier and occur without
> > a configurable interruption (gap) in event-time.
> >
> > This is achievable with other streaming frameworks (e.g., using
> > Beam/Dataflow's "Session" windows, or Spark Streaming's mapWithState with
> > its "timeout" capability), but I don't see how to approach it with the
> > current Kafka Streams API.
> >
> > I've investigated using the aggregateWithKey function, but this doesn't
> > appear to support data-driven windowing. I've also considered using a
> > custom Processor to perform the aggregation, but don't see how to take an
> > output-stream from a Processor and continue to work with it. This area of
> > the system is undocumented, so I'm not sure how to proceed.
> >
> > Am I missing something? Do you have any suggestions?
> >
> > -josh
> >
>
>
>
> --
> -- Guozhang
>
