[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-09-03 Thread YanjieGao
Github user YanjieGao closed the pull request at:

https://github.com/apache/spark/pull/1134





[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-09-03 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-54283740
  
Hi marmbrus, I will close it. Best regards





[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-09-02 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-54251873
  
Hi @YanjieGao, as I said in #1127, this will be a great optimization to have after we figure out how to choose join algorithms based on statistics. I think we should close this issue for now and reopen it once we have a design for this.

Thanks for working on it!





[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-11 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-48707013
  
Thanks Michael,
(1) We could make it a user hint, like Hive does:
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 10)
We could use:
set sparksql.optimize.skewjoin=true
set sparksql.skewjoin.key=skew_key_threshold
(2) We could sample the keys to find their relative frequencies; the user-set skew_key_threshold can then judge which keys are over the threshold (see the sketch below).
(3) toString will generate many temporary objects; I will optimize the code in the next step.
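
For point (2), a minimal sketch of what the threshold check could look like, assuming the proposed (not yet existing) sparksql.skewjoin.key setting and reusing the streamSideKeyGenerator projection from this PR:

```scala
// Sketch only: the threshold would come from the proposed (hypothetical)
// sparksql.skewjoin.key setting rather than a literal.
val threshold = 10
// Sample the streamed side and count how often each join key appears.
val keyCounts = streamedTable
  .sample(false, 0.3, 9)
  .map(row => streamSideKeyGenerator(row))
  .countByValue()
// Keys sampled more often than the threshold are treated as skewed.
val skewedKeys = keyCounts.filter { case (_, count) => count > threshold }.keys
```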





[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-11 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-48707289
  
Hi, I also made a left semi join in PR #1127. I am not sure whether it counts as an optimization of the left semi join or as a separate join algorithm. I think PR #1127 also has some optimization work still to do. Do you think PR #1127 is worth merging? Thanks a lot.
https://github.com/apache/spark/pull/1127




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-11 Thread YanjieGao
Github user YanjieGao commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r14811559
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -400,3 +401,73 @@ case class BroadcastNestedLoopJoin(
       streamedPlusMatches.flatMap(_._1), sqlContext.sparkContext.makeRDD(rightOuterMatches))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key,
+ * then makes the rows with the largest key into their own table RDD. The left RDD is split
+ * into a leftSkewedTable RDD without the largest key and a maxKeySkewedTable RDD with the
+ * largest key. Each of the two is then joined with the right table.
+ * Finally, the two result RDDs are unioned.
+ */
+@DeveloperApi
+case class SkewJoinCartesianProduct(
+    left: SparkPlan,
+    right: SparkPlan,
+    condition: Option[Expression])(@transient sc: SparkContext) extends BinaryNode {
+  override def output = left.output ++ right.output
+
+  @transient lazy val boundCondition =
+    InterpretedPredicate(
+      condition
+      .map(c => BindReferences.bindReference(c, left.output ++ right.output))
+      .getOrElse(Literal(true)))
+
+  def execute() = {
+
+    val skewedTable = left.execute()
+    //This will later write as configuration
+    val sample = skewedTable.sample(false, 0.3, 9).collect()
+    val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() < row2.hashCode())
--- End diff --

I want to use the key to sort the rows, and I think I need a better way to obtain the key. Do you have a better way to fetch the key? (A possible approach is sketched below.)
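
One possible direction, sketched under the assumption that the operator is handed the join keys (the leftKeys parameter below is hypothetical here; SkewJoinCartesianProduct currently only carries condition), is the Projection pattern that the keyed SkewJoin version elsewhere in this thread already uses:

```scala
// Hypothetical: assumes the operator receives leftKeys: Seq[Expression],
// as the keyed SkewJoin variant elsewhere in this thread does.
@transient lazy val leftKeyGenerator = new Projection(leftKeys, left.output)

// Sort the sampled rows by their projected join key rather than by the
// hashCode of the whole row.
val sortedSample = sample.sortBy(row => leftKeyGenerator(row).hashCode())
```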




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-11 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-48752406
  
Hi, I rewrote the code and resolved some of the earlier problems.




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r14678162
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala ---
@@ -23,3 +23,4 @@ case object LeftOuter extends JoinType
 case object RightOuter extends JoinType
 case object FullOuter extends JoinType
 case object LeftSemi extends JoinType
+case object Skew extends JoinType
--- End diff --

A key question for this PR is whether we want to model skew as a join type. My first inclination would be no, since it is a hint about how to execute the query for maximum performance, not something that changes the answer of the query.




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r14678174
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -400,3 +401,73 @@ case class BroadcastNestedLoopJoin(
       streamedPlusMatches.flatMap(_._1), sqlContext.sparkContext.makeRDD(rightOuterMatches))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key,
+ * then makes the rows with the largest key into their own table RDD. The left RDD is split
+ * into a leftSkewedTable RDD without the largest key and a maxKeySkewedTable RDD with the
+ * largest key. Each of the two is then joined with the right table.
+ * Finally, the two result RDDs are unioned.
+ */
+@DeveloperApi
+case class SkewJoinCartesianProduct(
+    left: SparkPlan,
+    right: SparkPlan,
+    condition: Option[Expression])(@transient sc: SparkContext) extends BinaryNode {
+  override def output = left.output ++ right.output
+
+  @transient lazy val boundCondition =
+    InterpretedPredicate(
+      condition
+      .map(c => BindReferences.bindReference(c, left.output ++ right.output))
--- End diff --

Indent only 2 spaces from `condition`.




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r14678179
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -400,3 +401,73 @@ case class BroadcastNestedLoopJoin(
       streamedPlusMatches.flatMap(_._1), sqlContext.sparkContext.makeRDD(rightOuterMatches))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key,
+ * then makes the rows with the largest key into their own table RDD. The left RDD is split
+ * into a leftSkewedTable RDD without the largest key and a maxKeySkewedTable RDD with the
+ * largest key. Each of the two is then joined with the right table.
+ * Finally, the two result RDDs are unioned.
+ */
+@DeveloperApi
+case class SkewJoinCartesianProduct(
+    left: SparkPlan,
+    right: SparkPlan,
+    condition: Option[Expression])(@transient sc: SparkContext) extends BinaryNode {
+  override def output = left.output ++ right.output
+
+  @transient lazy val boundCondition =
+    InterpretedPredicate(
+      condition
+      .map(c => BindReferences.bindReference(c, left.output ++ right.output))
+      .getOrElse(Literal(true)))
+
+  def execute() = {
+
--- End diff --

No blank line here.




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r14678203
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -400,3 +401,73 @@ case class BroadcastNestedLoopJoin(
       streamedPlusMatches.flatMap(_._1), sqlContext.sparkContext.makeRDD(rightOuterMatches))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key,
+ * then makes the rows with the largest key into their own table RDD. The left RDD is split
+ * into a leftSkewedTable RDD without the largest key and a maxKeySkewedTable RDD with the
+ * largest key. Each of the two is then joined with the right table.
+ * Finally, the two result RDDs are unioned.
+ */
+@DeveloperApi
+case class SkewJoinCartesianProduct(
+    left: SparkPlan,
+    right: SparkPlan,
+    condition: Option[Expression])(@transient sc: SparkContext) extends BinaryNode {
+  override def output = left.output ++ right.output
+
+  @transient lazy val boundCondition =
+    InterpretedPredicate(
+      condition
+      .map(c => BindReferences.bindReference(c, left.output ++ right.output))
+      .getOrElse(Literal(true)))
+
+  def execute() = {
+
+    val skewedTable = left.execute()
+    //This will later write as configuration
+    val sample = skewedTable.sample(false, 0.3, 9).collect()
+    val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() < row2.hashCode())
+    var max = 0
+    var num = sample.size - 1
+    var temp = 0
+    var maxrowKey = sortedSample(0)
+    //find the largest key
+    if (sortedSample.size > 1) {
+      for (i <- 1 to num) {
+        if (sortedSample(i - 1) == sortedSample(i)) {
+          temp += 1
+        }
+        else {
+          if (temp > max) {
+            max = temp
+            maxrowKey = sortedSample(i - 1)
+          }
+          temp = 0
+        }
+      }
+    }
+    // I also sent a pull request (https://github.com/apache/spark/pull/1306) adding an RDD
+    // span function to split an RDD in two:
+    //val (maxKeySkewedTable, mainSkewedTable) = skewedTable.span(row => {
+    //  skewSideKeyGenerator(row).toString().equals(maxrowKey.toString())
+    //})
+    val maxKeySkewedTable = skewedTable.filter(row => {
+      row.toString().equals(maxrowKey.toString())
--- End diff --

Why are you comparing the strings of the rows instead of the rows themselves?
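
For illustration, a minimal sketch of the direct comparison being suggested, under the same surrounding code:

```scala
// Rows support structural equality, so the sampled row can be compared
// directly; no toString round-trip (and no extra String allocations).
val maxKeySkewedTable = skewedTable.filter(row => row == maxrowKey)
val mainSkewedTable = skewedTable.filter(row => row != maxrowKey)
```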




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r14678448
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -400,3 +401,73 @@ case class BroadcastNestedLoopJoin(
       streamedPlusMatches.flatMap(_._1), sqlContext.sparkContext.makeRDD(rightOuterMatches))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key,
+ * then makes the rows with the largest key into their own table RDD. The left RDD is split
+ * into a leftSkewedTable RDD without the largest key and a maxKeySkewedTable RDD with the
+ * largest key. Each of the two is then joined with the right table.
+ * Finally, the two result RDDs are unioned.
+ */
+@DeveloperApi
+case class SkewJoinCartesianProduct(
+    left: SparkPlan,
+    right: SparkPlan,
+    condition: Option[Expression])(@transient sc: SparkContext) extends BinaryNode {
+  override def output = left.output ++ right.output
+
+  @transient lazy val boundCondition =
+    InterpretedPredicate(
+      condition
+      .map(c => BindReferences.bindReference(c, left.output ++ right.output))
+      .getOrElse(Literal(true)))
+
+  def execute() = {
+
+    val skewedTable = left.execute()
+    //This will later write as configuration
+    val sample = skewedTable.sample(false, 0.3, 9).collect()
+    val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() < row2.hashCode())
+    var max = 0
+    var num = sample.size - 1
+    var temp = 0
+    var maxrowKey = sortedSample(0)
+    //find the largest key
--- End diff --

Space after `//`. Use capital letters to start sentences and end comments with a period (.)




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r14678486
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -400,3 +401,73 @@ case class BroadcastNestedLoopJoin(
       streamedPlusMatches.flatMap(_._1), sqlContext.sparkContext.makeRDD(rightOuterMatches))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key,
+ * then makes the rows with the largest key into their own table RDD. The left RDD is split
+ * into a leftSkewedTable RDD without the largest key and a maxKeySkewedTable RDD with the
+ * largest key. Each of the two is then joined with the right table.
+ * Finally, the two result RDDs are unioned.
+ */
+@DeveloperApi
+case class SkewJoinCartesianProduct(
+    left: SparkPlan,
+    right: SparkPlan,
+    condition: Option[Expression])(@transient sc: SparkContext) extends BinaryNode {
+  override def output = left.output ++ right.output
+
+  @transient lazy val boundCondition =
+    InterpretedPredicate(
+      condition
+      .map(c => BindReferences.bindReference(c, left.output ++ right.output))
+      .getOrElse(Literal(true)))
+
+  def execute() = {
+
+    val skewedTable = left.execute()
+    //This will later write as configuration
+    val sample = skewedTable.sample(false, 0.3, 9).collect()
+    val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() < row2.hashCode())
--- End diff --

Why are you sorting by hashCode?
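
One alternative, sketched here, is to skip the sort entirely and count frequencies in the collected sample:

```scala
// Sketch: find the most frequent sampled row without sorting by hashCode.
// groupBy(identity) buckets equal rows together; the largest bucket wins.
// Assumes a non-empty sample, as the original code does.
val sample = skewedTable.sample(false, 0.3, 9).collect()
val (maxrowKey, maxCount) = sample
  .groupBy(identity)
  .mapValues(_.size)
  .maxBy { case (_, count) => count }
```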




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-08 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-48396637
  
I think there are major questions that will need to be answered before we could merge this PR:
 - Is skew just a hint instead of a join type, and how do we propagate that information through?
 - @chenghao-intel asks a valid question about join keys. I'm not sure how this could work without them.
 - I think the current implementation of execute() is going to suffer from serious performance issues. It does many passes over the data, does a lot of unnecessary string manipulation, and computes several Cartesian products. You will need to run some performance experiments with large datasets in order to show that this operator actually has benefits.




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-06 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-48143209
  
Hi all, I rewrote most of the code and the test suite now passes.




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-03 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-47870392
  
Hi all. I have resolved the conflict.




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-06-25 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-47173420
  
Hi all, I updated 8 files, similar to the pull request that adds the EXCEPT operator. But when I run the test, it executes the CartesianProduct operator instead. I think there are some mistakes in my code. Can you help me? Thanks a lot!




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-06-24 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-46938049
  
Thanks a lot, Chenghao. This code is like a demo; I think we could improve the sampling phase and use some strategy to judge which key sets are skewed, by either absolute or relative frequency. What are your suggestions?




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-06-23 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r1431
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -341,3 +342,85 @@ case class BroadcastNestedLoopJoin(
       streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches))
   }
 }
+
+
+
+
+
+/**
+ * :: DeveloperApi ::
+ * In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key,
+ * then makes the rows with the largest key into their own table RDD. The streamed RDD is
+ * split into a mainStreamedTable RDD without the largest key and a maxKeyStreamedTable RDD
+ * with the largest key. Each of the two is then joined with the build table.
+ * Finally, the two result RDDs are unioned.
+ */
+@DeveloperApi
+case class SkewJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    buildSide: BuildSide,
+    left: SparkPlan,
+    right: SparkPlan,
+    @transient sc: SparkContext) extends BinaryNode {
+  override def outputPartitioning: Partitioning = left.outputPartitioning
+
+  override def requiredChildDistribution =
+    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  val (buildPlan, streamedPlan) = buildSide match {
+    case BuildLeft => (left, right)
+    case BuildRight => (right, left)
+  }
+  val (buildKeys, streamedKeys) = buildSide match {
+    case BuildLeft => (leftKeys, rightKeys)
+    case BuildRight => (rightKeys, leftKeys)
+  }
+
+  def output = left.output ++ right.output
+
+  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
+  @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output)
+
+  def execute() = {
+    val streamedTable = streamedPlan.execute()
+    //This will later write as configuration
+    val sample = streamedTable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect()
+    val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() < row2.hashCode())
+    var max = 0
+    var num = sample.size - 1
+    var temp = 0
+    var maxrowKey = sortedSample(0)
+    //find the largest key
+    if (sortedSample.size > 1) {
+      for (i <- 1 to num) {
+        if (sortedSample(i - 1) == sortedSample(i)) temp += 1
+        else {
+          if (temp > max) {
+            max = temp
+            maxrowKey = sortedSample(i - 1)
+          }
+          temp = 0
+        }
+      }
+    }
+    val maxKeyStreamedTable = streamedTable.filter(row => {
--- End diff --

This can be done like
```
val (maxKeyStreamedTable, mainStreamedTable) = streamedTable.partition(row => {
  streamSideKeyGenerator(row).toString().equals(maxrowKey.toString())
})
```
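
One caveat, noted here as an aside: RDD does not define a partition method (that is a Scala collections method), so on an RDD the split would likely need two filter passes, or the span API proposed in #1306:

```scala
// Sketch: splitting the RDD with two filter passes, since RDD has no
// partition method. The predicate mirrors the one suggested above.
val keyMatches = (row: Row) =>
  streamSideKeyGenerator(row).toString().equals(maxrowKey.toString())
val maxKeyStreamedTable = streamedTable.filter(keyMatches)
val mainStreamedTable = streamedTable.filter(row => !keyMatches(row))
```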




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-06-23 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r1473
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -341,3 +342,85 @@ case class BroadcastNestedLoopJoin(
       streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches))
   }
 }
+
+
+
+
+
+/**
+ * :: DeveloperApi ::
+ * In some cases, data skew happens. SkewJoin samples the table RDD to find the largest key,
+ * then makes the rows with the largest key into their own table RDD. The streamed RDD is
+ * split into a mainStreamedTable RDD without the largest key and a maxKeyStreamedTable RDD
+ * with the largest key. Each of the two is then joined with the build table.
+ * Finally, the two result RDDs are unioned.
+ */
+@DeveloperApi
+case class SkewJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    buildSide: BuildSide,
+    left: SparkPlan,
+    right: SparkPlan,
+    @transient sc: SparkContext) extends BinaryNode {
+  override def outputPartitioning: Partitioning = left.outputPartitioning
+
+  override def requiredChildDistribution =
+    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  val (buildPlan, streamedPlan) = buildSide match {
+    case BuildLeft => (left, right)
+    case BuildRight => (right, left)
+  }
+  val (buildKeys, streamedKeys) = buildSide match {
+    case BuildLeft => (leftKeys, rightKeys)
+    case BuildRight => (rightKeys, leftKeys)
+  }
+
+  def output = left.output ++ right.output
+
+  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
+  @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output)
+
+  def execute() = {
+    val streamedTable = streamedPlan.execute()
+    //This will later write as configuration
+    val sample = streamedTable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect()
+    val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() < row2.hashCode())
+    var max = 0
+    var num = sample.size - 1
+    var temp = 0
+    var maxrowKey = sortedSample(0)
+    //find the largest key
+    if (sortedSample.size > 1) {
+      for (i <- 1 to num) {
+        if (sortedSample(i - 1) == sortedSample(i)) temp += 1
--- End diff --

You should use `{}` and put `temp += 1` on a new line.




[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-06-23 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-46928292
  
Skew join optimization will be very helpful, but how do we know which keys are the skew join keys? By the way, the code will not take effect if you don't put the strategy object into a given context (e.g. SQLContext, HiveContext); see the sketch below.
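
To make the registration point concrete, a hypothetical sketch of a planner strategy for this operator; it assumes the code sits inside a SparkPlanner (for planLater and sparkContext) and that skew is still modeled as the Skew join type this PR adds, which marmbrus questions elsewhere in this thread:

```scala
// Hypothetical sketch: plan logical joins marked Skew into the physical
// operator from this PR. To take effect it must be added to the strategy
// list of the active SQLContext/HiveContext planner.
object SkewJoinStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.Join(left, right, Skew, condition) =>
      execution.SkewJoinCartesianProduct(
        planLater(left), planLater(right), condition)(sparkContext) :: Nil
    case _ => Nil
  }
}
```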

