Hi devinduan,

The known issues Robert links to there are actually HDFS-related and not specific to Spark. The improvement we're seeking is that the final copy of the output file can be optimised into a "move" instead of a "copy", and I expect that to be fixed for Beam 2.8.0. On a small dataset like this, though, I don't think it will impact performance too much.
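For illustration only (plain local-filesystem commands with made-up file names, not Beam code): within a single filesystem a move is a metadata-only rename, while a copy reads and rewrites every byte - the same principle applies to HDFS, which is why that change matters for large outputs:

```shell
# Create a throwaway 1 MiB "temporary output" file.
tmpdir=$(mktemp -d)
head -c 1048576 /dev/zero > "$tmpdir/wordcount-00000.tmp"

# A copy reads and rewrites all of the data; cost grows with file size.
cp "$tmpdir/wordcount-00000.tmp" "$tmpdir/wordcount-00000.copy"

# A move is a constant-time rename, regardless of file size.
mv "$tmpdir/wordcount-00000.tmp" "$tmpdir/wordcount-00000"

ls "$tmpdir"
```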
Can you please elaborate on your deployment? It looks like you are using a cluster (i.e. deploy-mode client), but are you using HDFS?

I have access to a Cloudera CDH 5.12 Hadoop cluster and just ran an example word count as follows - I'll explain the parameters to tune below.

1) Generate some random data (using common Hadoop tools):

    hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar \
        teragen \
        -Dmapred.map.tasks=100 \
        -Dmapred.map.tasks.speculative.execution=false \
        10000000 \
        /tmp/tera

This puts 100 files totalling just under 1GB into the HDFS filesystem, on which I will run the word count.

2) Run the word count using Spark (2.3.x) and Beam 2.5.0. In my cluster I have YARN to allocate resources and an HDFS filesystem; this will be different if you run Spark standalone or in a cloud environment.

    spark2-submit \
        --conf spark.default.parallelism=45 \
        --class org.apache.beam.runners.spark.examples.WordCount \
        --master yarn \
        --executor-memory 2G \
        --executor-cores 5 \
        --num-executors 9 \
        --jars beam-sdks-java-core-2.5.0.jar,beam-runners-core-construction-java-2.5.0.jar,beam-runners-core-java-2.5.0.jar,beam-sdks-java-io-hadoop-file-system-2.5.0.jar \
        beam-runners-spark-2.5.0.jar \
        --runner=SparkRunner \
        --inputFile=hdfs:///tmp/tera/* \
        --output=hdfs:///tmp/wordcount

The jars I provide here are the minimum needed for running on HDFS with Spark; normally you'd build them into your project as an über jar.

The important bits for tuning performance - applicable to any Spark deployment (unless embedded) - are:

- spark.default.parallelism: controls the parallelism of the Beam pipeline; in this case, how many workers are tokenizing the input data.
- executor-memory, executor-cores, num-executors: control the resources Spark will use.

Note that a parallelism of 45 means the 5 cores in each of the 9 executors can all run concurrently (i.e. 5 x 9 = 45).
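As a quick sanity check on that arithmetic, a minimal sketch (values taken from the submit command above):

```shell
# Total cores able to run tasks concurrently = executor-cores * num-executors.
# Setting spark.default.parallelism to at least this number keeps every core busy.
executor_cores=5
num_executors=9
parallelism=$((executor_cores * num_executors))
echo "spark.default.parallelism should be at least: $parallelism"
```

For very large inputs you would typically set the parallelism to a small multiple of this (the Spark docs suggest 2-3 tasks per core), so that stragglers don't leave cores idle.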
When you get to very large datasets, you will likely have much higher parallelism. In this test I see around 20 seconds of initial Spark startup (copying jars, requesting resources from YARN, establishing the Spark context), but once up, the job completes in a few seconds, writing the output into 45 files (because of the parallelism) named /tmp/wordcount-000*-of-00045.

I hope this helps provide a few pointers, but if you elaborate on your environment we might be able to assist more.

Best wishes,
Tim

On Tue, Sep 18, 2018 at 9:29 AM Robert Bradshaw <rober...@google.com> wrote:

> There are known performance issues with Beam on Spark that are being
> worked on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's
> possible you're hitting something different, but it would be worth
> investigating. See also
> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write
>
> On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) <devind...@tencent.com> wrote:
>
>> Hi,
>>     I'm testing Beam on Spark.
>>     I used the Spark example code WordCount to process a 1GB data file; it took 1 minute.
>>     However, when I used the Beam example code WordCount to process the same file, it took 30 minutes.
>>     My Spark parameters are: --deploy-mode client --executor-memory 1g --num-executors 1 --driver-memory 1g
>>     My Spark version is 2.3.1, Beam version is 2.5.
>>     Is there any optimization method?
>>     Thank you.