I'm a bit new to Spark, but I have a question on performance. I suspect a lot of my issue is due to tuning and parameters. I have a Hive external table on this same data, and running queries against it through Hive completes in minutes.
The Job:
+ 40 GB of Avro events on HDFS (100 million+ Avro events)
+ Read in the files from HDFS and dedupe events by key (mapToPair, then a reduceByKey)
+ RDD returned and persisted (disk and memory)
+ Then passed to a job that takes the RDD, does a mapToPair to new object data, then a reduceByKey and a foreachPartition to do the work

The issue:
When I run this on my environment on YARN it takes 20+ hours. On YARN we see the first stage run to build the deduped RDD, but when the next stage starts, executors fail and data is lost. This results in stage 0 starting over and over, which just drags the job out.

Errors I see in the driver logs:

ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on XXXXX: remote Akka client disassociated
15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1 (TID 1335, XXXX): FetchFailed(BlockManagerId(3, iXXXX, 33958), shuffleId=1, mapId=162, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to XXXXX/XXXXX:33958

We also see the following, but I suspect it's just a consequence of the previous stage failing before the next one starts:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1

Cluster: 5 machines, each with 2 cores and 8 GB of RAM

Spark-submit command:

spark-submit --class com.myco.SparkJob \
    --master yarn \
    /tmp/sparkjob.jar

Any thoughts on where to look, how to start approaching this problem, or what other data points I should present? Thanks.
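Since the submit command above sets no resource flags, the job runs with Spark's defaults. For comparison, a command that pins executor resources explicitly might look like the sketch below; the numbers are only guesses sized to a 5-node, 2-core/8 GB cluster, not a recommendation:

```shell
# Hypothetical sizing: 4 executors (leaving headroom for the driver/AM),
# 2 cores and 4g heap each so YARN container overhead still fits in 8 GB.
spark-submit --class com.myco.SparkJob \
    --master yarn \
    --num-executors 4 \
    --executor-cores 2 \
    --executor-memory 4g \
    --driver-memory 2g \
    /tmp/sparkjob.jar
```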
Code for the job:

JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD(
        context.hadoopConfiguration(),
        AvroKeyInputFormat.class,
        AvroKey.class,
        NullWritable.class
    ).keys())
    .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
    .filter(key -> Optional.ofNullable(key.getStepEventKey()).isPresent())
    .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
    .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
    .map(tuple -> tuple._1());

events.persist(StorageLevel.MEMORY_AND_DISK_2());

events.mapToPair(event ->
        new Tuple2<T, RunningAggregates>(
            keySelector.select(event),
            new RunningAggregates(
                Optional.ofNullable(event.getVisitors()).orElse(0L),
                Optional.ofNullable(event.getImpressions()).orElse(0L),
                Optional.ofNullable(event.getAmount()).orElse(0.0D),
                Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D))))
    .reduceByKey((left, right) -> left.add(right))
    .foreachPartition(dostuff);

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
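For anyone reading along: the reduceByKey((a, b) -> a) pattern above keeps one arbitrary event per key and drops the rest (note it shuffles the whole AnalyticsEvent as the pair key, so "duplicate" means equal under equals/hashCode). A minimal plain-Java sketch of the same keep-first-per-key semantics, with a hypothetical Event record standing in for AnalyticsEvent and keying on its step-event key rather than the whole object:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupeSketch {
    // Hypothetical stand-in for AnalyticsEvent: a dedupe key plus a payload.
    public record Event(String stepEventKey, String payload) {}

    // Same semantics as mapToPair(e -> (e, 1)).reduceByKey((a, b) -> a):
    // keep the first event seen for each key, drop later duplicates.
    public static List<Event> dedupeByKey(List<Event> events) {
        Map<String, Event> firstPerKey = new LinkedHashMap<>();
        for (Event e : events) {
            if (e.stepEventKey() != null) {           // mirrors the null filter
                firstPerKey.putIfAbsent(e.stepEventKey(), e);
            }
        }
        return new ArrayList<>(firstPerKey.values());
    }

    public static void main(String[] args) {
        List<Event> in = List.of(
            new Event("k1", "a"), new Event("k2", "b"),
            new Event("k1", "c"), new Event(null, "d"));
        System.out.println(dedupeByKey(in)); // k1/a and k2/b survive
    }
}
```

Keying on a small field instead of the full event would also shrink what gets shuffled, which may matter given the fetch failures described above.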