Hi Cheng,

Thank you very much for taking the time to provide a detailed explanation. I tried a few of the things you suggested, along with some others.
The ContactDetail table (8 GB) is the fact table and DAgents is the dimension table (< 500 KB), the reverse of what you were assuming, but your ideas still apply. I tried the following:

a) Cached the smaller dimension table in memory:

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10000000")
sqlContext.cacheTable("DAgents")

The Spark UI (Storage tab) shows it cached as an RDD when I run the query.

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
CDJoinQry.map(ta => ta(4)).count

I see no difference in performance; the query takes the same ~1.2 min. (See the plan-inspection sketch after the experiments for one way to check whether the broadcast join actually kicks in.)

b) Reversed both the table order and the WHERE-clause order in the query:

val CDJoinQry = sqlContext.sql("SELECT * FROM DAgents, ContactDetail WHERE DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")

Performance got much worse: it took 6-7 min to complete. Changing only the table order in the SELECT, while keeping the same WHERE-clause order, gave performance similar to a) (1.2-1.4 min).

c) Using the query from a), I tried enabling compressed in-memory columnar storage with

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

Again no difference in performance; the query still takes ~1.2 min. I'm not sure the setting even took effect.

d) I converted the comma-separated HDFS files to Parquet, read them back as Parquet tables, and ran the query on those:

DAgents.saveAsParquetFile("DAgents.parquet")
FCDRDD.saveAsParquetFile("ContactDetail.parquet")

val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
DAgentsParquetRDD.registerAsTable("DAgentsParquet")

val FContactDetailParquetRDD = sqlContext.parquetFile("ContactDetail.parquet")
FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")

val CDJoinQryParquet = sqlContext.sql("SELECT * FROM ContactDetailParquet, DAgentsParquet WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and DAgentsParquet.f1 = 902")
CDJoinQryParquet.map(ta => ta(4)).count

*The query time is actually higher for this join.* It took 3.4 min, with more data (2 GB) read in shuffle reads. Parquet performed worse than non-Parquet for this join.

I also ran the Parquet version of the query with the table order and WHERE clause reversed:

val CDJoinQryParquetReversed = sqlContext.sql("SELECT * FROM DAgentsParquet, ContactDetailParquet WHERE DAgentsParquet.f1 = 902 and DAgentsParquet.f1 = ContactDetailParquet.f6")
CDJoinQryParquetReversed.map(ta => ta(4)).count

It ran for more than 18 minutes and I had to kill it because it kept on running.

*For queries with no join, however, Parquet's performance was extremely good.* For example, the query below (no join) ran in 8 seconds, whereas the same query on the non-Parquet table took 30 seconds.

val CDJoinQryParquet0 = sqlContext.sql("SELECT * FROM ContactDetailParquet WHERE ContactDetailParquet.f6 = 902")
CDJoinQryParquet0.map(ta => ta(4)).count
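One way to check whether the broadcast join in a) actually kicks in is to print the physical plan of the query. This is only a minimal sketch, assuming a Spark 1.x SQLContext in the spark-shell with the tables registered as above; queryExecution is a developer API and the exact operator names can vary between Spark versions:

// Sketch only: threshold is in bytes, so DAgents (< 500 KB) should fall well under it.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)
sqlContext.cacheTable("DAgents")

val CDJoinQry = sqlContext.sql(
  "SELECT * FROM ContactDetail, DAgents " +
  "WHERE ContactDetail.f6 = DAgents.f1 AND DAgents.f1 = 902")

// If the optimizer picks up the small table's size, the plan should contain a
// BroadcastHashJoin; a ShuffledHashJoin here means no broadcast happened.
println(CDJoinQry.queryExecution.executedPlan)

If the plan still shows a shuffled join, one possible reason (as far as I understand) is that tables registered from plain text/HDFS RDDs don't expose size statistics to the optimizer, whereas the Parquet tables from d) are more likely to trigger the broadcast.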
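On d): part of the gap between the join and no-join Parquet results might come from SELECT *, which forces every column of the 8 GB fact table to be read and shuffled and so gives up Parquet's main advantage of reading only the columns you need. A minimal sketch of a pruned variant, assuming the Parquet tables registered above; "f5" is only a stand-in for whichever column the map(ta => ta(4)) actually uses:

// Hypothetical pruned query: select only the needed column instead of *.
// "f5" is a placeholder; adjust it to the real column behind index 4.
val CDJoinQryPruned = sqlContext.sql(
  "SELECT ContactDetailParquet.f5 " +
  "FROM ContactDetailParquet, DAgentsParquet " +
  "WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 AND DAgentsParquet.f1 = 902")

// Same row count as before, but without materializing all columns of the fact table.
CDJoinQryPruned.count()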
*Some potential conclusions (please comment):*

* The order of predicates in the WHERE clause seems to matter to the Spark SQL optimizer. In the relational databases I have worked with, WHERE-clause order is at most a hint. It would be nice if the Spark SQL optimizer ignored clause order and optimized the query automatically.
* Changing just the table order in the SELECT statement of a join also seems to matter when reading data from HDFS (for Parquet, and to a lesser extent for non-Parquet in my case), even when the WHERE-clause order is the same. Again, it would be nice if the optimizer handled this automatically.
* Joins on huge tables are costly. The fact and dimension concepts from star schemas don't translate well to big data (Hadoop, Spark). It may be better to de-normalize and store huge tables to avoid joins; joins seem to be evil. (I have tried de-normalizing when using Cassandra, but that has its own problem: ad-hoc queries end up as full table scans when the keys are not known.)

Regards,
Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.