Hari,

Thanks for the details, and sorry for the late reply. Currently Spark SQL doesn't enable the broadcast join optimization for left outer joins, so shuffles are required to perform this query. I put together a somewhat artificial test to show the physical plan of your query:

== Physical Plan ==
HashOuterJoin [state#15], [state#19], LeftOuter, None
 Exchange (HashPartitioning [state#15], 200)
  PhysicalRDD [state#15,city#16,amount#17,amount2#18], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:36
 Aggregate false, [state#19], [state#19,MAX(PartialMax#24) AS amount1#4]
  Exchange (HashPartitioning [state#19], 200)
   Aggregate true, [state#19], [state#19,MAX(amount2#22) AS PartialMax#24]
    Project [state#19,amount2#22]
     PhysicalRDD [state#19,city#20,amount#21,amount2#22], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:36
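For reference, a plan like the one above can be printed through the SchemaRDD developer API; a minimal sketch, assuming joinRDD is the joined SchemaRDD from your snippet below:

// Print the physical plan Spark SQL chose for the query (Spark 1.x developer API).
println(joinRDD.queryExecution.executedPlan)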

For each Exchange operator in the plan, a shuffle is inserted; this is part of the reason for the low performance. On the other hand, the default number of shuffle partitions is 200, which is apparently far too large for only 30K rows and introduces unnecessary task scheduling cost. You may try lowering it to, for example, 8.
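A minimal sketch of how to lower it, assuming a Spark 1.x SQLContext named sqlContext:

// Reduce the number of shuffle partitions from the default 200 to 8.
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

// Equivalently, via a SQL statement:
sqlContext.sql("SET spark.sql.shuffle.partitions=8")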

Also, PR #3270 <https://github.com/apache/spark/pull/3270> is part of an ongoing effort to accelerate similar queries.
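In the meantime, since the aggregated side of your join has only about 50 rows, one possible workaround is to avoid the join operator entirely: collect the small side to the driver and do a map-side left outer join with a broadcast variable. A rough, untested sketch, where rdd1 stands for the original case class RDD behind results (a hypothetical name):

// Collect the ~50-row aggregate (state, amount1) to the driver and broadcast it.
val maxByState: Map[String, Double] =
  groupByRDD.map(r => (r.getString(0), r.getDouble(1))).collect().toMap
val bMax = sc.broadcast(maxByState)

// Map-side left outer join: look up each row's state in the broadcast map;
// missing states yield None, mirroring the LeftOuter semantics.
val joined = rdd1.map { r =>
  (r.state, r.city, r.amount, r.amount2, bMax.value.get(r.state))
}

This removes both Exchange operators at the cost of collecting the small table to the driver.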

Cheng

On 12/18/14 10:41 PM, Hari Rajaram wrote:

Cheng,
Thanks for looking at the issue.As I said earlier,it is a schemaRDD created from case class by reading a tab delimted file.
I'm using DSL to join to the RDD's.

Just a small snippet:

RDD1:

case class RDD1(state: String, city: String, amount: Double, amount2: Double)

RDD2:
The variable results used below is nothing but the SchemaRDD created from RDD1.

// Imports needed for the Catalyst DSL (Spark 1.x)
import org.apache.spark.sql.catalyst.expressions.{Alias, Max}
import org.apache.spark.sql.catalyst.plans.LeftOuter

val groupByRDD = results.groupBy('state)('state, Alias(Max('amount2), "amount1")())
val x = results.as('x)
val originalTableColumns = x.schema.fieldNames
val y = groupByRDD.as('y)
val joinOnClause = 'x.state === 'y.state
val joinRDD = x.join(y, LeftOuter, Some(joinOnClause))

Get the records from joinRDD (e.g. with joinRDD.collect()).

Note: results (RDD1) is already created and cached, so the time from groupByRDD to joinRDD is around 8 to 10 seconds.

Hari

On Wed, Dec 17, 2014 at 10:09 PM, Cheng Lian <lian.cs....@gmail.com <mailto:lian.cs....@gmail.com>> wrote:

    What kind of tables underlie the SchemaRDDs? Could you
    please provide the DDL of the tables and the query you executed?

    On 12/18/14 6:15 AM, harirajaram wrote:

        Guys,
        I'm trying to join 2-3 SchemaRDDs of approx 30,000 rows and it is
        terribly slow. No doubt I get the results, but it takes 8s to do the
        join and get the results.
        I'm running standalone Spark on my machine, which has 8 cores and
        12 GB RAM, with 4 workers.
        Not sure why it is consuming so much time; any inputs appreciated.

        This is just an example of what I'm trying to do.

        RDD1 (30,000 rows):
        state,city,amount

        RDD2 (50 rows):
        state,amount1

        Join by state.
        New RDD3 (30,000 rows):
        state,city,amount,amount1

        Do a select (amount - amount1) from the new RDD3.