Hi Experts,

I have a large fact table with 54 million records being joined to 6 small
dimension tables. Each small table is under 5 KB on disk, with record counts
in the range of 4 to 200. Every worker node has 32 GB of RAM allocated to
Spark. I have tried the approaches below, and it looks like the small tables
are not being broadcast, which, as expected, causes timeouts and the job to
fail. The reason, as far as I can tell, is that the small table is also
shuffled and lands in a single node's partition; the large table's data then
flows to that same node, which stays busy while all the other nodes sit idle.
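To be clear about the behavior I am expecting: a broadcast (map-side) join
should ship the tiny dimension table to every node instead of shuffling both
sides. A hand-rolled sketch of the idea (names such as factRdd and the column
positions are purely illustrative):

   import org.apache.spark.SparkContext._   // for collectAsMap on a pair RDD

   // Collect the tiny dimension table on the driver and broadcast it.
   val edu = sqlContext.parquetFile("cg_pq_cdw_education")
   val eduMap = sc.broadcast(
     edu.map(r => (r.getString(0), r.getString(1))).collectAsMap())

   // Map over the large fact RDD and look keys up locally; neither side is
   // shuffled, so the work spreads evenly across the nodes.
   val joined = factRdd.map { r =>
     val key = r.getString(9)            // position of the join key (illustrative)
     (r, eduMap.value.get(key))          // Option gives left-outer semantics
   }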

Note: the Spark version in use on the cluster, as well as in my local setup,
is 1.1.0. I also tried Spark 1.2.0 in the local setup, but the query plan
showed no change.

1. Broadcast the RDD before registering it as a table:
         val k = sqlContext.parquetFile(p.fileName)
         val t = sc.broadcast(k)
         t.value.registerTempTable(p.tableName)

2. Set the broadcast threshold variable:
     sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold","100000000")
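
   After setting the threshold I re-check the physical plan; if it took
effect I would expect a BroadcastHashJoin node in place of the two Exchange
nodes (the query below is just illustrative):

   val q = sqlContext.sql(
     "select * from cdemo c left outer join edu e on c.key1 = e.key1")
   println(q.queryExecution.executedPlan)   // still shows HashOuterJoin + Exchange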


3. Added a limit to each small table before registering it as a table. I
guess this gives the optimizer a way to compute statistics and determine that
the table is small enough to broadcast (a way to sanity-check the resulting
size estimate is sketched after this item):
   sqlContext.sql("select * from a_nolim limit 7").registerTempTable("edu")

      also tried DSL style:

 a.limit(7).registerTempTable("edu")

   Tried explicit broadcasting of the tables as below:

   sc.broadcast(sqlContext.sql("select * from edu_nolim limit 7"))
     .value.registerTempTable("edu")

   and tried the DSL style with the broadcast done on the RDD as well.
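
   A way to sanity-check whether the limit changes the size estimate that is
compared against autoBroadcastJoinThreshold (assuming the statistics API is
the same in 1.1.0 as in 1.2.0):

   val small = sqlContext.sql("select * from a_nolim limit 7")
   println(small.queryExecution.optimizedPlan.statistics.sizeInBytes)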

4. Used DSL style of join:
   val try2 = a1.join(cdemo, LeftOuter, Some("dem.key1".attr === "ed.key1".attr))

5. Ran the below command in Hive for all the small tables:
   ANALYZE TABLE tableName COMPUTE STATISTICS noscan

   Please note, the application uses a SQLContext and not a HiveContext, so I
ran the COMPUTE STATISTICS statements outside the application from the Hue ->
Hive editor. I am assuming the statistics are available in the metastore, but
I am not sure whether Spark can fetch them, since I am not using a HiveContext
within the application. (A sketch of what the HiveContext route might look
like follows this item.)
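
   For completeness, my understanding of what the HiveContext route would
look like - assuming the Hive metastore holding the analyzed tables is
reachable from the application, and that the tables (fact_table and dim_table
below are placeholders) are read through the metastore rather than registered
from Parquet files:

   import org.apache.spark.sql.hive.HiveContext

   val hc = new HiveContext(sc)
   hc.setConf("spark.sql.autoBroadcastJoinThreshold", "100000000")
   // statistics from ANALYZE TABLE ... COMPUTE STATISTICS noscan are only
   // picked up for Hive metastore tables
   val q = hc.sql(
     "select * from fact_table f left outer join dim_table d on f.key1 = d.key1")
   println(q.queryExecution.executedPlan)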

6. Not sure if these are valid flags, but tried setting them anyway:
  sqlContext.setConf("spark.sql.planner.dimensionJoin","true")
  sqlContext.setConf("spark.sql.planner.generatedDimensionJoin","true")
  sqlContext.setConf("multiWayJoin","true")
  sqlContext.setConf("turbo", "true")

7. Tried cacheTable for all the small tables. This changes the query
execution to use an InMemoryRelation instead of a ParquetTableScan; however,
the shuffle - Exchange (HashPartitioning [i1_education_cust_demo#29], 200) -
remains.
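
   The caching was done per table with sqlContext.cacheTable, roughly (the
table name is illustrative):

   sqlContext.cacheTable("edu")
   sqlContext.sql("select count(*) from edu").collect()   // forces the cache to materialize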

8. Reduced the number of shuffle partitions with
sqlContext.setConf("spark.sql.shuffle.partitions","8"), but this did not
help.

With all these attempts, the small tables still appear to be getting
shuffled. Below are the queryExecution outputs printed on every attempt; they
have remained almost the same each time:

DSL Style execution plan    (i.e.
rdd1.join(rdd2, LeftOuter, Some("rdd1.key".attr === "rdd2.key".attr)))
-----------------------------------------------------------------------------------------------------
DSL Style execution plan --> HashOuterJoin [education#18],
[i1_education_cust_demo#29], LeftOuter, None
 Exchange (HashPartitioning [education#18], 200)
  ParquetTableScan [education#18,education_desc#19], (ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_education,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []
 Exchange (HashPartitioning [i1_education_cust_demo#29], 200)
  ParquetTableScan
[customer_id_cust_demo#20,age_dt_cust_demo#21,gndr_cd_cust_demo#22,hh_income_cust_demo#23,marital_status_cust_demo#24,ethnicity_cust_demo#25,length_of_residence_cust_demo#26,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,i1_education_cust_demo#29],
(ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_cust_demo_dm_sample,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []


SQL Style execution plan (i.e. sqlContext.sql("select a,b,c,d,e from t1 left
outer join t2 on t1.a = t2.a"))
----------------------------------------------------------------------------------------------------------
Project
[customer_id_cust_demo#20,i1_education_cust_demo#29,marital_status_cust_demo#24,hh_income_cust_demo#23,length_of_residence_cust_demo#26,ethnicity_cust_demo#25,gndr_cd_cust_demo#22,age_dt_cust_demo#21,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,education_desc#19]
 HashOuterJoin [i1_education_cust_demo#29], [education#18], LeftOuter, None
  Exchange (HashPartitioning [i1_education_cust_demo#29], 200)
   ParquetTableScan
[customer_id_cust_demo#20,age_dt_cust_demo#21,gndr_cd_cust_demo#22,hh_income_cust_demo#23,marital_status_cust_demo#24,ethnicity_cust_demo#25,length_of_residence_cust_demo#26,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,i1_education_cust_demo#29],
(ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_cust_demo_dm_sample,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []
  Exchange (HashPartitioning [education#18], 200)
   Limit 7
    ParquetTableScan [education#18,education_desc#19], (ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_education,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []

The stage at which the application starts to stall is:

mapPartitions at Exchange.scala:48
 On every attempt, the driver logs show a straggler task taking far longer
than the rest:
[2015-02-16 12:32:21,840] INFO  k.scheduler.TaskSetManager []
[akka://JobServer/user/context-supervisor/large] - Finished task 54.0 in
stage 15.0 (TID 781) in 18431 ms on votlbdcd04.tms.toyota.com (198/200)
[2015-02-16 12:40:19,993] INFO  k.scheduler.TaskSetManager [] [] - Finished
task 39.0 in stage 15.0 (TID 766) in 496698 ms on votlbdcd08.tms.toyota.com
(199/200)


And the worker node that shows active tasks has the below log entries:

2015-02-16 11:09:25,189 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.63.37.109%3A35610-14#-2057161980]
was not delivered. [4] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
2015-02-16 11:09:25,194 ERROR akka.remote.EndpointWriter: AssociationError
[akka.tcp://sparkwor...@votlbdcd07.tms.toyota.com:7078] -> [akka.tcp://
sparkexecu...@votlbdcd07.tms.toyota.com:12700]: Error [Association failed
with [akka.tcp://sparkexecu...@votlbdcd07.tms.toyota.com:12700]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkexecu...@votlbdcd07.tms.toyota.com:12700]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: votlbdcd07.tms.toyota.com/10.63.37.109:12700
]


Appreciate any help. In case this data point is useful: the same query
executes within 2 minutes in Impala with far fewer resources.

regards
Sunita
