Hello Josh,

We are aware that the Transformer / Processor API can be improved; for example, the punctuate() function of a Transformer should be able to return a value of the same type R that transform() returns.
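As a rough illustration of that point, here is a plain-Java sketch with no Kafka dependency. TypedTransformer, CountOnPunctuate, and run() are hypothetical names made up for this example, not part of the Kafka Streams API. The sketch only shows that when punctuate() returns the same type R as transform(), every value sent downstream is checked by the compiler as an R. In this hypothetical driver, null simply means "nothing to emit".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TypedPunctuateSketch {

    // Hypothetical interface (NOT the Kafka Streams API): punctuate() returns the same type R as transform().
    interface TypedTransformer<K, V, R> {
        R transform(K key, V value); // null means "no output for this record"

        R punctuate(long timestamp); // null means "nothing to emit on this tick"
    }

    // Hypothetical example: counts records and reports the count only when punctuated.
    static final class CountOnPunctuate implements TypedTransformer<String, String, String> {
        private long count = 0;

        @Override
        public String transform(String key, String value) {
            count++;
            return null; // nothing to emit per record
        }

        @Override
        public String punctuate(long timestamp) {
            return "count@" + timestamp + "=" + count;
        }
    }

    // Hypothetical driver: results from both callbacks are statically typed as R, so no casts are needed downstream.
    static <K, V, R> List<R> run(TypedTransformer<K, V, R> t, List<K> keys, List<V> values, long punctuateAt) {
        List<R> downstream = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            R r = t.transform(keys.get(i), values.get(i));
            if (r != null) {
                downstream.add(r);
            }
        }
        R p = t.punctuate(punctuateAt);
        if (p != null) {
            downstream.add(p);
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<String> out = run(new CountOnPunctuate(),
                Arrays.asList("a", "b", "a"), Arrays.asList("x", "y", "z"), 1000L);
        System.out.println(out); // prints [count@1000=3]
    }
}
```

Running main() prints [count@1000=3]: the three records produce no per-record output, and the single typed result is emitted on punctuation.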
For now, in your case you can return a sentinel value from transform() and add a filter() right after it that removes the sentinel values.

Guozhang

On Wed, Mar 23, 2016 at 7:02 PM, josh gruenberg <jos...@gmail.com> wrote:

> Thank you, Guozhang! In my exploration, I did overlook the "transform"
> method; this looks promising.
>
> I could still use a little more help: I'm confused because for this
> sessionization use-case, an invocation of the 'transform' method usually
> suggests that a session is still active, so I'll have nothing to emit from
> 'transform'. Instead, I'm guessing I'll need to produce my results from the
> 'punctuate' callback. So my questions are:
>
> 1. What should I return from 'transform' to indicate that I have no output
>    at this time? From my reading of 'KStreamTransformProcessor.process', it
>    appears that "null" won't fly. Should I return a dummy KeyValue, and then
>    filter that out downstream? Seems a little cumbersome, but perhaps not
>    terrible as an interim solution... Is there a better way?
> 2. To emit completed aggregations in response to 'punctuate', can I just
>    send them via 'context.forward'? (I'll note that this doesn't appear to
>    enforce any type-safety, which could lead to maintainability issues.)
>
> Finally, I'll add that this pattern feels like it's abusing the Transformer
> SPI. The interface assumes that transformation is always 1:1, which is
> artificially limiting. I imagine some sort of generalization of this part
> of the system could improve usability. For example, both 'transform' and
> 'punctuate' might be reframed as void methods that receive a type-safe
> interface for 'context.forward'. (I have this small change drafted up
> within the kafka trunk sources, and could submit a PR if the maintainers
> are interested?)
>
> Thanks,
> -josh
>
> On Wed, Mar 23, 2016 at 11:02 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Josh,
> >
> > As of now, Kafka Streams does not yet support session windows as in the
> > Dataflow model, though we do have near-term plans to support it.
> >
> > For now you can still work around it with the Processor API by calling the
> > "KStream.transform()" function, which still returns a stream object.
> > In your customized "Transformer" implementation, you can attach a state
> > store of your own, access it in the "transform" function, and only
> > return results when, for example, a session has ended.
> >
> > As a concrete example, Confluent has some internal tools that already use
> > Kafka Streams for some online operations where a session-window processor
> > is needed as well. We use the "transform" function in the Streams DSL
> > (i.e. "KStreamBuilder") as in the following sketch:
> >
> > --------------
> >
> > builder.addStateStore(/* new RocksDBKeyValueStoreSupplier(..) */, "store-name");
> >
> > stream1 = builder.stream("source-topic");
> >
> > stream1.transform(MyTransformerFunc, "store-name");
> >
> > --------------
> >
> > then in MyTransformerFunc:
> >
> > public void init(ProcessorContext context) {
> >     this.kvStore = context.getStateStore("store-name");
> >
> >     // now you can access this store in the transform function.
> > }
> >
> > --------------
> >
> > Hope this helps.
> >
> > Guozhang
> >
> > On Tue, Mar 22, 2016 at 11:51 AM, josh gruenberg <jos...@gmail.com> wrote:
> >
> > > Hello there,
> > >
> > > I've been experimenting with the Kafka Streams preview, and I'm excited
> > > about its features and capabilities! My team is enthusiastic about the
> > > lightweight operational profile, and the support for local state is very
> > > compelling.
> > >
> > > However, I'm having trouble designing a solution with KStreams to satisfy a
> > > particular use-case: we want to "Sessionize" a stream of events by
> > > gathering together inputs that share a common identifier and occur without
> > > a configurable interruption (gap) in event-time.
> > >
> > > This is achievable with other streaming frameworks (e.g., using
> > > Beam/Dataflow's "Session" windows, or Spark Streaming's mapWithState with
> > > its "timeout" capability), but I don't see how to approach it with the
> > > current Kafka Streams API.
> > >
> > > I've investigated using the aggregateWithKey function, but this doesn't
> > > appear to support data-driven windowing. I've also considered using a
> > > custom Processor to perform the aggregation, but don't see how to take an
> > > output-stream from a Processor and continue to work with it. This area of
> > > the system is undocumented, so I'm not sure how to proceed.
> > >
> > > Am I missing something? Do you have any suggestions?
> > >
> > > -josh
> >
> > --
> > -- Guozhang
>

--
-- Guozhang
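Pulling this thread's suggestions together, the workaround (a stateful transform that keeps one open session per key, a sentinel that a downstream filter drops, and a periodic punctuate() that flushes idle sessions) can be sketched as a plain-Java simulation of gap-based sessionization. It deliberately has no Kafka dependency: SessionizeSketch, Session, NONE, and run() are hypothetical names, a TreeMap stands in for the key-value state store, each key's event times are assumed to arrive in order, and punctuate() simply returns a list rather than forwarding results through a ProcessorContext.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SessionizeSketch {

    // A session for one key: first and last event time, plus the number of events seen.
    static final class Session {
        final String key;
        final long start;
        long end;
        int count;

        Session(String key, long ts) {
            this.key = key;
            this.start = ts;
            this.end = ts;
            this.count = 1;
        }

        @Override
        public String toString() {
            return key + "[" + start + ".." + end + "]x" + count;
        }
    }

    // Sentinel returned by transform() when there is nothing to emit; dropped by the filter step.
    static final Session NONE = new Session("", -1L);

    private final long gap;
    // Stand-in for the key-value state store; a TreeMap keeps punctuate() output ordered by key.
    private final Map<String, Session> open = new TreeMap<>();

    SessionizeSketch(long gap) {
        this.gap = gap;
    }

    // Per-record callback; assumes each key's event times arrive in non-decreasing order.
    Session transform(String key, long ts) {
        Session s = open.get(key);
        if (s != null && ts - s.end <= gap) { // within the gap: extend the open session
            s.end = ts;
            s.count++;
            return NONE;
        }
        open.put(key, new Session(key, ts)); // start a new session for this key
        return s == null ? NONE : s; // if the gap was exceeded, the previous session is complete
    }

    // Periodic callback: emits and evicts every session idle for longer than the gap.
    List<Session> punctuate(long streamTime) {
        List<Session> closed = new ArrayList<>();
        Iterator<Session> it = open.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (streamTime - s.end > gap) {
                closed.add(s);
                it.remove();
            }
        }
        return closed;
    }

    // Feeds records through transform(), filters out the sentinel, then flushes once with punctuate().
    static List<Session> run(long gap, String[] keys, long[] times, long flushAt) {
        SessionizeSketch t = new SessionizeSketch(gap);
        List<Session> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Session r = t.transform(keys[i], times[i]);
            if (r != NONE) {
                out.add(r);
            }
        }
        out.addAll(t.punctuate(flushAt));
        return out;
    }

    public static void main(String[] args) {
        List<Session> out = run(30L, new String[] {"u1", "u1", "u2", "u1"},
                new long[] {0L, 10L, 15L, 100L}, 200L);
        System.out.println(out); // prints [u1[0..10]x2, u1[100..100]x1, u2[15..15]x1]
    }
}
```

With a gap of 30, the two early "u1" events (at 0 and 10) merge into one session, which transform() emits when the "u1" event at 100 arrives; punctuate(200) then flushes the two sessions that are still open. A real implementation would also need to handle late or out-of-order events, which this sketch ignores.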