Thanks for sharing those results. The second set (executors at 20-30) looks similar to what I would have expected. BEAM-5036 definitely plays a part here, as the data is not moved efficiently on HDFS (a fix is in a PR awaiting review now [1]).
To give an idea of the impact, here are some numbers from my own tests. Without knowing your code, I presume mine is similar to your filter (take data, modify it, write data with no shuffle/group/join).

My environment: 10-node YARN CDH 5.12.2 cluster, rewriting a 1.5TB AvroIO file (code here [2]). I observed:

- Using the Spark API: 35 minutes
- Beam AvroIO (2.6.0): 1.7 hrs
- Beam AvroIO with the 5036 fix: 42 minutes

Related: I also anticipate that varying spark.default.parallelism will affect the Beam runtime.

Thanks,
Tim

[1] https://github.com/apache/beam/pull/6289
[2] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

On Fri, Sep 28, 2018 at 9:27 AM Robert Bradshaw <rober...@google.com> wrote:

> Something here on the Beam side is clearly linear in the input size, as if
> there's a bottleneck where we're not able to get any parallelization. Is
> the Spark variant running in parallel?
>
> On Fri, Sep 28, 2018 at 4:57 AM devinduan(段丁瑞) <devind...@tencent.com> wrote:
>
>> Hi,
>> I have completed my test.
>>
>> 1. Spark parameters:
>>      deploy-mode client
>>      executor-memory 1g
>>      num-executors 1
>>      driver-memory 1g
>>
>> WordCount:
>>
>>            300MB     600MB     1.2G
>>   Spark    1min8s    1min11s   1min18s
>>   Beam     6.4min    11min     22min
>>
>> Filter:
>>
>>            300MB     600MB     1.2G
>>   Spark    1.2min    1.7min    2.8min
>>   Beam     2.7min    4.1min    5.7min
>>
>> GroupByKey + sum:
>>
>>            300MB     600MB     1.2G
>>   Spark    3.6min
>>   Beam     Failed, executor OOM
>>
>> Union:
>>
>>            300MB     600MB     1.2G
>>   Spark    1.7min    2.6min    5.1min
>>   Beam     3.6min    6.2min    11min
>>
>> 2. Spark parameters:
>>      deploy-mode client
>>      executor-memory 1g
>>      driver-memory 1g
>>      spark.dynamicAllocation.enabled true
>>
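For anyone wanting to try the spark.default.parallelism experiment mentioned earlier in the thread, here is a minimal sketch of how that property can be passed to a Beam pipeline running on the SparkRunner via spark-submit. The jar name, main class, and parallelism value are placeholders, not taken from the thread:

```shell
# Hypothetical invocation: jar, class, and parallelism value are examples only.
# spark.default.parallelism controls the number of partitions Spark uses for
# shuffles when no explicit partitioning is set, which the Spark runner inherits.
spark-submit \
  --class org.example.MyBeamPipeline \
  --master yarn \
  --deploy-mode client \
  --conf spark.default.parallelism=200 \
  my-beam-pipeline-bundled.jar \
  --runner=SparkRunner
```

Re-running the same input sizes with a few different parallelism values should show whether the Beam runtimes above are partition-bound.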