Finally I gave up after there are too many failed retry. >From the log in the worker side, it looks like failed with JVM OOM, as below: 15/02/05 17:02:03 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Driver Heartbeater,5,main]java.lang.OutOfMemoryError: Java heap space at java.lang.StringBuilder.toString(StringBuilder.java:812) at scala.collection.mutable.StringBuilder.toString(StringBuilder.scala:427) at scala.concurrent.duration.FiniteDuration.unitString(Duration.scala:583) at scala.concurrent.duration.FiniteDuration.toString(Duration.scala:584) at java.lang.String.valueOf(String.java:1675) at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)15/02/05 17:02:03 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[org.apache.hadoop.hdfs.PeerCache@43fe286e,5,main]java.lang.OutOfMemoryError: Java heap space at org.spark-project.guava.common.collect.LinkedListMultimap$5.listIterator(LinkedListMultimap.java:912) at java.util.AbstractList.listIterator(AbstractList.java:310) at java.util.AbstractSequentialList.iterator(AbstractSequentialList.java:250) at org.apache.hadoop.hdfs.PeerCache.evictExpired(PeerCache.java:213) at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:255) at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39) at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135) at java.lang.Thread.run(Thread.java:853)15/02/05 17:02:03 ERROR executor.Executor: Exception in task 5.0 in stage 3.2 (TID 2618) Is this due to OOM in the shuffle stage? I already set the "SPARK_WORKER_MEMORY=8g", and I can see from the web UI it is 8g. Any configuration that I can change to avoid the above OOM? Thanks Yong From: java8...@hotmail.com To: user@spark.apache.org Subject: My first experience with Spark Date: Thu, 5 Feb 2015 16:03:33 -0500
I am evaluating Spark for our production usage. Our production cluster is Hadoop 2.2.0 without Yarn. So I want to test Spark with Standalone deployment running with Hadoop. What I have in mind is to test a very complex Hive query, which joins between 6 tables, lots of nested structure with exploding, and currently takes 8 hours daily running in our production. All the data of this query are in AVRO + Snappy. I setup one Box (24 core + 64G memory), installed the same version of Hadoop as our production, and put 5% of data on it (which is about 60G, snappy compressed AVRO files) I am running the same query in Hive. It took 6 rounds of MR jobs, finished around 30 hours on this one box. Now, I start to have fun with Spark. I checked out Spark 1.2.0, built it following Spark build instructions, and installed on this one box. Since the test data is all in AVRO format, so I also built the latest development version of SparkAvro, from https://github.com/databricks/spark-avro 1) First, I got some problems to use the AVRO data in spark-avro. It turns our that Spark 1.2.0 build processing will merge the mismatched version of AVRO core and AVRO mapred jars. I manually fixed it. See issue here: https://github.com/databricks/spark-avro/issues/242) After that, I am impressed becauseThe AVRO file just works from HDFS to Spark 1.2The complex query (about 200 lines) just starts to run in Spark 1.2 using org.apache.spark.sql.hive.HiveContext without any problem. This HiveContext just works in Spark SQL 1.2. Very nice.3) I got several OOM, which is reasonable. I finally changes the memory setting to: export SPARK_WORKER_MEMORY=8gexport SPARK_DRIVER_MEMORY=2gexport SPARK_EXECUTOR_MEMORY=8g As 4g just doesn't work for the test data volume. After I set to 8G, the job won't fail due to OOM. 4) It looks like Spark generates 8 stages for the big query. It finishes the stage 1 and stage 2, then failed on stage 3 twice with the following error: FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, message=org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382) at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.lang.Thread.run(Thread.java:853) ) During whole test, the CPUs load average is about 16, and still had enough physical memory to use. I don't know what could be reason for above error. Spark did retry, but it had to retry from stage 1. Is this because in Spark, it doesn't persist the intermedia data in HDFS? I guess you have to pay the performance price somewhere no matter what. What I am looking for is that: 1) Can spark finish this query. Hive is slower, but it is reliable to finish a complex query you give it.2) How fast it can finish this query.3) I am going to check the final output vs the result from Hive. Spark stage 3 failed twice for me so far. We will see. Thanks Yong