Hi devinduan

The known issues Robert links there are actually HDFS-related and not
specific to Spark.  The improvement we're seeking is that the final copy of
the output file can be optimised by using a "move" instead of a "copy", and
I expect to have it fixed for Beam 2.8.0.  On a small dataset like this,
though, I don't think it will impact performance too much.

Can you please elaborate on your deployment?  It looks like you are using a
cluster (i.e. deploy-mode client), but are you using HDFS?

I have access to a Cloudera CDH 5.12 Hadoop cluster and just ran an example
word count as follows - I'll explain the parameters to tune below:

1) I generate some random data (using common Hadoop tools)
hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar \
  teragen \
  -Dmapred.map.tasks=100 \
  -Dmapred.map.tasks.speculative.execution=false \
  10000000 \
  /tmp/tera

This puts 100 files totalling just under 1GB into HDFS; these are the
files I will run the word count over.
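
To sanity check the generated input you can look at it with the usual HDFS
commands, e.g.:

hdfs dfs -du -s -h /tmp/tera
hdfs dfs -ls /tmp/tera | head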

2) Run the word count using Spark (2.3.x) and Beam 2.5.0

In my cluster, YARN allocates resources and HDFS is the filesystem. This
will be different if you run Spark standalone or in a cloud environment.

spark2-submit \
  --conf spark.default.parallelism=45 \
  --class org.apache.beam.runners.spark.examples.WordCount \
  --master yarn \
  --executor-memory 2G \
  --executor-cores 5 \
  --num-executors 9 \
  --jars beam-sdks-java-core-2.5.0.jar,beam-runners-core-construction-java-2.5.0.jar,beam-runners-core-java-2.5.0.jar,beam-sdks-java-io-hadoop-file-system-2.5.0.jar \
  beam-runners-spark-2.5.0.jar \
  --runner=SparkRunner \
  --inputFile=hdfs:///tmp/tera/* \
  --output=hdfs:///tmp/wordcount

The jars I provide here are the minimum needed for running on HDFS with
Spark; normally you'd build them into your project as an über jar.
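
For reference, a bundled jar can be built from the Beam word-count
quickstart archetype - the profile and artifact names below are from the
2.5.0 quickstart and may differ in other versions:

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.beam \
  -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
  -DarchetypeVersion=2.5.0 \
  -DgroupId=org.example \
  -DartifactId=word-count-beam \
  -Dversion="0.1" \
  -Dpackage=org.apache.beam.examples \
  -DinteractiveMode=false

cd word-count-beam
mvn package -Pspark-runner
# produces target/word-count-beam-bundled-0.1.jar with the runner and IO
# dependencies shaded in, which you then pass to spark2-submit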

The important bits for tuning performance are the following - these apply
to any Spark deployment (unless embedded):

  spark.default.parallelism - controls the parallelism of the Beam
pipeline. In this case, how many workers are tokenizing the input data.
  executor-memory, executor-cores, num-executors - control the resources
Spark will use

Note that a parallelism of 45 means the 5 cores in each of the 9 executors
can all run concurrently (i.e. 5 x 9 = 45). When you get to very large
datasets, you will likely want a much higher parallelism, as sketched below.
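
As a rough illustration only (hypothetical numbers for a much larger input,
not something I have benchmarked here), you would scale the same flags
together and keep parallelism at roughly 2-3x the total core count:

spark2-submit \
  --conf spark.default.parallelism=200 \
  --executor-memory 4G \
  --executor-cores 5 \
  --num-executors 20 \
  ... (remaining arguments as above)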

In this test I see around 20 seconds of initial Spark startup (copying
jars, requesting resources from YARN, establishing the Spark context), but
once up the job completes in a few seconds, writing the output into 45
files (because of the parallelism). The files are named
/tmp/wordcount-000*-of-00045.
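
To check the result you can list the shards and peek at one of them, e.g.:

hdfs dfs -ls /tmp/wordcount-*
hdfs dfs -cat /tmp/wordcount-00000-of-00045 | head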

I hope this helps provide a few pointers, but if you elaborate on your
environment we might be able to assist more.

Best wishes,
Tim

On Tue, Sep 18, 2018 at 9:29 AM Robert Bradshaw <rober...@google.com> wrote:

> There are known performance issues with Beam on Spark that are being
> worked on, e.g. https://issues.apache.org/jira/browse/BEAM-5036 . It's
> possible you're hitting something different, but would be worth
> investigating. See also
> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Performance%20of%20write
>
> On Tue, Sep 18, 2018 at 8:39 AM devinduan(段丁瑞) <devind...@tencent.com>
> wrote:
>
>> Hi,
>>     I'm testing Beam on Spark.
>>     I use the Spark example code WordCount to process a 1G data file; it
>> costs 1 minute.
>>     However, I use the Beam example code WordCount to process the same
>> file, and it costs 30 minutes.
>>     My Spark parameter is :  --deploy-mode client  --executor-memory 1g
>> --num-executors 1 --driver-memory 1g
>>     My Spark version is 2.3.1,  Beam version is 2.5
>>     Is there any optimization method?
>> Thank you.
>>
>>
>>
>
