Hi Deb,

From what I found online, changing the parallelism is one option. But the failed stage already had 200 tasks, which is quite a lot for a single 24-core box.

I know that querying this amount of data on one box is pushing it, but I do want to know how to configure Spark to use less memory, even if that means it takes more time.

We plan to make Spark coexist with our Hadoop cluster, so being able to control its memory usage is important for us.

Does Spark really need that much memory?

Thanks
Yong

Date: Thu, 5 Feb 2015 15:36:48 -0800
Subject: Re: My first experience with Spark
From: deborah.sie...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org
Hi Yong,

Have you tried increasing your level of parallelism? How many tasks are you getting in the failing stage? 2-3 tasks per CPU core is recommended, though maybe you need more for your shuffle operation?

You can configure spark.default.parallelism, or pass in a level of parallelism as the second parameter to a suitable operation in your code.

Deb

On Thu, Feb 5, 2015 at 1:03 PM, java8964 <java8...@hotmail.com> wrote:

I am evaluating Spark for our production usage. Our production cluster is Hadoop 2.2.0 without YARN, so I want to test Spark in Standalone deployment running alongside Hadoop.

What I have in mind is to test a very complex Hive query, which joins 6 tables, has lots of nested structures with exploding, and currently takes 8 hours daily in our production. All the data for this query is in AVRO + Snappy.

I set up one box (24 cores + 64G memory), installed the same version of Hadoop as our production, and put 5% of the data on it (about 60G of Snappy-compressed AVRO files).

I ran the same query in Hive. It took 6 rounds of MR jobs and finished in around 30 hours on this one box.

Now I start to have fun with Spark. I checked out Spark 1.2.0, built it following the Spark build instructions, and installed it on this one box. Since the test data is all in AVRO format, I also built the latest development version of SparkAvro from https://github.com/databricks/spark-avro

1) First, I had some problems using the AVRO data with spark-avro. It turns out that the Spark 1.2.0 build process merges mismatched versions of the AVRO core and AVRO mapred jars. I manually fixed it. See the issue here: https://github.com/databricks/spark-avro/issues/24

2) After that, I am impressed because:
- The AVRO files just work from HDFS to Spark 1.2.
- The complex query (about 200 lines) just starts to run in Spark 1.2 using org.apache.spark.sql.hive.HiveContext without any problem. This HiveContext just works in Spark SQL 1.2. Very nice.

3) I got several OOM errors, which is reasonable.
I finally changed the memory settings to:

export SPARK_WORKER_MEMORY=8g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=8g

as 4g just doesn't work for the test data volume. After I set it to 8G, the job no longer fails due to OOM.

4) It looks like Spark generates 8 stages for the big query. It finished stage 1 and stage 2, then failed on stage 3 twice with the following error:

FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:853)
)

During the whole test, the CPU load average was about 16, and there was still enough physical memory available. I don't know what could be the reason for the above error.

Spark did retry, but it had to retry from stage 1. Is this because Spark doesn't persist the intermediate data in HDFS? I guess you have to pay the performance price somewhere no matter what.

What I am looking for is:

1) Can Spark finish this query? Hive is slower, but it reliably finishes any complex query you give it.
2) How fast can it finish this query?
3) I am going to check the final output against the result from Hive.

Spark stage 3 has failed twice for me so far. We will see.

Thanks
Yong
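
For readers following the numbers in this thread, here is a rough back-of-the-envelope sketch in Python. It only restates figures quoted above (24 cores, 8g executor memory, 200 tasks in the failed stage, the "2-3 tasks per CPU core" rule of thumb). The function names are made up for illustration, and the even split of executor heap across running tasks is a simplifying assumption, not how Spark actually accounts for memory.

```python
# Figures quoted in the thread.
CORES = 24                # cores on the single test box
EXECUTOR_MEMORY_GB = 8    # SPARK_EXECUTOR_MEMORY=8g
FAILED_STAGE_TASKS = 200  # tasks reported for the failing stage


def recommended_task_range(cores, low=2, high=3):
    """Task-count range implied by the '2-3 tasks per CPU core' rule of thumb."""
    return cores * low, cores * high


def heap_per_running_task_mb(executor_memory_gb, concurrent_tasks):
    """Executor heap divided evenly among concurrently running tasks.

    Assumption for illustration only: Spark does not partition the heap
    this way, but the ratio gives a feel for memory pressure per task.
    """
    return executor_memory_gb * 1024 / concurrent_tasks


if __name__ == "__main__":
    low, high = recommended_task_range(CORES)
    print(f"Rule-of-thumb task count for {CORES} cores: {low} to {high}")
    print(f"Tasks in the failed stage: {FAILED_STAGE_TASKS}")

    # Compare running one task per core with capping concurrency at 8 tasks.
    for running in (CORES, 8):
        share = heap_per_running_task_mb(EXECUTOR_MEMORY_GB, running)
        print(f"{running} concurrent tasks -> about {share:.0f} MB of heap each")
```

Under these assumptions, 200 tasks is already well above the rule-of-thumb range, which matches Yong's point that more parallelism alone may not help. The 200 most likely comes from spark.sql.shuffle.partitions, which defaults to 200 and is the setting that governs shuffle partitions for Spark SQL queries. The second loop suggests why running fewer tasks at once (for example by lowering SPARK_WORKER_CORES in Standalone mode) trades time for lower memory pressure per task.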