Have you tried joins on regular RDDs instead of SchemaRDDs? We have found
that they are about 10 times faster than joins between SchemaRDDs.

val largeRDD = ...
val smallRDD = ...

largeRDD.join(smallRDD)  // the SchemaRDD join would otherwise run for a long time

The only limitation I see with that approach is that a regular RDD (built
from a case class) supports only 22 fields unless you use nested case classes.
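
For reference, here is a minimal sketch of the kind of plain pair-RDD join I
mean, written against the table and column names from your queries below. The
column positions (f6 assumed to be index 5 of ContactDetail, f1 index 0 of
DAgents) are assumptions on my part; both sides have to be keyed (key, value)
pair RDDs before join() applies.

// Pull the rows out of Spark SQL and join them as plain pair RDDs.
// Column indices are assumptions: f6 taken as index 5 of ContactDetail,
// f1 as index 0 of DAgents.
// (In a standalone app, import org.apache.spark.SparkContext._ for the
// pair-RDD join; the spark-shell imports it for you.)
val factRows = sqlContext.sql("SELECT * FROM ContactDetail")   // large side
val dimRows  = sqlContext.sql("SELECT * FROM DAgents")         // small side

val largeRDD = factRows.map(row => (row(5), row))              // keyed on f6
val smallRDD = dimRows.filter(row => row(0) == 902).map(row => (row(0), row))  // f1 = 902, keyed on f1

// Plain RDD join instead of letting Spark SQL plan the join.
val joined = largeRDD.join(smallRDD)        // RDD[(key, (factRow, dimRow))]
joined.map { case (_, (factRow, _)) => factRow(4) }.count()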



On Thu, Dec 4, 2014 at 12:57 PM, Venkat Subramanian <vsubr...@gmail.com>
wrote:

> Hi Cheng,
>
> Thank you very much for taking your time and providing a detailed
> explanation.
> I tried a few things you suggested and some more things.
>
> The ContactDetail table (8 GB) is the fact table and DAgents is the Dim
> table (<500 KB), the reverse of what you were assuming, but your ideas still
> apply.
>
> I tried the following:
>
> a) Cached the smaller Dim table in memory:
>  sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10000000")
>  sqlContext.cacheTable("DAgents")
>
> The UI -> Stage -> Storage view shows it to be cached as an RDD when I run it.
>
> val CDJoinQry= sqlContext.sql("SELECT  * FROM ContactDetail, DAgents  WHERE
> ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
>
> CDJoinQry.map(ta => ta(4)).count
>
> I see no difference in performance. The query takes the same amount of
> time, ~1.2 min.
>
> b) I reversed both the table order and the WHERE clause order in the query
>
> val CDJoinQry= sqlContext.sql("SELECT  * FROM DAgents, ContactDetail  WHERE
> DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")
>
> The performance got much worse. It took 6-7 min to complete.
>
> Just changing the table order in the SELECT for this join, while keeping the
> same WHERE clause order, gave similar performance (1.2-1.4 min).
>
> c) Using the query from a), I tried compressing the in-memory columnar
> storage with
> sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
>
> I see no difference in performance. The query takes the same amount of
> time, ~1.2 min.
> Not sure if it even works.
>
> d) I tried converting the comma-separated HDFS files to Parquet format in
> HDFS, reading them back as Parquet, and then running the query on them.
>
> DAgents.saveAsParquetFile("DAgents.parquet")
> FCDRDD.saveAsParquetFile("ContactDetail.parquet")
>
>
> val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
> DAgentsParquetRDD.registerAsTable("DAgentsParquet")
>
> val FContactDetailParquetRDD =
> sqlContext.parquetFile("ContactDetail.parquet")
> FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")
>
> val CDJoinQryParquet= sqlContext.sql("SELECT  * FROM ContactDetailParquet,
> DAgentsParquet  WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and
> DAgentsParquet.f1 = 902")
> CDJoinQryParquet.map(ta => ta(4)).count
>
> *The query time is actually higher for this join query.* It ended up taking
> 3.4 min with more data read (2 GB) in shuffle reads. Parquet performed worse
> than non-Parquet for this join.
>
> I then ran the query with the table order and WHERE clause reversed against
> the Parquet tables:
>
> val CDJoinQryParquetReversed= sqlContext.sql("SELECT  * FROM
> DAgentsParquet,
> ContactDetailParquet  WHERE   DAgentsParquet.f1 = 902 and
> DAgentsParquet.f1=ContactDetailParquet.f6 ")
> CDJoinQryParquetReversed.map(ta => ta(4)).count
>
> It took > 18 min and I had to kill it as it kept on running.
>
> *But for queries with no join, Parquet's performance was extremely good.*
> For example, the query below, which has no join, ran in 8 seconds, whereas
> the same query on the non-Parquet data took 30 seconds.
> val CDJoinQryParquet0= sqlContext.sql("SELECT  * FROM ContactDetailParquet
> WHERE ContactDetailParquet.f6 = 902")
> CDJoinQryParquet0.map(ta => ta(4)).count
>
> *Some potential conclusions (pl. comment):*
> * The order of predicates in the WHERE clause seems to matter to the Spark
> SQL optimizer. In the relational DBs that I have worked with, the WHERE
> clause order is typically just a hint. It would be nice if the Spark SQL
> optimizer were fixed to ignore clause order and optimize automatically.
> * I tried changing just the table order in the SELECT statement for a join,
> and it also seems to matter when reading data from HDFS (for Parquet, and to
> a lesser extent for non-Parquet in my case), even when the WHERE clause
> order is the same. It would be nice if the SQL optimizer handled this
> automatically too.
> * Joins on huge table(s) are costly. The fact and dimension concepts from a
> star schema don't translate well to Big Data (Hadoop, Spark). It may be
> better to de-normalize and store huge tables to avoid joins; joins seem to
> be evil. (I have tried de-normalizing when using Cassandra, but that has its
> own problem of causing full table scans for ad-hoc queries where the keys
> are not known.)
>
> Regards,
>
> Venkat
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
