Re: Spark SQL and Skewed Joins

2015-06-17 Thread Yin Huai
Hi John,

Did you also set spark.sql.planner.externalSort to true? Probably you will
not see executor lost with this conf. For now, maybe you can manually split
the query to two parts, one for skewed keys and one for other records.
Then, you union then results of these two parts together.

Thanks,

Yin

On Wed, Jun 17, 2015 at 9:53 AM, Koert Kuipers ko...@tresata.com wrote:

 could it be composed maybe? a general version and then a sql version that
 exploits the additional info/abilities available there and uses the general
 version internally...

 i assume the sql version can benefit from the logical phase optimization
 to pick join details. or is there more?

 On Tue, Jun 16, 2015 at 7:37 PM, Michael Armbrust mich...@databricks.com
 wrote:

 this would be a great addition to spark, and ideally it belongs in spark
 core not sql.


 I agree with the fact that this would be a great addition, but we would
 likely want a specialized SQL implementation for performance reasons.





Re: Spark SQL and Skewed Joins

2015-06-17 Thread Koert Kuipers
could it be composed maybe? a general version and then a sql version that
exploits the additional info/abilities available there and uses the general
version internally...

i assume the sql version can benefit from the logical phase optimization to
pick join details. or is there more?

On Tue, Jun 16, 2015 at 7:37 PM, Michael Armbrust mich...@databricks.com
wrote:

 this would be a great addition to spark, and ideally it belongs in spark
 core not sql.


 I agree with the fact that this would be a great addition, but we would
 likely want a specialized SQL implementation for performance reasons.



Re: Spark SQL and Skewed Joins

2015-06-16 Thread Koert Kuipers
a skew join (where the dominant key is spread across multiple executors) is
pretty standard in other frameworks, see for example in scalding:
https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala

this would be a great addition to spark, and ideally it belongs in spark
core not sql.

its also real a big data problem (single key is too large for executor),
which makes it a hard sell in my experience. the interest in truly big data
in spark community has been somewhat limited...

On Tue, Jun 16, 2015 at 11:28 AM, Jon Walton jon.w.wal...@gmail.com wrote:

 On Fri, Jun 12, 2015 at 9:43 PM, Michael Armbrust mich...@databricks.com
 wrote:

 2. Does 1.3.2 or 1.4 have any enhancements that can help?   I tried to
 use 1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is
 available, would any of the JOIN enhancements help this situation?


 I would try Spark 1.4 after running SET
 spark.sql.planner.sortMergeJoin=true.  Please report back if this works
 for you.



 Hi Michael,

 This does help.  The joins are faster and fewer executors are lost, but it
 seems the same core problem still exists - that a single executor is
 handling the majority of the join (the skewed key) and bottlenecking the
 job.

 One idea I had was to split the dimension table into two halves - a small
 half which can be broadcast, (with the skewed keys), and the other large
 half which could be sort merge joined, (with even distribution), and then
 performing two individual queries against the large fact table and union
 the results.Does this sound like a worthwhile approach?

 Thank you,

 Jon




Re: Spark SQL and Skewed Joins

2015-06-16 Thread Jon Walton
On Fri, Jun 12, 2015 at 9:43 PM, Michael Armbrust mich...@databricks.com
wrote:

 2. Does 1.3.2 or 1.4 have any enhancements that can help?   I tried to use
 1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is
 available, would any of the JOIN enhancements help this situation?


 I would try Spark 1.4 after running SET
 spark.sql.planner.sortMergeJoin=true.  Please report back if this works
 for you.



Hi Michael,

This does help.  The joins are faster and fewer executors are lost, but it
seems the same core problem still exists - that a single executor is
handling the majority of the join (the skewed key) and bottlenecking the
job.

One idea I had was to split the dimension table into two halves - a small
half which can be broadcast, (with the skewed keys), and the other large
half which could be sort merge joined, (with even distribution), and then
performing two individual queries against the large fact table and union
the results.Does this sound like a worthwhile approach?

Thank you,

Jon


Re: Spark SQL and Skewed Joins

2015-06-16 Thread Michael Armbrust

 this would be a great addition to spark, and ideally it belongs in spark
 core not sql.


I agree with the fact that this would be a great addition, but we would
likely want a specialized SQL implementation for performance reasons.


Re: Spark SQL and Skewed Joins

2015-06-12 Thread Michael Armbrust

 2. Does 1.3.2 or 1.4 have any enhancements that can help?   I tried to use
 1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is
 available, would any of the JOIN enhancements help this situation?


I would try Spark 1.4 after running SET
spark.sql.planner.sortMergeJoin=true.  Please report back if this works
for you.


Spark SQL and Skewed Joins

2015-06-12 Thread Jon Walton
Greetings,

I am trying to implement a classic star schema ETL pipeline using Spark
SQL, 1.2.1.  I am running into problems with shuffle joins, for those
dimension tables which have skewed keys and are too large to let Spark
broadcast them.

I have a few questions

1. Can I split my queries so a unique, skewed key gets processed by by
multiple reducer steps?   I have tried this (using a UNION) but I am always
left with the 199/200 executors complete, which times out and even starts
throwing memory errors.   That single executor is processing 95% of the 80G
fact table for the single skewed key.

2. Does 1.3.2 or 1.4 have any enhancements that can help?   I tried to use
1.3.1 but SPARK-6967 prohibits me from doing so.Now that 1.4 is
available, would any of the JOIN enhancements help this situation?

3. Do you have suggestions for memory config if I wanted to broadcast 2G
dimension tables?   Is this even feasible?   Do table broadcasts wind up in
the heap or in dedicated storage space?

Thanks for your help,

Jon