Thank you, Guozhang! In my exploration, I did overlook the "transform" method; this looks promising.
I could still use a little more help: I'm confused because for this sessionization use-case, an invocation of the 'transform' method usually suggests that a session is still active, so I'll have nothing to emit from 'transform'. Instead, I'm guessing I'll need to produce my results from the 'punctuate' callback. So my questions are:

1. What should I return from 'transform' to indicate that I have no output at this time? From my reading of 'KStreamTransformProcessor.process', it appears that "null" won't fly. Should I return a dummy KeyValue, and then filter that out downstream? Seems a little cumbersome, but perhaps not terrible as an interim solution... Is there a better way?

2. To emit completed aggregations in response to 'punctuate', can I just send them via 'context.forward'? (I'll note that this doesn't appear to enforce any type-safety, which could lead to maintainability issues.)

Finally, I'll add that this pattern feels like it's abusing the Transformer SPI. The interface assumes that transformation is always 1:1, which is artificially limiting. I imagine some sort of generalization of this part of the system could improve usability. For example, both 'transform' and 'punctuate' might be reframed as void methods that receive a type-safe interface for 'context.forward'. (I have this small change drafted up within the kafka trunk sources, and could submit a PR if the maintainers are interested?)

Thanks,
-josh

On Wed, Mar 23, 2016 at 11:02 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Josh,
>
> As of now Kafka Streams does not yet support session windows as in the
> Dataflow model, though we do have near term plans to support it.
>
> As for now you can still work around it with the Processor, by calling
> "KStream.transform()" function, which can still return you a stream object.
> In your customized "Transformer" implementation, you can attach a state
> store of your own and access it in the "transform" function, and only
> return the results, for example, when one session has ended.
>
> As a concrete example, Confluent has some internal tools that use Kafka
> Streams already for some online operations, where a session-window
> processor is needed as well. We use the "transform" function in the
> Streams DSL (i.e. "KStreamBuilder") in the following sketch:
>
> --------------
>
> builder.addStateStore(/* new RocksDBKeyValueStoreSupplier(..)*/,
>     "store-name");
>
> stream1 = builder.stream("source-topic");
>
> stream2 = stream1.transform(MyTransformerFunc, "store-name");
>
> --------------
>
> then in MyTransformerFunc:
>
> public void init(ProcessorContext context) {
>     this.kvStore = context.getStateStore("store-name");
>
>     // now you can access this store in the transform function.
> }
>
> --------------
>
> Hope this helps.
>
> Guozhang
>
> On Tue, Mar 22, 2016 at 11:51 AM, josh gruenberg <jos...@gmail.com> wrote:
>
> > Hello there,
> >
> > I've been experimenting with the Kafka Streams preview, and I'm excited
> > about its features and capabilities! My team is enthusiastic about the
> > lightweight operational profile, and the support for local state is very
> > compelling.
> >
> > However, I'm having trouble designing a solution with KStreams to satisfy
> > a particular use-case: we want to "Sessionize" a stream of events, by
> > gathering together inputs that share a common identifier and occur without
> > a configurable interruption (gap) in event-time.
> >
> > This is achievable with other streaming frameworks (e.g., using
> > Beam/Dataflow's "Session" windows, or SparkStreaming's mapWithState with
> > its "timeout" capability), but I don't see how to approach it with the
> > current Kafka Streams API.
> >
> > I've investigated using the aggregateWithKey function, but this doesn't
> > appear to support data-driven windowing. I've also considered using a
> > custom Processor to perform the aggregation, but don't see how to take an
> > output-stream from a Processor and continue to work with it. This area of
> > the system is undocumented, so I'm not sure how to proceed.
> >
> > Am I missing something? Do you have any suggestions?
> >
> > -josh
>
> --
> -- Guozhang
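The approach discussed in this thread is to keep open sessions in a store, emit nothing while a session is active, and flush idle sessions from a periodic callback. It can be sketched in plain Java without the framework. This is a minimal sketch under stated assumptions. `GapSessionizer`, `Forwarder` and `Session` are hypothetical names, not Kafka Streams API. A `HashMap` stands in for the RocksDB-backed state store. `process` and `punctuate` are `void` and emit through a typed callback, in the spirit of the interface change Josh proposes. Events are assumed to arrive roughly in event-time order, and late or out-of-order events are merged only if they fall within the gap of the session's latest event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical typed output channel, standing in for an untyped context.forward().
interface Forwarder<K, V> {
    void forward(K key, V value);
}

// One session: earliest and latest event time seen, plus an event count.
final class Session {
    long start;
    long end;
    int count;

    Session(long ts) { start = ts; end = ts; count = 1; }

    void add(long ts) {
        start = Math.min(start, ts);
        end = Math.max(end, ts);
        count++;
    }

    @Override
    public String toString() { return "[" + start + ".." + end + "] x" + count; }
}

// Hypothetical gap-based sessionizer (NOT the Kafka Streams API).
// The HashMap plays the role of the per-key state store.
final class GapSessionizer<K> {
    private final long gapMs;
    private final Map<K, Session> open = new HashMap<>();
    private final Forwarder<K, Session> out;

    GapSessionizer(long gapMs, Forwarder<K, Session> out) {
        this.gapMs = gapMs;
        this.out = out;
    }

    // Per-event callback (the role 'transform' plays). It emits only when this
    // event arrives more than gapMs after the key's latest event.
    void process(K key, long eventTime) {
        Session s = open.get(key);
        if (s != null && eventTime - s.end > gapMs) {
            out.forward(key, s); // gap exceeded: the previous session is complete
            s = null;
        }
        if (s == null) {
            open.put(key, new Session(eventTime)); // start a new session
        } else {
            s.add(eventTime); // session still active: nothing to emit
        }
    }

    // Periodic callback (the role 'punctuate' plays). It flushes sessions that
    // have been idle for longer than gapMs as of the given stream time.
    void punctuate(long streamTime) {
        Iterator<Map.Entry<K, Session>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Session> e = it.next();
            if (streamTime - e.getValue().end > gapMs) {
                out.forward(e.getKey(), e.getValue());
                it.remove();
            }
        }
    }
}

public class SessionizeDemo {
    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        GapSessionizer<String> sessions =
            new GapSessionizer<>(30_000L, (user, s) -> emitted.add(user + " " + s));
        sessions.process("alice", 0L);
        sessions.process("alice", 10_000L);
        sessions.process("bob", 5_000L);
        sessions.process("alice", 60_000L); // 50 s after alice's last event: closes her first session
        sessions.punctuate(100_000L);       // flushes the idle sessions for alice and bob
        emitted.forEach(System.out::println);
    }
}
```

Bob's session is emitted only by `punctuate`, because no later event for that key arrives to close it. This is why completed sessions cannot be produced from the per-event callback alone. Because the sketch's `process` returns nothing, it also sidesteps the "dummy KeyValue plus downstream filter" workaround asked about in question 1. The iteration order of `punctuate` over a `HashMap` is unspecified, so sessions flushed in the same call may be emitted in any order.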