hi!

I'm trying to collect some metrics by key per window and emiting the full
result at the end of the window to kafka, I started with a simple count by
key to test it but my requirements are a little more complex than that.

what I want to do is to fold the stream events as they come and then at the
end of the window merge them together and emit a singe result, I don't want
to accumulate all the events and calculate at the end of the window, from
my understanding of fold in other languages/libraries, this would be what I
need, using it via apply(stateIn, foldFun, windowFun) but it's not working:

the basic is:

    input
                .flatMap(new LineSplitter())
                .keyBy(0)
                .timeWindow(Time.of(5, TimeUnit.SECONDS))
                .apply(new HashMap<String, Integer>(), foldFunction,
winFunction);

where foldFunction accumulates by key and winFunction iterate over the
hasmaps and merges them into a single result hashmap and emits that one at
the end.

this emits many one-key hash maps instead of only one with all the keys, I
tried setting setParallelism(1) in multiple places but still doesn't work.
More confusingly, in one run it emited a single map but after I ran it
again it went back to the previous behavior.

what I'm doing wrong? is there any other approach?

I can provide the implementation of foldFunction and winFunction if
required but I think it doesn't change much.

Reading the source code I see:

    Applies the given window function to each window. The window function
is called for each evaluation of the window for each key individually. The
output of the window function is interpreted as a regular non-windowed
stream.

emphasis on " for each key individually", the return type of apply is
SingleOutputStreamOperator which doesn't provide many operations to group
the emited values.

thanks in advance.

Reply via email to