Hi Kenn,

Thanks for the reply, that makes sense.
As far as I can tell, the DirectPipelineRunner doesn't do this optimisation
when I test the pipeline locally, but I assume the DataflowRunner does.
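For reference, here is a minimal sketch of a CombineFn along the lines of the MyCombineFunction described in the original message below. It assumes the Beam Java 2.x `Combine.CombineFn` API. The details are illustrative assumptions, not the actual code: all values tied for the highest count are returned (sorted alphabetically), and the accumulator coder is set explicitly to a `MapCoder`. The key point is that the per-key accumulator is a `Map<String, Long>`. So when a runner stores only the accumulator across trigger firings, per-key state grows with the number of distinct values, not with the number of elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.Combine;

/**
 * Hypothetical sketch of MyCombineFunction: counts occurrences of each value
 * and outputs the value(s) with the highest count. The accumulator is a
 * Map<String, Long>, so its size is bounded by the number of distinct values.
 */
public class MyCombineFunction
    extends Combine.CombineFn<String, Map<String, Long>, List<String>> {

  @Override
  public Map<String, Long> createAccumulator() {
    return new HashMap<>();
  }

  @Override
  public Map<String, Long> addInput(Map<String, Long> counts, String value) {
    // Increment the count for this value (starting at 1 if unseen).
    counts.merge(value, 1L, Long::sum);
    return counts;
  }

  @Override
  public Map<String, Long> mergeAccumulators(Iterable<Map<String, Long>> accumulators) {
    // Sum counts from partial accumulators (e.g. from different bundles or firings).
    Map<String, Long> merged = new HashMap<>();
    for (Map<String, Long> acc : accumulators) {
      acc.forEach((value, count) -> merged.merge(value, count, Long::sum));
    }
    return merged;
  }

  @Override
  public List<String> extractOutput(Map<String, Long> counts) {
    // Find the highest count, then collect every value that reaches it.
    long max = 0;
    for (long count : counts.values()) {
      max = Math.max(max, count);
    }
    List<String> top = new ArrayList<>();
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      if (e.getValue() == max) {
        top.add(e.getKey());
      }
    }
    // Sort so the output does not depend on HashMap iteration order.
    Collections.sort(top);
    return top;
  }

  @Override
  public Coder<Map<String, Long>> getAccumulatorCoder(
      CoderRegistry registry, Coder<String> inputCoder) {
    // Explicit coder for the accumulator, so state can be encoded and stored.
    return MapCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
  }
}
```

Because `mergeAccumulators` only sums counts, partial accumulators can be combined in any grouping. This is what lets a runner keep the merged Map instead of buffering the raw input elements.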

Josh

On Tue, Jun 20, 2017 at 4:26 PM, Kenneth Knowles <k...@google.com> wrote:

> Hi Josh,
>
> Exactly what is stored technically depends on optimization decisions by
> the runner. But you can generally expect that only the accumulator is
> stored across trigger firings, not the input elements.
>
> Kenn
>
> On Tue, Jun 20, 2017 at 6:32 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have a question about how much state is buffered when using
>> Combine.perKey with a custom accumulator. For example, I have:
>>
>> PCollection<KV<String, String>> elements = ...;
>>
>> PCollection<KV<String, List<String>>> topValuesPerKey = elements
>>     .apply(Window.<KV<String, String>>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(
>>             AfterProcessingTime.pastFirstElementInPane()
>>                 .plusDelayOf(Duration.standardSeconds(10))))
>>         .accumulatingFiredPanes())
>>     .apply(Combine.perKey(new MyCombineFunction()));
>>
>>
>> Here, MyCombineFunction counts the occurrences of each value for each key.
>> Its output for each key is a List<String> of the values that occur most
>> frequently. In this case the accumulator for each key just stores a
>> Map<String, Long> of values and their associated counts.
>>
>>
>> My question is: since I am accumulating fired panes forever in the global
>> window, is every element going to be buffered forever (i.e. will the amount
>> of space needed keep increasing)? Or is the amount of state buffered
>> determined by my accumulator (i.e. by the number of unique values across
>> all keys)? If the former is the case, how can I optimise my job so that
>> the accumulator is the only state stored across panes?
>>
>>
>> Thanks for any advice,
>>
>> Josh
>>
>
>
