I am using Spark 1.5.2. I used the movies dataset to reproduce the issue;
the physical plan is below. I am not sure why Spark hash-partitions the
data, then sorts, then joins. I expected the already co-partitioned data
to be joined first and only then sent on for further processing.

What I sort of expect is a common partitioner that partitions a dataframe
on a given column into a given number of buckets and keeps that data as
physically close together as possible, i.e. co-located. If Spark then sees
two tables partitioned on the same columns, and those columns are part of
the join condition, it should join them at the partition level first and
only shuffle the data afterwards for further processing.
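
This is roughly what already happens at the plain RDD level when both
sides share the same partitioner instance; a minimal sketch (names and
data are illustrative):

import org.apache.spark.HashPartitioner

// With the same partitioner on both sides, join() matches partitions
// one-to-one (a narrow dependency) instead of shuffling either side.
val part  = new HashPartitioner(8)
val left  = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(part)
val right = sc.parallelize(Seq((1, "x"), (2, "y"))).partitionBy(part)
val joined = left.join(right) // no extra shuffle stage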

Below are the queries:

//read movies from parquet files.
val movies_1 = sqlContext.sql("select * from movies")
val movies_2 = sqlContext.sql("select * from movies")
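
// The definition of my_partitioner is omitted here; any Partitioner on
// the integer movie id reproduces this, e.g. a plain HashPartitioner
// (the bucket count of 8 is arbitrary).
import org.apache.spark.HashPartitioner
val my_partitioner = new HashPartitioner(8)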

// key each row by its integer movie id, repartition with my_partitioner,
// drop the keys, and rebuild a DataFrame with the original schema
val part_movies_1 = sqlContext.createDataFrame(
  movies_1.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_1.schema
)
val part_movies_2 = sqlContext.createDataFrame(
  movies_2.rdd.map(r => (r.getInt(0), r)).partitionBy(my_partitioner).values,
  movies_2.schema
)

part_movies_1.persist()
part_movies_2.persist()
part_movies_1.registerTempTable("part_movies_1")
part_movies_2.registerTempTable("part_movies_2")
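// persist() is lazy; run an action first so the tables are actually
// materialized in the cache:
part_movies_1.count()
part_movies_2.count()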
//look at storage in the Spark UI

val sql1 = sqlContext.sql("select * from part_movies_1 pm1 inner join part_movies_2 pm2 on pm1.movie = pm2.movie and pm1.title = pm2.title")

sql1.count() // the plan below is for this count statement
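
For reference, essentially the same plan can be printed without running
the job, e.g.:

sql1.groupBy().count().explain()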

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#750L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#783L])
   TungstenProject
    SortMergeJoin [movie#622,title#623], [movie#640,title#641]
     TungstenSort [movie#622 ASC,title#623 ASC], false, 0
      TungstenExchange hashpartitioning(movie#622,title#623)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#622,title#623], (InMemoryRelation [movie#622,title#623,genres#624], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#622,title#623,genres#624]), None)
     TungstenSort [movie#640 ASC,title#641 ASC], false, 0
      TungstenExchange hashpartitioning(movie#640,title#641)
       ConvertToUnsafe
        InMemoryColumnarTableScan [movie#640,title#641], (InMemoryRelation [movie#640,title#641,genres#642], true, 10000, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#640,title#641,genres#642]), None)

As the plan shows, both sides are re-shuffled (TungstenExchange
hashpartitioning on movie and title) even though the RDDs were already
partitioned, so the partitioning applied via partitionBy does not appear
to survive createDataFrame. Please let me know if you need further
information.


On Tue, Apr 5, 2016 at 6:33 PM, Yong Zhang <java8...@hotmail.com> wrote:

> You need to show us the execution plan, so we can understand what your
> issue is.
>
> Use the spark shell code to show how your DFs are built and how you
> partition them, then use explain(true) on your join DF and show the
> output here, so we can better help you.
>
> Yong
>
> > Date: Tue, 5 Apr 2016 09:46:59 -0700
> > From: darshan.m...@gmail.com
> > To: user@spark.apache.org
> > Subject: Plan issue with spark 1.5.2
> >
> >
> > I am using Spark 1.5.2. I have a question regarding the plan generated
> > by Spark. I have 3 dataframes which hold the data for different
> > countries. There are around 150 countries and the data is skewed.
> >
> > 95% of my queries will have country as a criterion. However, I have
> > seen issues with the plans generated for queries which have country as
> > a join column.
> >
> > The dataframes are partitioned based on the country. Not only are
> > these dataframes co-partitioned, they are co-located as well, e.g. the
> > data for the UK in dataframes df1, df2 and df3 will be on the same
> > HDFS datanode.
> >
> > When I join these 3 tables and country is one of the join columns, I
> > assume the join should be a map-side join, but it shuffles the data
> > from all 3 dataframes and then joins using the shuffled data. Apart
> > from country there are other columns in the join.
> >
> > Is this the correct behavior? If it is an issue, is it fixed in later
> > versions?
> >
> > Thanks
