Additionally, I'm curious whether there are any JIRAs around making DataFrames support ordering better. There are a lot of operations that can be optimized if you know that you have a total ordering on your data. Are there any plans, or at least JIRAs, around having the Catalyst optimizer handle this case?
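[For concreteness, here is a minimal sketch, in plain Scala rather than any existing Spark API, of the kind of win a guaranteed total ordering enables: two key-sorted inputs can be inner-joined in a single linear merge pass, with no hashing, no shuffle, and no re-sort. The function name and shapes are illustrative only.]

import scala.collection.mutable.ArrayBuffer

// Inner-joins two key-sorted sequences in one linear merge pass.
def mergeJoin[K: Ordering, A, B](left: Seq[(K, A)],
                                 right: Seq[(K, B)]): Seq[(K, A, B)] = {
  val ord = implicitly[Ordering[K]]
  val out = ArrayBuffer.empty[(K, A, B)]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val c = ord.compare(left(i)._1, right(j)._1)
    if (c < 0) i += 1       // left key smaller: advance left
    else if (c > 0) j += 1  // right key smaller: advance right
    else {
      // Equal keys: emit the cross product of the two key runs.
      val k = left(i)._1
      val jStart = j
      while (i < left.length && ord.equiv(left(i)._1, k)) {
        var jj = jStart
        while (jj < right.length && ord.equiv(right(jj)._1, k)) {
          out += ((k, left(i)._2, right(jj)._2))
          jj += 1
        }
        i += 1
      }
      j = jStart
      while (j < right.length && ord.equiv(right(j)._1, k)) j += 1
    }
  }
  out.toSeq
}

[Usage: mergeJoin(Seq(1 -> "a", 2 -> "b"), Seq(2 -> "x")) returns Seq((2, "b", "x")). This is the algorithmic core a planner rule could target if output ordering were tracked through the plan.]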
2015-11-02 9:39 GMT-05:00 Alex Nastetsky <alex.nastet...@vervemobile.com>:

> Thanks for the response.
>
> Taking the file system based data source as “UnknownPartitioning” will
> be a simple and SAFE way for JOIN, as it’s hard to guarantee that records
> from different data sets with identical join keys will be loaded by the
> same node/task, since lots of factors need to be considered: task pool
> size, cluster size, source format, storage, data locality, etc.
>
> I agree it’s worth optimizing for performance reasons, and in Hive this
> is actually called a bucket join. I am not sure whether that will happen
> soon in Spark SQL.
>
> Yes, this is supported in
>
> - Hive with bucket join
> - Pig with USING "merge"
>   <https://pig.apache.org/docs/r0.15.0/perf.html#merge-joins>
> - MR with CompositeInputFormat
>
> But I guess it's not supported in Spark?
>
> On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao <hao.ch...@intel.com> wrote:
>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different numbers of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>> [Hao:] A distributed JOIN operation (either hash-based or sort-based)
>> requires that records with identical join keys MUST BE shuffled to the
>> same “reducer” node/task; hashpartitioning is just a strategy that tells
>> the Spark shuffle service how to achieve that. In theory, we could even
>> use `RangePartitioning` instead (but it’s less efficient, which is why we
>> don’t choose it for JOIN). So conceptually the JOIN operator doesn’t care
>> much about the shuffle strategy, as long as it satisfies the required
>> data distribution.
>>
>> 2) If both datasets have already been partitioned/sorted the same way
>> and stored on the file system (e.g. in a previous job), is there a way
>> to tell Spark this so that it won't want to do a "hashpartitioning" on
>> them? It looks like Spark just considers datasets that have just been
>> read from the file system to have UnknownPartitioning. In the example
>> below, I try to join a dataframe to itself, and it still wants to hash
>> repartition.
>>
>> [Hao:] Take this as an example:
>>
>> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON
>> a.key=b.key JOIN src c ON b.key=c.key
>>
>> == Physical Plan ==
>> TungstenProject [value#20,value#22,value#24]
>>  SortMergeJoin [key#21], [key#23]
>>   TungstenSort [key#21 ASC], false, 0
>>    TungstenProject [key#21,value#22,value#20]
>>     SortMergeJoin [key#19], [key#21]
>>      TungstenSort [key#19 ASC], false, 0
>>       TungstenExchange hashpartitioning(key#19,200)
>>        ConvertToUnsafe
>>         HiveTableScan [key#19,value#20], (MetastoreRelation default, src, Some(a))
>>      TungstenSort [key#21 ASC], false, 0
>>       TungstenExchange hashpartitioning(key#21,200)
>>        ConvertToUnsafe
>>         HiveTableScan [key#21,value#22], (MetastoreRelation default, src, Some(b))
>>   TungstenSort [key#23 ASC], false, 0
>>    TungstenExchange hashpartitioning(key#23,200)
>>     ConvertToUnsafe
>>      HiveTableScan [key#23,value#24], (MetastoreRelation default, src, Some(c))
>>
>> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN
>> src b ON a.key=b.key”, as we didn’t change the data distribution after
>> it, so we can join another table (“JOIN src c ON b.key=c.key”) directly;
>> only table “c” still needs to be repartitioned on “key”.
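[For concreteness, a minimal sketch of how to reproduce Hao's plan yourself. It assumes a Spark 1.5-era deployment built with Hive support, an existing SparkContext `sc`, and the classic Hive sample table `src` (columns key, value) in the metastore; adjust names as needed.]

import org.apache.spark.sql.hive.HiveContext

// Assumes an existing SparkContext `sc` and a Hive metastore that
// contains the sample table `src` (columns key, value).
val hiveContext = new HiveContext(sc)

hiveContext.sql(
  """SELECT a.value, b.value, c.value
    |FROM src a
    |JOIN src b ON a.key = b.key
    |JOIN src c ON b.key = c.key""".stripMargin).explain()

// Each base table is exchanged once, but note that the result of the
// first join is NOT re-partitioned before the second join: it already
// satisfies the required distribution on the join key.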
>>
>> Taking the file system based data source as “UnknownPartitioning” will
>> be a simple and SAFE way for JOIN, as it’s hard to guarantee that records
>> from different data sets with identical join keys will be loaded by the
>> same node/task, since lots of factors need to be considered: task pool
>> size, cluster size, source format, storage, data locality, etc.
>>
>> I agree it’s worth optimizing for performance reasons, and in Hive this
>> is actually called a bucket join. I am not sure whether that will happen
>> soon in Spark SQL.
>>
>> Hao
>>
>> *From:* Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
>> *Sent:* Monday, November 2, 2015 11:29 AM
>> *To:* user
>> *Subject:* Sort Merge Join
>>
>> Hi,
>>
>> I'm trying to understand SortMergeJoin (SPARK-2213).
>>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different numbers of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>> CODE:
>>
>> val sparkConf = new SparkConf()
>>   .setAppName("SortMergeJoinTest")
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .set("spark.eventLog.enabled", "true")
>>   .set("spark.sql.planner.sortMergeJoin", "true")
>>
>> sparkConf.setMaster("local-cluster[3,1,1024]")
>>
>> val sc = new SparkContext(sparkConf)
>> val sqlContext = new SQLContext(sc)
>> import sqlContext.implicits._
>>
>> val inputpath = "input.gz.parquet"
>>
>> val df1 = sqlContext.read.parquet(inputpath).repartition(3)
>> val df2 = sqlContext.read.parquet(inputpath).repartition(5)
>> val result = df1.join(df2.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2")
>> result.explain()
>>
>> OUTPUT:
>>
>> == Physical Plan ==
>> SortMergeJoin [foo#0], [foo2#8]
>>  TungstenSort [foo#0 ASC], false, 0
>>   TungstenExchange hashpartitioning(foo#0)
>>    ConvertToUnsafe
>>     Repartition 3, true
>>      Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
>>  TungstenSort [foo2#8 ASC], false, 0
>>   TungstenExchange hashpartitioning(foo2#8)
>>    TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
>>     Repartition 5, true
>>      Scan ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]
>>
>> 2) If both datasets have already been partitioned/sorted the same way
>> and stored on the file system (e.g. in a previous job), is there a way
>> to tell Spark this so that it won't want to do a "hashpartitioning" on
>> them? It looks like Spark just considers datasets that have just been
>> read from the file system to have UnknownPartitioning. In the example
>> below, I try to join a dataframe to itself, and it still wants to hash
>> repartition.
>>
>> CODE:
>>
>> ...
>> val df1 = sqlContext.read.parquet(inputpath)
>> val result = df1.join(df1.withColumnRenamed("foo", "foo2"), $"foo" === $"foo2")
>> result.explain()
>>
>> OUTPUT:
>>
>> == Physical Plan ==
>> SortMergeJoin [foo#0], [foo2#4]
>>  TungstenSort [foo#0 ASC], false, 0
>>   TungstenExchange hashpartitioning(foo#0)
>>    ConvertToUnsafe
>>     Scan ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
>>  TungstenSort [foo2#4 ASC], false, 0
>>   TungstenExchange hashpartitioning(foo2#4)
>>    ConvertToUnsafe
>>     Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
>>      Scan ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]
>>
>> Thanks.
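[A closing note on question 2: the Hive-style bucket join Hao mentions did eventually land in Spark SQL. Below is a hedged sketch against the later Spark 2.0+ writer API (DataFrameWriter.bucketBy/sortBy, which require saveAsTable into a Hive-compatible catalog). `spark`, `df1`, `df2`, and the table names are assumptions for illustration; none of this exists in the 1.5-era API used in this thread.]

// Hedged sketch, Spark 2.0+ only. Persisting both sides bucketed and
// sorted by the join key records the layout in the catalog, so a later
// join can reuse it instead of inserting a hashpartitioning exchange.
df1.write
  .bucketBy(200, "foo")  // same bucket count on both sides
  .sortBy("foo")
  .saveAsTable("t1")

df2.write
  .bucketBy(200, "foo")
  .sortBy("foo")
  .saveAsTable("t2")

// Joining the bucketed tables on "foo" should plan a SortMergeJoin with
// no exchange on either side (whether the per-partition sort is also
// elided depends on the Spark version and the files-per-bucket layout).
spark.table("t1").join(spark.table("t2"), "foo").explain()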