Hi All,

A Hive join query that runs fine and fast on MapReduce takes a long time
on Spark and finally fails with an OOM error.

*Query:  hivejoin.py*

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf().setAppName("Hive_Join")
sc = SparkContext(conf=conf)
hiveCtx = HiveContext(sc)
hiveCtx.hql("""
    INSERT OVERWRITE TABLE D
    SELECT <80 columns>
    FROM A a
    INNER JOIN B b ON a.item_id = b.item_id
    LEFT JOIN C c ON c.instance_id = a.instance_id
""")
results = hiveCtx.hql("SELECT COUNT(1) FROM D").collect()
print(results)


*Data Study:*

Number of Rows:

A table has 1,002,093,508
B table has     5,371,668
C table has         1,000

No Data Skewness:

item_id in B is unique, and A has multiple rows with the same item_id, so after
the first INNER JOIN the result set is still 1,002,093,508 rows.

instance_id in C is unique, and A has multiple rows with the same instance_id
(the maximum number of rows sharing one instance_id is 250).
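The skew check above boils down to a group-by count on the join key. A minimal sketch of the idea (the sample keys below are made up for illustration; in practice this came from a "SELECT item_id, COUNT(1) FROM A GROUP BY item_id" style query):

```python
from collections import Counter

# Hypothetical sample of join keys drawn from table A.
sample_item_ids = ["i1", "i2", "i2", "i3", "i3", "i3"]

counts = Counter(sample_item_ids)
max_rows_per_key = max(counts.values())

# A key is skewed when one value accounts for a huge share of the rows;
# in this toy sample the most frequent item_id appears only 3 times.
print(max_rows_per_key)  # 3
```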

The Spark job runs with 90 executors, each with 2 cores and 6GB of memory.
YARN allotted all the requested resources immediately, and no other job is
running on the cluster.

spark.storage.memoryFraction     0.6
spark.shuffle.memoryFraction     0.2
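If I understand the Spark 1.x memory model correctly (please correct me if not), the heap actually usable for shuffle per executor is heap * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction (safetyFraction defaults to 0.8, and 0.9 on the storage side), which for our settings works out to roughly:

```python
executor_heap_mb = 6 * 1024   # 6GB executor heap
shuffle_fraction = 0.2        # spark.shuffle.memoryFraction
shuffle_safety = 0.8          # spark.shuffle.safetyFraction (default)
storage_fraction = 0.6        # spark.storage.memoryFraction
storage_safety = 0.9          # spark.storage.safetyFraction (default)

shuffle_mb = executor_heap_mb * shuffle_fraction * shuffle_safety
storage_mb = executor_heap_mb * storage_fraction * storage_safety

print(round(shuffle_mb))  # ~983 MB usable for shuffle per executor
print(round(storage_mb))  # ~3318 MB reserved for storage
```

So only about 1GB per executor is available to the shuffle, while over 3GB sits reserved for a cache we never use.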

Stage 2 - reads data from Hadoop; tasks have NODE_LOCAL locality and
shuffle-write 500GB of intermediate data.

Stage 3 - does a shuffle read of the 500GB; tasks have PROCESS_LOCAL locality
and shuffle-write 400GB of output.

Stage 4 - tasks fail with OOM while reading the shuffled output, after
reading only 40GB.
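To put rough numbers on it: with 90 executors x 2 cores there are 180 concurrent tasks, so if the 500GB shuffle were read in a single wave, each task would pull far more than the ~1GB of shuffle memory per executor. This is only a back-of-the-envelope sketch assuming an even split across tasks:

```python
shuffle_read_gb = 500
executors = 90
cores_per_executor = 2
concurrent_tasks = executors * cores_per_executor  # 180 tasks per wave

gb_per_task = shuffle_read_gb / float(concurrent_tasks)
print(round(gb_per_task, 2))  # ~2.78 GB per task in a single wave
```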

First of all, what kinds of Hive queries get better performance when run on
Spark than on MapReduce? And which Hive queries won't perform well on Spark?

How do we calculate the optimal heap size for executor memory and the number
of executors for a given input data size? We don't ask the Spark executors to
cache any data, so how come the Stage 3 tasks show PROCESS_LOCAL? And why does
Stage 4 fail immediately after reading just 40GB of data - is it caching data
in memory?

And in a Spark job, some stages need a lot of memory for shuffle and some
need a lot of memory for cache. So when a Spark executor has plenty of memory
available for cache but does not use it, and a stage needs to do a lot of
shuffling, will the executors use only the shuffle fraction configured for
shuffling, or will they also use the free memory available for cache?
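For reference, this is how the job is submitted today (the resource values are the ones quoted above); if executors cannot borrow from the unused storage fraction, would explicitly shifting the fractions like this be the recommended fix? The two fraction overrides below are hypothetical tuning values, not something we have verified:

```shell
# Current submit parameters, plus hypothetical fraction overrides that
# shrink the unused cache share and grow the shuffle share.
spark-submit \
  --num-executors 90 \
  --executor-cores 2 \
  --executor-memory 6g \
  --conf spark.storage.memoryFraction=0.3 \
  --conf spark.shuffle.memoryFraction=0.5 \
  hivejoin.py
```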


Thanks,
Prabhu Joseph
