Hi All,

A Hive join query that runs fine and fast on MapReduce takes a long time on Spark and finally fails with an OOM.
*Query: hivejoin.py*

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import HiveContext

    conf = SparkConf().setAppName("Hive_Join")
    sc = SparkContext(conf=conf)
    hiveCtx = HiveContext(sc)
    hiveCtx.hql("INSERT OVERWRITE TABLE D SELECT <80 columns> FROM A a "
                "INNER JOIN B b ON a.item_id = b.item_id "
                "LEFT JOIN C c ON c.instance_id = a.instance_id")
    results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
    print results

*Data Study:*

Number of rows:
- Table A: 1,002,093,508
- Table B: 5,371,668
- Table C: 1,000

No data skew:
- item_id is unique in B, and A has multiple rows with the same item_id, so after the first INNER JOIN the result set is still 1,002,093,508 rows.
- instance_id is unique in C, and A has multiple rows with the same instance_id (at most 250 rows share one instance_id).

*Configuration:*

The Spark job runs with 90 executors, each with 2 cores and 6 GB of memory. YARN allotted all the requested resources immediately, and no other job is running on the cluster.

    spark.storage.memoryFraction 0.6
    spark.shuffle.memoryFraction 0.2

*Stages:*

- Stage 2: reads data from Hadoop; tasks run NODE_LOCAL and shuffle-write 500 GB of intermediate data.
- Stage 3: shuffle-reads the 500 GB; tasks run PROCESS_LOCAL and shuffle-write 400 GB of output.
- Stage 4: tasks fail with OOM while reading the shuffled data, after reading only about 40 GB.

*Questions:*

1. What kinds of Hive queries perform better on Spark than on MapReduce, and which Hive queries won't perform well on Spark?
2. How do we calculate the optimal executor heap size and the number of executors for a given input data size?
3. We don't ask the Spark executors to cache any data, yet Stage 3 tasks report PROCESS_LOCAL locality. How come?
4. Why does Stage 4 fail after reading just 40 GB of data? Is it caching the data in memory?
5. In a Spark job, some stages need a lot of memory for shuffle and some need a lot of memory for cache.
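For sizing, here is a rough back-of-envelope sketch of how I understand the Spark 1.x legacy memory model splits each 6 GB executor heap with the fractions above (the safety fractions spark.storage.safetyFraction and spark.shuffle.safetyFraction are ignored here, so the real usable memory is somewhat lower; the numbers are illustrative, not authoritative):

```python
# Illustrative split of a 6 GB executor heap under the Spark 1.x
# legacy memory model, ignoring safety fractions.
executor_heap_gb = 6.0
storage_fraction = 0.6   # spark.storage.memoryFraction (cache space)
shuffle_fraction = 0.2   # spark.shuffle.memoryFraction (shuffle aggregation)

storage_gb = executor_heap_gb * storage_fraction
shuffle_gb = executor_heap_gb * shuffle_fraction
other_gb = executor_heap_gb - storage_gb - shuffle_gb  # task working memory

print("per executor -> storage: %.1f GB, shuffle: %.1f GB, other: %.1f GB"
      % (storage_gb, shuffle_gb, other_gb))

# Across all 90 executors, total shuffle-aggregation memory:
total_shuffle_gb = 90 * shuffle_gb
print("cluster-wide shuffle memory: %.0f GB" % total_shuffle_gb)
# 500 GB of shuffle data against ~108 GB of shuffle memory means
# heavy spilling to disk, which is why I suspect the fractions matter.
```

If this arithmetic is right, most of each heap (3.6 GB) is reserved for a cache we never use, which motivates question 5 below.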
So, when a Spark executor has a lot of memory set aside for cache but does not use it, and a stage then needs to do a lot of shuffle, will the executors use only the shuffle fraction configured for shuffling, or will they also use the free memory reserved for cache?

Thanks,
Prabhu Joseph