Re: Extremely Slow DirectRunner

2021-05-08 Thread Ismaël Mejía
Can you try running direct runner with the option
`--experiments=use_deprecated_read`

Seems like an instance of
https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
also reported in
https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E

We should roll back to not using the SDF wrapper by default, given the
usability and performance issues reported.
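For reference, the experiment can be passed on the command line (`--experiments=use_deprecated_read`) or turned on programmatically. A minimal sketch, assuming the Beam Java SDK's `ExperimentalOptions` helper (class name `EnableDeprecatedRead` is illustrative):

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EnableDeprecatedRead {
  public static void main(String[] args) {
    // Parse the usual pipeline arguments first...
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // ...then force the experiment on, equivalent to passing
    // --experiments=use_deprecated_read on the command line.
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "use_deprecated_read");
  }
}
```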


On Sat, May 8, 2021 at 12:57 AM Evan Galpin  wrote:

> Hi all,
>
> I’m experiencing very slow performance and startup delay when testing a
> pipeline locally. I’m reading data from a Google PubSub subscription as the
> data source, and before each pipeline execution I ensure that data is
> present in the subscription (readable from GCP console).
>
> I’m seeing startup delay on the order of minutes with DirectRunner (5-10
> min). Is that expected? I did find a Jira ticket[1] that at first seemed
> related, but I think it has more to do with BQ than DirectRunner.
>
> I’ve run the pipeline with a debugger connected and confirmed that it’s
> minutes before the first DoFn in my pipeline receives any data. Is there a
> way I can profile the direct runner to see what it’s churning on?
>
> Thanks,
> Evan
>
> [1]
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-4548
>


DirectRunner, Fusion, and Triggers

2021-05-08 Thread Bashir Sadjad
Hi Beam-users,

*TL;DR;* I wonder if DirectRunner does any fusion optimization, and whether
this has any impact on triggers/panes?

*Details* (the context for everything below is *DirectRunner* and this is a
*batch* job):
I have a batch pipeline that roughly looks like this: S1->S2->S3

S1: Create URLs (from DB)
S2: Fetch those URLs (output of S1) and create Avro records
S3: Write those records to Parquet files

S2 and S3 could be fused so that Parquet files are generated while the
records are being fetched/created. However, that does not seem to happen:
there are no [temp] files while the resources are being fetched, and the
writer log messages appear only after all fetches are done.

If I add a trigger to the output PCollection of S2 (i.e., `records` below),
then I get intermediate Parquet output:
```
records.apply(Window.<GenericRecord>into(new GlobalWindows())
    // element type assumed; the generic parameter was lost in the archive
    .triggering(Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5))))
    .discardingFiredPanes());
```

However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only
prints a log message for each record and passes the record through to its
output, then S2 and S2' do seem to be fused, because the log messages are
interleaved with the fetches.
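For context, the dummy S2' step described above can be sketched as a pass-through logging ParDo (the class name and element type `GenericRecord` are assumptions, not from the original thread):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical pass-through step: logs each record and re-emits it unchanged,
// so interleaving of its log lines with fetch logs reveals whether the
// runner executes the stages in a fused fashion.
class LogPassThroughFn extends DoFn<GenericRecord, GenericRecord> {
  private static final Logger LOG = LoggerFactory.getLogger(LogPassThroughFn.class);

  @ProcessElement
  public void processElement(
      @Element GenericRecord record, OutputReceiver<GenericRecord> out) {
    LOG.info("Saw record: {}", record);
    out.output(record);
  }
}
```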

*Q1*: Does DirectRunner do any fusion optimization (e.g., like
DataflowRunner)? If not by default, is there any way to enable it?

The other issue is with triggers and creating panes. I have an extended
version of this pipeline where a simplified view of it is:
S1->S2A->GBK->S2B->S3

S1: Like before
S2A: Add a key to the output of S1
GBK: Groups output of S2A to remove duplicate keys
S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
S3: Same as before
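A minimal sketch of the S2A->GBK dedup stage described above, assuming the URLs are plain strings (variable names are illustrative; Beam's built-in `Distinct` transform would achieve the same result):

```java
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Keyed dedup: key each URL by itself, group to collapse duplicates,
// then keep one URL per key.
PCollection<String> dedupedUrls = urls
    .apply(WithKeys.of((String url) -> url)
        .withKeyType(TypeDescriptors.strings()))
    .apply(GroupByKey.create())
    .apply(Keys.create());
```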

*Q2*: In this case, if I add a dummy S2B' after S2B, the log messages are
*not* interleaved with resource fetches, i.e., no fusion is happening. Why?
What is different here?

*Q3*: Even if I add a similar trigger to the output of S2B, Parquet file
generation does not start until all of the fetches are done. Again, what is
different here, and why are intermediate panes not fired while the output of
S2B is being generated?

Thanks

-B
P.S. I need this pipeline to work both on a distributed runner and on a
local machine with many cores; that's why DirectRunner performance is
important to me.