Hi, Deb:From what I search online, changing parallelism is one option. But the 
failed stage already had 200 tasks, which is quite large on a one 24 core box.I 
know query that amount of data in one box is kind of over, but I do want to 
know how to config it using less memory, even it could mean using more time.We 
plan to make spark coexist with Hadoop cluster, so be able to control its 
memory usage is important for us.Does spark need that much of memory?ThanksYong
Date: Thu, 5 Feb 2015 15:36:48 -0800
Subject: Re: My first experience with Spark
From: deborah.sie...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

Hi Yong, 
Have you tried increasing your level of parallelism? How many tasks are you 
getting in failing stage? 2-3 tasks per CPU core is recommended, though maybe 
you need more for your shuffle operation?
  
You can configure spark.default.parallelism, or pass in a level of parallelism 
as second parameter to a suitable operation in your code. 

Deb
On Thu, Feb 5, 2015 at 1:03 PM, java8964 <java8...@hotmail.com> wrote:



I am evaluating Spark for our production usage. Our production cluster is 
Hadoop 2.2.0 without Yarn. So I want to test Spark with Standalone deployment 
running with Hadoop.
What I have in mind is to test a very complex Hive query, which joins between 6 
tables, lots of nested structure with exploding, and currently takes 8 hours 
daily running in our production.
All the data of this query are in AVRO + Snappy.
I setup one Box (24 core + 64G memory), installed the same version of Hadoop as 
our production, and put 5% of data on it (which is about 60G, snappy compressed 
AVRO files)
I am running the same query in Hive. It took 6 rounds of MR jobs, finished 
around 30 hours on this one box.
Now, I start to have fun with Spark.
I checked out Spark 1.2.0, built it following Spark build instructions, and 
installed on this one box.
Since the test data is all in AVRO format, so I also built the latest 
development version of SparkAvro, from https://github.com/databricks/spark-avro
1) First, I got some problems to use the AVRO data in spark-avro. It turns our 
that Spark 1.2.0 build processing will merge the mismatched version of AVRO 
core and AVRO mapred jars. I manually fixed it. See issue here: 
https://github.com/databricks/spark-avro/issues/242) After that, I am impressed 
becauseThe AVRO file just works from HDFS to Spark 1.2The complex query (about 
200 lines) just starts to run in Spark 1.2 using 
org.apache.spark.sql.hive.HiveContext without any problem. This HiveContext 
just works in Spark SQL 1.2. Very nice.3) I got several OOM, which is 
reasonable. I finally changes the memory setting to: export 
SPARK_WORKER_MEMORY=8gexport SPARK_DRIVER_MEMORY=2gexport 
SPARK_EXECUTOR_MEMORY=8g       As 4g just doesn't work for the test data 
volume. After I set to 8G, the job won't fail due to OOM.
4) It looks like Spark generates 8 stages for the big query. It finishes the 
stage 1 and stage 2, then failed on stage 3 twice with the following error:







FetchFailed(null, shuffleId=7, mapId=-1, reduceId=7, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
location for shuffle 7
        at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
        at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
        at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(Thread.java:853)














































)


During whole test, the CPUs load average is about 16, and still had enough 
physical memory to use. I don't know what could be reason for above error. 


Spark did retry, but it had to retry from stage 1. Is this because in Spark, it 
doesn't persist the intermedia data in HDFS? 
I guess you have to pay the performance price somewhere no matter what.


What I am looking for is that:


1) Can spark finish this query. Hive is slower, but it is reliable to finish a 
complex query you give it.
2) How fast it can finish this query.
3) I am going to check the final output vs the result from Hive. 


Spark stage 3 failed twice for me so far. We will see.


Thanks


Yong
                                          

                                          

Reply via email to