Hi Kenn,

Thanks for the reply, that makes sense. As far as I can tell, the DirectPipelineRunner doesn't do this optimisation (when I test the pipeline locally), but I guess the DataflowRunner will.
Josh

On Tue, Jun 20, 2017 at 4:26 PM, Kenneth Knowles <k...@google.com> wrote:

> Hi Josh,
>
> Exactly what is stored technically depends on optimization decisions by
> the runner. But you can generally expect that only the accumulator is
> stored across trigger firings, not the input elements.
>
> Kenn
>
> On Tue, Jun 20, 2017 at 6:32 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have a question about how much state is buffered when using
>> Combine.perKey with a custom accumulator. For example, I have:
>>
>> PCollection<KV<String, String>> elements = ...;
>>
>> PCollection<KV<String, List<String>>> topValuesPerKey = elements
>>     .apply(Window.<KV<String, String>>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>             .plusDelayOf(Duration.standardSeconds(10))))
>>         .accumulatingFiredPanes())
>>     .apply(Combine.perKey(new MyCombineFunction()));
>>
>> Here MyCombineFunction counts, for each key, the occurrences of each
>> value. Its output for each key is a List<String> of the values that occur
>> most frequently. In this case the accumulator for each key just stores a
>> Map<String, Long> of values and their associated counts.
>>
>> My question is: since I am accumulatingFiredPanes forever on the global
>> window, is every element going to be buffered forever (i.e. will the amount
>> of space needed constantly increase)? Or is the amount of state buffered
>> determined by my accumulator (i.e. by the number of unique values across
>> all keys)? If the former is the case, how can I optimise my job so that
>> the accumulator is the only state stored across panes?
>>
>> Thanks for any advice,
>>
>> Josh
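The thread doesn't include MyCombineFunction itself, so here is a rough sketch of the idea behind Kenn's answer. TopValuesCombineFn is a hypothetical name, and the class is plain Java that only mirrors the four methods of Beam's Combine.CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput) so it can run without Beam; in a real pipeline it would extend Combine.CombineFn<String, Map<String, Long>, List<String>>. It also assumes "most frequently" means every value tied for the highest count.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MyCombineFunction. Mirrors the method names of
// Beam's Combine.CombineFn<String, Map<String, Long>, List<String>> but has no
// Beam dependency, so the accumulator logic can be exercised on its own.
class TopValuesCombineFn {

    // Empty per-key accumulator: value -> number of occurrences seen so far.
    Map<String, Long> createAccumulator() {
        return new HashMap<>();
    }

    // Fold one input value into the accumulator. Only the count is kept,
    // not the element itself.
    Map<String, Long> addInput(Map<String, Long> acc, String value) {
        acc.merge(value, 1L, Long::sum);
        return acc;
    }

    // Combine partial accumulators (e.g. built in different bundles) by
    // summing the counts for each value.
    Map<String, Long> mergeAccumulators(Iterable<Map<String, Long>> accs) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> acc : accs) {
            acc.forEach((value, count) -> merged.merge(value, count, Long::sum));
        }
        return merged;
    }

    // Assumed interpretation of "most frequent": every value whose count
    // equals the maximum count, sorted for a deterministic result.
    List<String> extractOutput(Map<String, Long> acc) {
        long max = 0;
        for (long count : acc.values()) {
            max = Math.max(max, count);
        }
        List<String> top = new ArrayList<>();
        for (Map.Entry<String, Long> e : acc.entrySet()) {
            if (e.getValue() == max) {
                top.add(e.getKey());
            }
        }
        Collections.sort(top);
        return top;
    }
}
```

Feeding "a", "b", "a" into one accumulator and calling extractOutput gives [a]; adding "b", "b", "c" to the same accumulator and extracting again gives [b], while the map still holds only three entries. Roughly speaking, that is what accumulatingFiredPanes relies on: each firing can read the same, still-growing accumulator, whose size tracks the number of distinct values per key rather than the number of elements. Whether a particular runner actually stores only the accumulator remains, as Kenn says, a runner optimisation decision.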