On Mon, Sep 5, 2016 at 12:30 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> for this you would have to use a non-parallel window, i.e. something like
> stream.windowAll(<my window>).apply(...). This does not compute per key but
> has the drawback that computation does not happen in parallel. If you only
> use it to combine the pre-aggregated maps it could be OK, though.
>
> Cheers,
> Aljoscha
>

Hi,

thanks for the tip, it works. I was aware of the non-parallel nature of what
I want to do. After seeing it work I tried this:

    input.flatMap(new LineSplitter()).keyBy(0)
        .timeWindow(Time.of(5, TimeUnit.SECONDS))
        .apply(new HashMap<String, Integer>(), timeWindowFold, timeWindowMerge)
        .windowAll(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
        .apply(new HashMap<String, Integer>(), windowAllFold, windowAllMerge);

It seems to work, but each timeWindowFold appears to accumulate a single key.
I was expecting one or more keys per fold, depending on which processing node
handled the computation. I don't mind emitting one event per key, but I'd like
to know whether this is OK or whether I'm missing something (maybe I have to
add a line to specify the partitioning?).

> On Fri, 2 Sep 2016 at 18:26 Luis Mariano Guerra <mari...@event-fabric.com>
> wrote:
>
>> On Fri, Sep 2, 2016 at 5:24 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> from this I would expect to get as many HashMaps as you have keys. The
>>> winFunction is also executed per-key so it cannot combine the HashMaps of
>>> all keys.
>>>
>>> Does this describe the behavior that you're seeing?
>>>
>>
>> yes, it's the behaviour I'm seeing, I'm looking for a way to merge those
>> HashMaps from the same window into a single one, I can't find how.
>>
>>
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Fri, 2 Sep 2016 at 17:37 Luis Mariano Guerra <
>>> mari...@event-fabric.com> wrote:
>>>
>>>> hi!
>>>>
>>>> I'm trying to collect some metrics by key per window and emit the
>>>> full result at the end of the window to kafka. I started with a simple
>>>> count by key to test it, but my requirements are a little more complex
>>>> than that.
>>>>
>>>> what I want to do is fold the stream events as they come and then, at
>>>> the end of the window, merge them together and emit a single result. I
>>>> don't want to accumulate all the events and calculate at the end of the
>>>> window. From my understanding of fold in other languages/libraries, this
>>>> would be what I need, using it via apply(stateIn, foldFun, windowFun),
>>>> but it's not working.
>>>>
>>>> the basic idea is:
>>>>
>>>>     input
>>>>         .flatMap(new LineSplitter())
>>>>         .keyBy(0)
>>>>         .timeWindow(Time.of(5, TimeUnit.SECONDS))
>>>>         .apply(new HashMap<String, Integer>(), foldFunction,
>>>>             winFunction);
>>>>
>>>> where foldFunction accumulates by key and winFunction iterates over the
>>>> hash maps, merges them into a single result hash map and emits that one
>>>> at the end.
>>>>
>>>> this emits many one-key hash maps instead of only one with all the
>>>> keys. I tried setting setParallelism(1) in multiple places but it still
>>>> doesn't work. More confusingly, in one run it emitted a single map, but
>>>> after I ran it again it went back to the previous behavior.
>>>>
>>>> what am I doing wrong? is there any other approach?
>>>>
>>>> I can provide the implementation of foldFunction and winFunction if
>>>> required but I think it doesn't change much.
>>>>
>>>> Reading the source code I see:
>>>>
>>>>     Applies the given window function to each window. The window
>>>>     function is called for each evaluation of the window for each key
>>>>     individually. The output of the window function is interpreted as a
>>>>     regular non-windowed stream.
>>>>
>>>> emphasis on "for each key individually". The return type of apply is
>>>> SingleOutputStreamOperator, which doesn't provide many operations to
>>>> group the emitted values.
>>>>
>>>> thanks in advance.
>>>>
>>>
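
For anyone reading this thread later: the two-stage behaviour discussed above can be illustrated with a minimal plain-Java sketch. It does not use Flink at all; the class name TwoStageMerge and the methods foldPerKey and mergeAll are made up for illustration. It only assumes what the thread describes: after keyBy, each fold accumulator sees the events of exactly one key, and a later non-parallel step combines the partial maps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink): hypothetical names, for illustration only.
class TwoStageMerge {

    // Stage 1: stands in for the keyed window fold. Events are grouped by key
    // first, so every accumulator only ever sees one key and ends up as a
    // one-entry map, regardless of where the computation runs.
    static List<Map<String, Integer>> foldPerKey(List<String> words) {
        Map<String, Map<String, Integer>> accumulators = new LinkedHashMap<>();
        for (String word : words) {
            accumulators
                .computeIfAbsent(word, k -> new HashMap<>())
                .merge(word, 1, Integer::sum);
        }
        return new ArrayList<>(accumulators.values());
    }

    // Stage 2: stands in for the non-parallel (windowAll-style) step that
    // combines the pre-aggregated one-key maps into a single result map.
    static Map<String, Integer> mergeAll(List<Map<String, Integer>> partials) {
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Integer::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        List<Map<String, Integer>> partials = foldPerKey(words);
        System.out.println("partial maps: " + partials);
        System.out.println("merged map:   " + mergeAll(partials));
    }
}
```

In this sketch one partial map per key is the expected result of the first stage, which matches what Aljoscha described; only the second, non-parallel stage sees all keys together.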