I am evaluating Spark for our production usage. Our production cluster is 
Hadoop 2.2.0 without Yarn. So I want to test Spark with Standalone deployment 
running with Hadoop.
What I have in mind is to test a very complex Hive query, which joins between 6 
tables, lots of nested structure with exploding, and currently takes 8 hours 
daily running in our production.
All the data of this query are in AVRO + Snappy.
I setup one Box (24 core + 64G memory), installed the same version of Hadoop as 
our production, and put 5% of data on it (which is about 60G, snappy compressed 
AVRO files)
I am running the same query in Hive. It took 6 rounds of MR jobs, finished 
around 30 hours on this one box.
Now, I start to have fun with Spark.
I checked out Spark 1.2.0, built it following Spark build instructions, and 
installed on this one box.
Since the test data is all in AVRO format, so I also built the latest 
development version of SparkAvro, from https://github.com/databricks/spark-avro
1) First, I got some problems to use the AVRO data in spark-avro. It turns our 
that Spark 1.2.0 build processing will merge the mismatched version of AVRO 
core and AVRO mapred jars. I manually fixed it. See issue here: 
https://github.com/databricks/spark-avro/issues/242) After that, I am impressed 
becauseThe AVRO file just works from HDFS to Spark 1.2The complex query (about 
200 lines) just starts to run in Spark 1.2 using 
org.apache.spark.sql.hive.HiveContext without any problem. This HiveContext 
just works in Spark SQL 1.2. Very nice.3) I got several OOM, which is 
reasonable. I finally changes the memory setting to: export 
SPARK_WORKER_MEMORY=8gexport SPARK_DRIVER_MEMORY=2gexport 
SPARK_EXECUTOR_MEMORY=8g       As 4g just doesn't work for the test data 
volume. After I set to 8G, the job won't fail due to OOM.
4) It looks like Spark generates 8 stages for the big query. It finishes the 
stage 1 and stage 2, then failed on stage 3 twice with the following error:







FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, 
message=org.apache.spark.shuffle.MetadataFetchFailedException: Missing an 
output location for shuffle 7    at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
 at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)  at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)        at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)      at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
 at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178) 
     at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
     at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)       at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)      at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:230)     at 
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)   at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)      at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:230)     at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)      at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:230)     at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)     at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)      at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:230)     at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)     at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)      at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:230)     at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)   at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)   at 
org.apache.spark.scheduler.Task.run(Task.scala:56)   at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176) 
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) 
     at java.lang.Thread.run(Thread.java:853)












































)
During whole test, the CPUs load average is about 16, and still had enough 
physical memory to use. I don't know what could be reason for above error. 
Spark did retry, but it had to retry from stage 1. Is this because in Spark, it 
doesn't persist the intermedia data in HDFS? I guess you have to pay the 
performance price somewhere no matter what.
What I am looking for is that:
1) Can spark finish this query. Hive is slower, but it is reliable to finish a 
complex query you give it.2) How fast it can finish this query.3) I am going to 
check the final output vs the result from Hive. 
Spark stage 3 failed twice for me so far. We will see.
Thanks
Yong                                      

Reply via email to