Re: Understanding combiner's distribution logic

2020-07-01 Thread Robert Bradshaw
On Tue, Jun 30, 2020 at 3:32 PM Julien Phalip  wrote:

> Thanks Luke!
>
> One part I'm still a bit unclear about is how exactly the PreCombine stage
> works. In particular, I'm wondering how it can perform the combination
> before the GBK. Is it because it can already compute the combination on
> adjacent elements that happen to share the same key?
>

Yes, exactly. It can even combine non-adjacent elements within the same bundle
(e.g. a runner might instruct one worker to process the first half of a file,
and another worker the second half; each of these two halves would be a
"bundle").

Essentially, what happens is that a "DoFn" gets inserted right before the GBK
that, upon receipt of a KV, buffers elements by key in a hashtable so that, if
it sees the same key again, it can do some pre-emptive combining, and then
emits the buffered results when the bundle finishes. Something like:

public void processElement(KV<K, InputT> element) {
  if (this.buffer.containsKey(element.getKey())) {
    // Merge element.getValue() into the accumulator already stored at
    // this.buffer.get(element.getKey()).
  } else {
    // (Well, really this creates a new accumulator for the key and adds the
    // value to it.)
    this.buffer.put(element.getKey(), element.getValue());
  }
}

public void finishBundle(Context context) {
  for (Map.Entry<K, AccumT> kv : this.buffer.entrySet()) {
    context.output(KV.of(kv.getKey(), kv.getValue()));
  }
}

(The actual logic is a bit more complicated to avoid unbounded growth in
the table, etc.)
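
(For instance, purely as an illustration and not the actual SDK code, one
simple way to bound the table is to flush early once it grows past some cap;
the 1000-key cap and the maybeFlush helper below are made up for this sketch:)

private static final int MAX_BUFFERED_KEYS = 1000;  // arbitrary cap for this sketch

private void maybeFlush(Context context) {
  if (this.buffer.size() >= MAX_BUFFERED_KEYS) {
    // Emit the partially combined values now instead of holding them all in
    // memory; the GroupByKey + FinishCombine downstream will merge them anyway.
    for (Map.Entry<K, AccumT> kv : this.buffer.entrySet()) {
      context.output(KV.of(kv.getKey(), kv.getValue()));
    }
    this.buffer.clear();
  }
}

processElement would then call maybeFlush after buffering each element.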

> Could you also clarify the term "lifting" in this context? Does that refer
> to the act of pushing a partial combination before the GBK?
>

Lifting is just the terminology used to describe the fact that when the graph
looks like

... -> (GroupByKey -> Combine) -> ...

the runner may turn it into

... -> PartialCombine -> GroupByKey -> FinishCombine -> ...
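
(For concreteness, here is a rough sketch of how the two halves map onto the
CombineFn methods from org.apache.beam.sdk.transforms.Combine. This illustrates
the model rather than any runner's actual code, and the variable names are made
up:)

static <InputT, AccumT, OutputT> OutputT liftedCombineSketch(
    Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
    Iterable<InputT> inputsForOneKeyInOneBundle,
    Iterable<AccumT> accumulatorsForThatKeyAfterTheGBK) {
  // PartialCombine (runs per bundle, before the GroupByKey): fold the raw
  // inputs into an accumulator. It is this accumulator, not the raw inputs,
  // that flows through the GroupByKey.
  AccumT partial = combineFn.createAccumulator();
  for (InputT input : inputsForOneKeyInOneBundle) {
    partial = combineFn.addInput(partial, input);
  }
  // (In a real pipeline this partial accumulator would be one of the elements
  // of accumulatorsForThatKeyAfterTheGBK on the FinishCombine side.)

  // FinishCombine (runs after the GroupByKey): merge the per-bundle
  // accumulators for the key and extract the final output value.
  AccumT merged = combineFn.mergeAccumulators(accumulatorsForThatKeyAfterTheGBK);
  return combineFn.extractOutput(merged);
}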


> On Tue, Jun 30, 2020 at 12:34 PM Luke Cwik  wrote:
>
>> Your reasoning is correct around the withHotKeyFanout hint: it is there to
>> help runners know that one or more keys are likely to have significantly
>> more data than the others. However, the logic around how the work is broken
>> up is runner dependent, and whether a runner relies on the hint at all is
>> also runner dependent. If a runner were smart enough, it wouldn't need the
>> hint and could automatically detect hot keys and do the right thing. I would
>> take a look at this doc[1] to learn how the optimization can work from a
>> runner's perspective. Some runners never perform the PreCombine, while
>> others may have multiple rounds of it, but the most common case is a single
>> PreCombine (assuming it is allowed).
>>
>> 1: https://s.apache.org/beam-runner-api-combine-model
>>
>> On Tue, Jun 30, 2020 at 10:56 AM Julien Phalip  wrote:
>>
>>> Hi,
>>>
>>> I had a question about how combiners work, particularly on how the
>>> combined PCollection's subsets are initially formed.
>>>
>>> I understand that, according to the documentation, a combiner allows
>>> parallelizing the computation across multiple workers by breaking up the
>>> PCollection into subsets. I like the database analogy given in this post,
>>> which says that it is similar to pushing down a predicate.
>>>
>>> I also understand that it is possible to use withFanout or
>>> withHotKeyFanout to provide some explicit logic as a hint on how to
>>> manage the distribution.
>>>
>>> What is unclear to me, however, is whether by default the runner already
>>> plans the distribution of the computation, even when no explicit hints are
>>> provided. I'm guessing perhaps it always breaks up the PCollection into
>>> bundles (similar to DoFns), then the combiner runs the combination on each
>>> bundle, saves the result into intermediate accumulators, and those results
>>> then bubble up recursively to the top? If that's the case, then I assume
>>> that the purpose of withFanout and withHotKeyFanout is to further break up
>>> those initially pre-created bundles into even smaller subsets? Or am I
>>> guessing this wrong? :)
>>>
>>> I couldn't find a clear description in the documentation on how the
>>> PCollection subsets are initially formed. Please let me know if you have
>>> some details on that, or if it is already documented somewhere.
>>>
>>> Thank you!
>>>
>>> Julien
>>>
>>
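
(As a concrete illustration of the withHotKeyFanout hint discussed above, a
minimal sketch against the Java SDK; the input PCollections and the hot key
name are made up for the example:)

// Sum values per key, telling the runner it may fan out a known hot key more
// aggressively. The fanout numbers here are arbitrary.
PCollection<KV<String, Long>> sums =
    counts.apply(
        Combine.<String, Long, Long>perKey(Sum.ofLongs())
            .withHotKeyFanout(key -> "known-hot-key".equals(key) ? 100 : 1));

// For a global combine, a flat fanout hint can be given instead:
PCollection<Long> total =
    values.apply(Combine.globally(Sum.ofLongs()).withFanout(10));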


Watermark lag for GenerateSequence on Dataflow is always ~40 minutes

2020-07-01 Thread Marcin Kuthan
Hi

My processing pipeline uses the GenerateSequence transform to query BigQuery
periodically. It works: the query is performed at exactly the rate defined by
GenerateSequence, but the watermark is always ~40 minutes behind the generated
timestamps.

The code looks as follows (Beam: 2.19, Scio: 0.8.4):

val sc: ScioContext = ...

val startTime = Instant.now()
val interval = Duration.standardMinutes(10)

val sequence = sc.customInput(
  GenerateSequence.from(1)
    .withRate(1, interval)
    .withTimestampFn(i => startTime.plus(interval.multipliedBy(i))))
  .withGlobalWindow(WindowOptions(
    trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
    accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES))
  .withTimestamp
  .map { case (_, timestamp) => timestamp }

val finalStream = sequence.flatMap { timestamp =>
  // load data from BQ; the timestamps from the sequence are preserved
}

I'm looking for the reason for the GenerateSequence watermark lag when the
code is running on the Dataflow runner. Any clues on how to debug the issue
further?

Marcin