Re: Sort Merge Join

2015-11-02 Thread Alex Nastetsky
Thanks for the response.

Taking a file-system-based data source as “UnknownPartitioning” is a simple
and SAFE default for JOIN, as it’s hard to guarantee that records with
identical join keys from different data sets will be loaded by the same
node/task, since many factors need to be considered, such as task pool size,
cluster size, source format, storage, data locality, etc.

I agree it’s worth optimizing this for performance; in Hive, it is called a
bucket join. I am not sure whether that will happen soon in Spark SQL.


Yes, this is supported in:

   - Hive with bucket join
   - Pig with USING "merge"
   - MR with CompositeInputFormat

But I guess it's not supported in Spark?
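
For reference, here is a rough sketch of what declaring this up front might look like on the Spark side if bucketed/sorted table support is added; the bucketBy/sortBy writer calls below are assumptions for illustration, not an API available in the Spark version discussed here:

    // Hypothetical sketch: persist both sides bucketed and sorted by the join key,
    // so a later job could merge-join them without another shuffle or sort.
    // bucketBy/sortBy/saveAsTable usage here is assumed, not confirmed for this version.
    import org.apache.spark.sql.SQLContext

    def writeBucketed(sqlContext: SQLContext, path: String, table: String): Unit = {
      sqlContext.read.parquet(path)
        .write
        .bucketBy(200, "key")   // hash-bucket on the join key
        .sortBy("key")          // keep each bucket sorted for a merge join
        .saveAsTable(table)
    }

    // Joining two tables written this way should then need neither a shuffle nor a sort:
    // sqlContext.table("t1_bucketed").join(sqlContext.table("t2_bucketed"), "key").explain()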

On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao  wrote:

> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
> example, in the code below, the two datasets have different number of
> partitions, but it still does a SortMerge join after a "hashpartitioning".
>
>
>
> [Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
> requires that records with identical join keys MUST BE shuffled to the same
> “reducer” node/task. hashpartitioning is just one strategy for telling the
> Spark shuffle service how to achieve that; in theory we could even use
> `RangePartitioning` instead (but it’s less efficient, which is why we don’t
> choose it for JOIN). So conceptually the JOIN operator doesn’t care much
> about the shuffle strategy, as long as it satisfies the required data
> distribution.
>
>
>
> 2) If both datasets have already been partitioned/sorted the same way and
> stored on the file system (e.g. by a previous job), is there a way to tell
> Spark this so that it won't do a "hashpartitioning" on them? It looks like
> Spark considers datasets that have just been read from the file system to
> have UnknownPartitioning. In the example below, I try to join a dataframe
> to itself, and it still wants to hash repartition.
>
>
>
> [Hao:] Take this as an example:
>
>
>
> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON
> a.key=b.key JOIN src c ON b.key=c.key
>
>
>
> == Physical Plan ==
> TungstenProject [value#20,value#22,value#24]
>  SortMergeJoin [key#21], [key#23]
>   TungstenSort [key#21 ASC], false, 0
>    TungstenProject [key#21,value#22,value#20]
>     SortMergeJoin [key#19], [key#21]
>      TungstenSort [key#19 ASC], false, 0
>       TungstenExchange hashpartitioning(key#19,200)
>        ConvertToUnsafe
>         HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
>      TungstenSort [key#21 ASC], false, 0
>       TungstenExchange hashpartitioning(key#21,200)
>        ConvertToUnsafe
>         HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
>   TungstenSort [key#23 ASC], false, 0
>    TungstenExchange hashpartitioning(key#23,200)
>     ConvertToUnsafe
>      HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))
>
>
>
> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN
> src b ON a.key=b.key”, as we didn’t change the data distribution after it,
> so we can join another table (“JOIN src c ON b.key=c.key”) directly, which
> only requires repartitioning table “c” on “key”.
>
>
>
> Taking a file-system-based data source as “UnknownPartitioning” is a
> simple and SAFE default for JOIN, as it’s hard to guarantee that records
> with identical join keys from different data sets will be loaded by the
> same node/task, since many factors need to be considered, such as task
> pool size, cluster size, source format, storage, data locality, etc.
>
> I agree it’s worth optimizing this for performance; in Hive, it is called
> a bucket join. I am not sure whether that will happen soon in Spark SQL.
>
>
>
> Hao
>
>
>
> *From:* Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
> *Sent:* Monday, November 2, 2015 11:29 AM
> *To:* user
> *Subject:* Sort Merge Join
>
>
>
> Hi,
>
>
>
> I'm trying to understand SortMergeJoin (SPARK-2213).
>
>
>
> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
> example, in the code below, the two datasets have different number of
> partitions, but it still does a SortMerge join after a "hashpartitioning".
>
>
>
> CODE:
>
>     val sparkConf = new SparkConf()
>       .setAppName("SortMergeJoinTest")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.eventLog.enabled", "true")
>       .set("spark.sql.planner.sortMergeJoin", "true")
>
>     sparkConf.setMaster("local-cluster[3,1,1024]")
>
>     val sc = new SparkContext(sparkConf)
>     val sqlContext = new SQLContext(sc)
>     import sqlContext.implicits._
>
>     val inputpath = "input.gz.parquet"
>
>     val df1 = 

Re: Sort Merge Join

2015-11-02 Thread Jonathan Coveney
Additionally, I'm curious whether there are any JIRAs around making DataFrames
support ordering better. There are a lot of operations that can be optimized
if you know that you have a total ordering on your data... are there any
plans, or at least JIRAs, around having the Catalyst optimizer handle this
case?

2015-11-02 9:39 GMT-05:00 Alex Nastetsky :

> Thanks for the response.
>
> Taking a file-system-based data source as “UnknownPartitioning” is a
> simple and SAFE default for JOIN, as it’s hard to guarantee that records
> with identical join keys from different data sets will be loaded by the
> same node/task, since many factors need to be considered, such as task
> pool size, cluster size, source format, storage, data locality, etc.
>
> I agree it’s worth optimizing this for performance; in Hive, it is called
> a bucket join. I am not sure whether that will happen soon in Spark SQL.
>
>
> Yes, this is supported in:
>
>    - Hive with bucket join
>    - Pig with USING "merge"
>    - MR with CompositeInputFormat
>
> But I guess it's not supported in Spark?
>
> On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao  wrote:
>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different number of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>>
>>
>> [Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
>> requires that records with identical join keys MUST BE shuffled to the same
>> “reducer” node/task. hashpartitioning is just one strategy for telling the
>> Spark shuffle service how to achieve that; in theory we could even use
>> `RangePartitioning` instead (but it’s less efficient, which is why we don’t
>> choose it for JOIN). So conceptually the JOIN operator doesn’t care much
>> about the shuffle strategy, as long as it satisfies the required data
>> distribution.
>>
>>
>>
>> 2) If both datasets have already been partitioned/sorted the same way and
>> stored on the file system (e.g. by a previous job), is there a way to tell
>> Spark this so that it won't do a "hashpartitioning" on them? It looks like
>> Spark considers datasets that have just been read from the file system to
>> have UnknownPartitioning. In the example below, I try to join a dataframe
>> to itself, and it still wants to hash repartition.
>>
>>
>>
>> [Hao:] Take this as an example:
>>
>>
>>
>> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON
>> a.key=b.key JOIN src c ON b.key=c.key
>>
>>
>>
>> == Physical Plan ==
>> TungstenProject [value#20,value#22,value#24]
>>  SortMergeJoin [key#21], [key#23]
>>   TungstenSort [key#21 ASC], false, 0
>>    TungstenProject [key#21,value#22,value#20]
>>     SortMergeJoin [key#19], [key#21]
>>      TungstenSort [key#19 ASC], false, 0
>>       TungstenExchange hashpartitioning(key#19,200)
>>        ConvertToUnsafe
>>         HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
>>      TungstenSort [key#21 ASC], false, 0
>>       TungstenExchange hashpartitioning(key#21,200)
>>        ConvertToUnsafe
>>         HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
>>   TungstenSort [key#23 ASC], false, 0
>>    TungstenExchange hashpartitioning(key#23,200)
>>     ConvertToUnsafe
>>      HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))
>>
>>
>>
>> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN
>> src b ON a.key=b.key”, as we didn’t change the data distribution after it,
>> so we can join another table (“JOIN src c ON b.key=c.key”) directly, which
>> only requires repartitioning table “c” on “key”.
>>
>>
>>
>> Taking a file-system-based data source as “UnknownPartitioning” is a
>> simple and SAFE default for JOIN, as it’s hard to guarantee that records
>> with identical join keys from different data sets will be loaded by the
>> same node/task, since many factors need to be considered, such as task
>> pool size, cluster size, source format, storage, data locality, etc.
>>
>> I agree it’s worth optimizing this for performance; in Hive, it is called
>> a bucket join. I am not sure whether that will happen soon in Spark SQL.
>>
>>
>>
>> Hao
>>
>>
>>
>> *From:* Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
>> *Sent:* Monday, November 2, 2015 11:29 AM
>> *To:* user
>> *Subject:* Sort Merge Join
>>
>>
>>
>> Hi,
>>
>>
>>
>> I'm trying to understand SortMergeJoin (SPARK-2213).
>>
>>
>>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different number of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>>
>>
>> CODE:
>>
>>  

RE: Sort Merge Join

2015-11-02 Thread Cheng, Hao
Not as far as I can tell. @Michael @YinHuai @Reynold, any comments on this
optimization?

From: Jonathan Coveney [mailto:jcove...@gmail.com]
Sent: Tuesday, November 3, 2015 4:17 AM
To: Alex Nastetsky
Cc: Cheng, Hao; user
Subject: Re: Sort Merge Join

Additionally, I'm curious whether there are any JIRAs around making DataFrames
support ordering better. There are a lot of operations that can be optimized if
you know that you have a total ordering on your data... are there any plans, or
at least JIRAs, around having the Catalyst optimizer handle this case?

2015-11-02 9:39 GMT-05:00 Alex Nastetsky <alex.nastet...@vervemobile.com>:
Thanks for the response.

Taking a file-system-based data source as “UnknownPartitioning” is a simple and
SAFE default for JOIN, as it’s hard to guarantee that records with identical
join keys from different data sets will be loaded by the same node/task, since
many factors need to be considered, such as task pool size, cluster size,
source format, storage, data locality, etc.

I agree it’s worth optimizing this for performance; in Hive, it is called a
bucket join. I am not sure whether that will happen soon in Spark SQL.

Yes, this is supported in:

  *   Hive with bucket join
  *   Pig with USING "merge" (https://pig.apache.org/docs/r0.15.0/perf.html#merge-joins)
  *   MR with CompositeInputFormat

But I guess it's not supported in Spark?

On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao <hao.ch...@intel.com> wrote:
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

[Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
requires that records with identical join keys MUST BE shuffled to the same
“reducer” node/task. hashpartitioning is just one strategy for telling the
Spark shuffle service how to achieve that; in theory we could even use
`RangePartitioning` instead (but it’s less efficient, which is why we don’t
choose it for JOIN). So conceptually the JOIN operator doesn’t care much about
the shuffle strategy, as long as it satisfies the required data distribution.

2) If both datasets have already been partitioned/sorted the same way and
stored on the file system (e.g. by a previous job), is there a way to tell
Spark this so that it won't do a "hashpartitioning" on them? It looks like
Spark considers datasets that have just been read from the file system to have
UnknownPartitioning. In the example below, I try to join a dataframe to itself,
and it still wants to hash repartition.

[Hao:] Take this as an example:

EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key 
JOIN src c ON b.key=c.key

== Physical Plan ==
TungstenProject [value#20,value#22,value#24]
 SortMergeJoin [key#21], [key#23]
  TungstenSort [key#21 ASC], false, 0
   TungstenProject [key#21,value#22,value#20]
    SortMergeJoin [key#19], [key#21]
     TungstenSort [key#19 ASC], false, 0
      TungstenExchange hashpartitioning(key#19,200)
       ConvertToUnsafe
        HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
     TungstenSort [key#21 ASC], false, 0
      TungstenExchange hashpartitioning(key#21,200)
       ConvertToUnsafe
        HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
  TungstenSort [key#23 ASC], false, 0
   TungstenExchange hashpartitioning(key#23,200)
    ConvertToUnsafe
     HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))

There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN src b
ON a.key=b.key”, as we didn’t change the data distribution after it, so we can
join another table (“JOIN src c ON b.key=c.key”) directly, which only requires
repartitioning table “c” on “key”.

Taking a file-system-based data source as “UnknownPartitioning” is a simple and
SAFE default for JOIN, as it’s hard to guarantee that records with identical
join keys from different data sets will be loaded by the same node/task, since
many factors need to be considered, such as task pool size, cluster size,
source format, storage, data locality, etc.

I agree it’s worth optimizing this for performance; in Hive, it is called a
bucket join. I am not sure whether that will happen soon in Spark SQL.

Hao

From: Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
Sent: Monday, November 2, 2015 11:29 AM
To: user
Subject: Sort Merge Join

Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it sti

RE: Sort Merge Join

2015-11-01 Thread Cheng, Hao
1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

[Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
requires that records with identical join keys MUST BE shuffled to the same
“reducer” node/task. hashpartitioning is just one strategy for telling the
Spark shuffle service how to achieve that; in theory we could even use
`RangePartitioning` instead (but it’s less efficient, which is why we don’t
choose it for JOIN). So conceptually the JOIN operator doesn’t care much about
the shuffle strategy, as long as it satisfies the required data distribution.
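
As a minimal illustration of why hash partitioning satisfies that requirement (a sketch, not Spark's actual partitioner code): a record's target partition is derived only from its key, so equal keys from either side of the join always land in the same reducer partition.

    // Sketch: hash-based routing of records to reducer partitions.
    // Equal join keys always map to the same partition, so both sides of the
    // join can be matched locally on that reducer.
    def targetPartition(key: Any, numPartitions: Int): Int = {
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod  // keep the result non-negative
    }

    // Every record with key "foo", from either table, goes to the same partition:
    // targetPartition("foo", 200) returns the same value on both sides.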

2) If both datasets have already been partitioned/sorted the same way and
stored on the file system (e.g. by a previous job), is there a way to tell
Spark this so that it won't do a "hashpartitioning" on them? It looks like
Spark considers datasets that have just been read from the file system to have
UnknownPartitioning. In the example below, I try to join a dataframe to itself,
and it still wants to hash repartition.

[Hao:] Take this as an example:

EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON a.key=b.key 
JOIN src c ON b.key=c.key

== Physical Plan ==
TungstenProject [value#20,value#22,value#24]
 SortMergeJoin [key#21], [key#23]
  TungstenSort [key#21 ASC], false, 0
   TungstenProject [key#21,value#22,value#20]
    SortMergeJoin [key#19], [key#21]
     TungstenSort [key#19 ASC], false, 0
      TungstenExchange hashpartitioning(key#19,200)
       ConvertToUnsafe
        HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
     TungstenSort [key#21 ASC], false, 0
      TungstenExchange hashpartitioning(key#21,200)
       ConvertToUnsafe
        HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
  TungstenSort [key#23 ASC], false, 0
   TungstenExchange hashpartitioning(key#23,200)
    ConvertToUnsafe
     HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))

There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN src b
ON a.key=b.key”, as we didn’t change the data distribution after it, so we can
join another table (“JOIN src c ON b.key=c.key”) directly, which only requires
repartitioning table “c” on “key”.
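
The same thing can be checked through the DataFrame API (an illustrative sketch; the t1/t2/t3 table names are hypothetical, not from this thread):

    // Sketch: the first join's result keeps its hash distribution on "key",
    // so the second join should only add an exchange for the third table.
    val ab  = sqlContext.table("t1").join(sqlContext.table("t2"), "key")
    val abc = ab.join(sqlContext.table("t3"), "key")
    abc.explain()  // expect a single extra hashpartitioning, for t3 only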

Taking a file-system-based data source as “UnknownPartitioning” is a simple and
SAFE default for JOIN, as it’s hard to guarantee that records with identical
join keys from different data sets will be loaded by the same node/task, since
many factors need to be considered, such as task pool size, cluster size,
source format, storage, data locality, etc.

I agree it’s worth optimizing this for performance; in Hive, it is called a
bucket join. I am not sure whether that will happen soon in Spark SQL.

Hao

From: Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
Sent: Monday, November 2, 2015 11:29 AM
To: user
Subject: Sort Merge Join

Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For 
example, in the code below, the two datasets have different number of 
partitions, but it still does a SortMerge join after a "hashpartitioning".

CODE:

    val sparkConf = new SparkConf()
      .setAppName("SortMergeJoinTest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.eventLog.enabled", "true")
      .set("spark.sql.planner.sortMergeJoin", "true")  // enable sort-merge join in the planner

    sparkConf.setMaster("local-cluster[3,1,1024]")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val inputpath = "input.gz.parquet"

    // the two sides deliberately get different numbers of partitions
    val df1 = sqlContext.read.parquet(inputpath).repartition(3)
    val df2 = sqlContext.read.parquet(inputpath).repartition(5)
    val result = df1.join(df2.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2")
    result.explain()

OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#8]
 TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
   ConvertToUnsafe
    Repartition 3, true
     Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
 TungstenSort [foo2#8 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#8)
   TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
    Repartition 5, true
     Scan ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]
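
To see the ShuffledHashJoin alternative for comparison (a sketch using the same planner flag the code above sets; behavior may differ by Spark version), the flag can be flipped at runtime and the join re-planned:

    // Sketch: disable sort-merge join and re-plan the same join;
    // the planner should then fall back to a shuffled hash join.
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false")
    df1.join(df2.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2").explain()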

2) If both datasets have already been partitioned/sorted the same way and
stored on the file system (e.g. by a previous job), is there a way to tell
Spark this so that it won't do a "hashpartitioning" on them? It looks like
Spark considers datasets that have just been read from the file