Hi Beam-users,

*TL;DR:* Does DirectRunner do any fusion optimization
<https://beam.apache.org/contribute/ptransform-style-guide/#performance>
and, if so, does that have any impact on triggers/panes?

*Details* (the context for everything below is *DirectRunner* and this is a
*batch* job):
I have a batch pipeline that roughly looks like this: S1->S2->S3

S1: Create URLs (from DB)
S2: Fetch those URLs (output of S1) and create Avro records
S3: Write those records to Parquet files

In principle, S2 and S3 could be fused so that Parquet files are written
while the records are still being fetched/created. That does not seem to
happen: no [temp] file exists while the resources are being fetched, and the
writer's log messages appear only after all fetches are done.

If I add a trigger to the output PCollection of S2 (i.e., `records` below),
then I get intermediate Parquet output:
```
// The delay must be an org.joda.time.Duration; seconds are assumed here.
records.apply(Window.<T>into(new GlobalWindows())
       .triggering(Repeatedly.forever(
           AfterProcessingTime.pastFirstElementInPane()
               .plusDelayOf(Duration.standardSeconds(5))))
       .discardingFiredPanes());
```

However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only
prints a log message for each record and passes the record through to its
output, then S2 and S2' do seem to be fused, because the log messages are
interleaved with the fetches.
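
To make "interleaved" concrete, here is a small plain-Java sketch of the two
log orders I am comparing. It is not Beam code and says nothing about how
DirectRunner actually schedules work; the class and method names are made up
for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only (hypothetical names, no Beam involved).
final class LogOrderSketch {

    // "Fused" order: each URL passes through S2 and then S2' before the
    // next URL is started, so the two kinds of log lines alternate.
    static List<String> fused(List<String> urls) {
        List<String> log = new ArrayList<>();
        for (String url : urls) {
            log.add("S2 fetch " + url);
            log.add("S2' log " + url);
        }
        return log;
    }

    // "Staged" order: S2 finishes for every URL before S2' sees the first
    // record, so all fetch lines come before all S2' lines.
    static List<String> staged(List<String> urls) {
        List<String> log = new ArrayList<>();
        for (String url : urls) {
            log.add("S2 fetch " + url);
        }
        for (String url : urls) {
            log.add("S2' log " + url);
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> urls = List.of("u1", "u2");
        System.out.println(fused(urls));
        System.out.println(staged(urls));
    }
}
```

With S2->S2' I see the first pattern; with S2->S3 I see the second.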

*Q1*: Does DirectRunner do any fusion optimization (e.g., like
DataflowRunner)? If not by default, is there any way to enable it?

The other issue is with triggers and pane creation. I have an extended
version of this pipeline; a simplified view of it is:
S1->S2A->GBK->S2B->S3

S1: Like before
S2A: Add a key to the output of S1
GBK: Groups output of S2A to remove duplicate keys
S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
S3: Same as before
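
Logically, the S2A->GBK part amounts to the following plain-Java sketch. It
is not Beam code; using the URL itself as the key is an assumption made for
the example, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only (hypothetical names, no Beam involved).
final class DedupSketch {

    // S2A + GBK: key each URL (assumption: the key is the URL itself) and
    // collect all values that share a key into one group.
    static Map<String, List<String>> groupByKey(List<String> urls) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String url : urls) {
            groups.computeIfAbsent(url, k -> new ArrayList<>()).add(url);
        }
        return groups;
    }

    // Input to S2B: one URL per distinct key, duplicates dropped.
    static List<String> dedupedUrls(List<String> urls) {
        return new ArrayList<>(groupByKey(urls).keySet());
    }

    public static void main(String[] args) {
        System.out.println(dedupedUrls(List.of("a", "b", "a")));
    }
}
```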

*Q2*: In this case, if I add a dummy S2B' after S2B, the log messages are
*not* interleaved with the resource fetches, i.e., no fusion is happening.
Why? What is different here?

*Q3*: Even if I add a similar trigger to the output of S2B, Parquet file
generation does not start until all of the fetches are done. Again, what is
different here, and why are intermediate panes not fired while the output of
S2B is being generated?

Thanks

-B
P.S. I need this pipeline to work both on a distributed runner and also on
a local machine with many cores. That's why the performance of DirectRunner
is important to me.
