Have you tried joins on regular RDDs instead of SchemaRDDs? We have found
them to be 10 times faster than joins between SchemaRDDs. For example:
val largeRDD = ...
val smallRDD = ...
largeRDD.join(smallRDD)   // the equivalent SchemaRDD join runs much longer

The only limitation I see with that approach is that a regular RDD supports
only 22 fields, unless you use nested case classes.
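To make that concrete, here is a minimal sketch of the idea: key both tables
by the join column and join them as plain pair RDDs. The paths and column
positions are hypothetical (I am assuming comma-separated text files, with
the DAgents key in the first column and the ContactDetail foreign key f6 in
the sixth), so adjust them to your layout:

import org.apache.spark.SparkContext._   // pair-RDD implicits (join, etc.)

// Hypothetical HDFS paths and 0-based column positions.
val largeRDD = sc.textFile("hdfs:///data/ContactDetail")
  .map(_.split(","))
  .map(f => (f(5), f))                   // key by f6

val smallRDD = sc.textFile("hdfs:///data/DAgents")
  .map(_.split(","))
  .map(f => (f(0), f))                   // key by f1

// Filter on the key before the join so only matching rows are shuffled.
val joined = largeRDD.filter(_._1 == "902").join(smallRDD)
joined.map { case (_, (cd, _)) => cd(4) }.count()

Filtering the fact table on the key before the join keeps the shuffle small,
which is what the SQL optimizer should ideally be doing for you.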
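Also, since DAgents is tiny (<500 KB per the thread below), it is worth
trying a map-side join that avoids the shuffle altogether: collect the small
table to the driver, broadcast it, and look rows up on the map side. Again
only a sketch, reusing the hypothetical largeRDD/smallRDD above:

// Collect the small dim table to the driver and broadcast it to executors.
val agents = smallRDD.collectAsMap()
val agentsBc = sc.broadcast(agents)

// Map-side join: the 8 GB fact table is scanned once, with no shuffle.
val joined2 = largeRDD.flatMap { case (k, row) =>
  agentsBc.value.get(k).map(agent => (row, agent))
}
joined2.count()

This is essentially what spark.sql.autoBroadcastJoinThreshold is meant to
trigger automatically for tables below the threshold.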
On Thu, Dec 4, 2014 at 12:57 PM, Venkat Subramanian <vsubr...@gmail.com> wrote:

> Hi Cheng,
>
> Thank you very much for taking the time to provide a detailed explanation.
> I tried a few things you suggested, and some more.
>
> The ContactDetail table (8 GB) is the fact table and DAgents is the dim
> table (<500 KB), the reverse of what you are assuming, but your ideas
> still apply.
>
> I tried the following:
>
> a) Cached the smaller dim table in memory:
>
> sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10000000")
> sqlContext.cacheTable("DAgents")
>
> The UI -> Storage tab shows it cached as an RDD when I run it.
>
> val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE
> ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
>
> CDJoinQry.map(ta => ta(4)).count
>
> I see no difference in performance. The query takes the same amount of
> time, ~1.2 min.
>
> b) I reversed both the order of the tables and the order of the where
> clause in the query:
>
> val CDJoinQry = sqlContext.sql("SELECT * FROM DAgents, ContactDetail WHERE
> DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")
>
> Performance got worse: it took 6-7 min to complete.
>
> Changing only the table order in the SELECT for this join, keeping the
> same where clause order, performance was similar (1.2-1.4 min).
>
> c) Using the query in a), I tried keeping the in-memory storage in
> compressed columnar format with
>
> sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
>
> I see no difference in performance; the query takes the same ~1.2 min.
> Not sure if it even works.
>
> d) I converted the comma-separated HDFS files to Parquet format in HDFS,
> read them back as Parquet, and ran the query on that:
>
> DAgents.saveAsParquetFile("DAgents.parquet")
> FCDRDD.saveAsParquetFile("ContactDetail.parquet")
>
> val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
> DAgentsParquetRDD.registerAsTable("DAgentsParquet")
>
> val FContactDetailParquetRDD = sqlContext.parquetFile("ContactDetail.parquet")
> FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")
>
> val CDJoinQryParquet = sqlContext.sql("SELECT * FROM ContactDetailParquet,
> DAgentsParquet WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and
> DAgentsParquet.f1 = 902")
> CDJoinQryParquet.map(ta => ta(4)).count
>
> *The query time is actually higher for this join query.* It ended up
> taking 3.4 min, with more data (2 GB) in shuffle reads. Parquet performed
> worse than non-Parquet for this join.
>
> I also ran the Parquet version with the table order and where clause
> reversed:
>
> val CDJoinQryParquetReversed = sqlContext.sql("SELECT * FROM DAgentsParquet,
> ContactDetailParquet WHERE DAgentsParquet.f1 = 902 and
> DAgentsParquet.f1 = ContactDetailParquet.f6")
> CDJoinQryParquetReversed.map(ta => ta(4)).count
>
> It ran for more than 18 min and I had to kill it, as it kept on running.
>
> *But for queries with no join, Parquet's performance was extremely good.*
> For example, this query with no join ran in 8 seconds, whereas the same
> query on the non-Parquet table took 30 seconds:
>
> val CDJoinQryParquet0 = sqlContext.sql("SELECT * FROM ContactDetailParquet
> WHERE ContactDetailParquet.f6 = 902")
> CDJoinQryParquet0.map(ta => ta(4)).count
>
> *Some potential conclusions (please comment):*
>
> * The order of predicates in the where clause seems to matter to the Spark
> SQL optimizer. In the relational databases I have worked with, where-clause
> order is typically only a hint. It would be nice if the Spark SQL optimizer
> were fixed to ignore clause order and optimize automatically.
>
> * Changing just the table order in the SELECT for a join also seems to
> matter when reading data from HDFS (for Parquet, and to a lesser extent
> for non-Parquet in my case), even when the where clause order is the same.
> It would be nice if the SQL optimizer handled this automatically as well.
>
> * Joins on huge tables are costly. The fact and dimension concepts from
> star schemas don't translate well to big data (Hadoop, Spark). It may be
> better to de-normalize huge tables to avoid joins; joins seem to be evil.
> (I have tried de-normalizing with Cassandra, but that has its own problem:
> ad-hoc queries result in full table scans when the keys are not known.)
>
> Regards,
>
> Venkat
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org