I'm a bit new to Spark, but I have a question on performance. I suspect a lot
of my issue is due to tuning and parameters. I have a Hive external table over
the same data, and queries against it run in minutes.

The Job:
+ 40 GB of Avro events on HDFS (100 million+ Avro events)
+ Read the files from HDFS and dedupe events by key (a mapToPair, then a
reduceByKey)
+ The resulting RDD is persisted (memory and disk)
+ That RDD is then passed to a job that does a mapToPair into a new aggregate
object, then a reduceByKey, and finally a foreachPartition that does the work

The issue:
When I run this in my environment on YARN, it takes 20+ hours. Running on
YARN, we see the first stage run and build the deduped RDD, but when the next
stage starts, things fail and data is lost. This results in stage 0 starting
over and over, which just drags the job out.

Errors I see in the driver logs:
ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on XXXXX: remote
Akka client disassociated

15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1
(TID 1335, XXXX): FetchFailed(BlockManagerId(3, iXXXX, 33958), shuffleId=1,
mapId=162, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to
XXXXX/XXXXX:33958

We also see the following, but I suspect it's just a consequence of the
previous stage failing and the next one starting:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 1

Cluster:
5 machines, each with 2 cores and 8 GB of RAM

Spark-submit command:
 spark-submit --class com.myco.SparkJob \
    --master yarn \
    /tmp/sparkjob.jar
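
I'm not passing any executor sizing at the moment, so I'm presumably getting
the defaults. For reference, something like the following is what I was
planning to try next (the numbers are just guesses on my part, given the
2-core / 8 GB nodes):

 spark-submit --class com.myco.SparkJob \
    --master yarn \
    --num-executors 4 \
    --executor-cores 2 \
    --executor-memory 4g \
    --driver-memory 2g \
    --conf spark.yarn.executor.memoryOverhead=1024 \
    /tmp/sparkjob.jar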

Any thoughts on where to look, how to start approaching this, or what
additional data points I should provide?

Thanks.

Code for the job:
 JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
        context.newAPIHadoopRDD(
            context.hadoopConfiguration(),
            AvroKeyInputFormat.class,
            AvroKey.class,
            NullWritable.class
        ).keys())
        .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
        .filter(event -> Optional.ofNullable(event.getStepEventKey()).isPresent())
        // dedupe: keep a single event per key
        .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
        .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
        .map(tuple -> tuple._1());

        events.persist(StorageLevel.MEMORY_AND_DISK_2());

        // roll the deduped events up into per-key running aggregates,
        // then do the work per partition
        events.mapToPair(event -> new Tuple2<T, RunningAggregates>(
                keySelector.select(event),
                new RunningAggregates(
                    Optional.ofNullable(event.getVisitors()).orElse(0L),
                    Optional.ofNullable(event.getImpressions()).orElse(0L),
                    Optional.ofNullable(event.getAmount()).orElse(0.0D),
                    Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D))))
            .reduceByKey((left, right) -> left.add(right))
            .foreachPartition(dostuff);
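
In case it's relevant, RunningAggregates is just a small Serializable value
holder for those four fields, and add() sums them field by field. A
trimmed-down sketch of it:

    // Simplified version of the real class (equals/hashCode/toString omitted).
    public class RunningAggregates implements Serializable {
        private final long visitors;
        private final long impressions;
        private final double amount;
        private final double amountSumOfSquares;

        public RunningAggregates(long visitors, long impressions,
                                 double amount, double amountSumOfSquares) {
            this.visitors = visitors;
            this.impressions = impressions;
            this.amount = amount;
            this.amountSumOfSquares = amountSumOfSquares;
        }

        // Associative combine used by reduceByKey: sum each field.
        public RunningAggregates add(RunningAggregates other) {
            return new RunningAggregates(
                visitors + other.visitors,
                impressions + other.impressions,
                amount + other.amount,
                amountSumOfSquares + other.amountSumOfSquares);
        }
    }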





