[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-03-18 Thread zsxwing
Github user zsxwing closed the pull request at:

https://github.com/apache/spark/pull/3505


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-03-18 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-83306062
  
OK. Closing it.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-03-17 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-82703356
  
Let's close this pull request for now. I think we should revisit this topic 
in the future to implement sort-based join and aggregation for DataFrames and 
SQL. We can reuse some of the code from here then.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-02-19 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-75148021
  
@rxin @zsxwing what is the status of this PR?


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-01-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/3505#discussion_r22775200
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.{ObjectOutputStream, IOException}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleHandle
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection._
+
+private[spark] sealed trait JoinType[K, L, R, PAIR <: Product2[_, _]] extends Serializable {
+
+  def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]): Iterator[(K, PAIR)]
+
+}
+
+private[spark] object JoinType {
+
+  def inner[K, L, R] = new JoinType[K, L, R, (L, R)] {
+
+    override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) =
+      i flatMap {
+        case (key, pair) => {
+          if (pair._1.size < pair._2.size) {
+            yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (p1, p2))
+          } else {
+            yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (p1, p2))
+          }
+        }
+      }
+  }
+
+  def leftOuter[K, L, R] = new JoinType[K, L, R, (L, Option[R])] {
+
+    override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) =
+      i flatMap {
+        case (key, pair) => {
+          if (pair._2.size == 0) {
+            for (chunk <- pair._1.iterator;
+                 v <- chunk
+            ) yield (key, (v, None)): (K, (L, Option[R]))
+          }
+          else if (pair._1.size < pair._2.size) {
+            yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (p1, Some(p2)))
+          } else {
+            yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (p1, Some(p2)))
+          }
+        }
+      }
+  }
+
+  def rightOuter[K, L, R] = new JoinType[K, L, R, (Option[L], R)] {
+
+    override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) =
+      i flatMap {
+        case (key, pair) => {
+          if (pair._1.size == 0) {
+            for (chunk <- pair._2.iterator;
+                 v <- chunk
+            ) yield (key, (None, v)): (K, (Option[L], R))
+          }
+          else if (pair._1.size < pair._2.size) {
+            yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (Some(p1), p2))
+          } else {
+            yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (Some(p1), p2))
+          }
+        }
+      }
+  }
+
+  def fullOuter[K, L, R] = new JoinType[K, L, R, (Option[L], Option[R])] {
+
+    override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) =
+      i flatMap {
+        case (key, pair) => {
+          if (pair._1.size == 0) {
+            for (chunk <- pair._2.iterator;
+                 v <- chunk
+            ) yield (key, (None, Some(v))): (K, (Option[L], Option[R]))
+          }
+          else if (pair._2.size == 0) {
+            for (chunk <- pair._1.iterator;
+                 v <- chunk
+            ) yield (key, (Some(v), None)): (K, (Option[L], Option[R]))
+          }
+          else if (pair._1.size < pair._2.size) {
+            yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (Some(p1), Some(p2)))
+          } else {
+            yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (Some(p1), Some(p2)))
+          }
+        }
+      }
+  }
+
+  private def yieldPair[K, OUT, IN, PAIR <: Product2[_, _]](
+      outer: Iterable[Chunk[OUT]], inner: Iterable[Chunk[IN]], key: K, toPair: (OUT, IN) => PAIR) =
+    for (
+      outerChunk <- outer.iterator;
+      innerChunk <- inn
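
For orientation, a minimal standalone sketch of the pattern all four `flatten`
implementations above share, using plain `Seq`s in place of the PR's `Chunk`
abstraction (an assumed simplification, not the PR's code):

    // Inner-join flatten for a single key: emit the cross product of the two sides.
    // The inner iterable is re-traversed once per outer element, so iterating the
    // smaller side in the outer loop (as JoinType.inner does) minimizes the number
    // of re-traversals of the larger, possibly disk-backed side.
    def flattenInner[K, L, R](key: K, lefts: Seq[L], rights: Seq[R]): Iterator[(K, (L, R))] =
      if (lefts.size < rights.size)
        for (l <- lefts.iterator; r <- rights.iterator) yield (key, (l, r))
      else
        for (r <- rights.iterator; l <- lefts.iterator) yield (key, (l, r))

    // flattenInner(42, Seq("a", "b"), Seq(1)).toList
    //   == List((42, ("a", 1)), (42, ("b", 1)))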

[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-01-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3505#discussion_r22774569
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala ---
@@ -0,0 +1,345 @@
[quotes the same SkewedJoinRDD.scala diff shown in full above; truncated in the archive]

[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-01-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/3505#discussion_r22774231
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala ---
@@ -0,0 +1,345 @@
[quotes the same SkewedJoinRDD.scala diff shown in full above; truncated in the archive]

[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-01-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3505#discussion_r22774069
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -487,6 +487,196 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
+   * ::Experimental::
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`.
+   * Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this`
+   * and (k, v2) is in `other`. Performs a hash join across the cluster.
+   *
+   * It supports joining skewed data: if the values for some key cannot fit into memory, it will
+   * spill them to disk.
+   */
+  @Experimental
+  def skewedJoin[W: ClassTag](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
--- End diff ---

See the discussion above. I added `spark.join.skew=true` at first but removed it per
@rxin's and @sryza's suggestion.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-01-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3505#discussion_r22774042
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala ---
@@ -0,0 +1,345 @@
[quotes the same SkewedJoinRDD.scala diff shown in full above; truncated in the archive]

[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-01-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/3505#discussion_r22773920
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -487,6 +487,196 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
+   * ::Experimental::
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`.
+   * Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this`
+   * and (k, v2) is in `other`. Performs a hash join across the cluster.
+   *
+   * It supports joining skewed data: if the values for some key cannot fit into memory, it will
+   * spill them to disk.
+   */
+  @Experimental
+  def skewedJoin[W: ClassTag](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
--- End diff ---

How about we just use the previous join API? If we want to use skewedJoin, we can set
spark.join.skew=true. Applications built on the common join API could then use skewedJoin
without any changes. @zsxwing
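
A rough sketch of this suggestion (hypothetical, not what the PR does; it assumes this
sits inside PairRDDFunctions, where `self: RDD[(K, V)]` is in scope, and that a
`skewedJoin` overload taking a Partitioner exists):

    import scala.reflect.ClassTag
    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD

    // Branch inside the common join on a config flag, so existing callers pick up
    // the skewed implementation without code changes. Note the ClassTag bound,
    // which skewedJoin needs but the real join signature lacks (see the signature
    // discussion elsewhere in this thread).
    def join[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
      if (self.context.getConf.getBoolean("spark.join.skew", false)) {
        skewedJoin(other, partitioner)  // assumed overload
      } else {
        self.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
          for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
        }
      }
    }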


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2015-01-05 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/3505#discussion_r22460587
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala ---
@@ -0,0 +1,345 @@
[quotes the same SkewedJoinRDD.scala diff shown in full above; truncated in the archive]

[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66603180
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24361/
Test PASSed.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66603178
  
[Test build #24361 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24361/consoleFull) for PR 3505 at commit [`af7eb71`](https://github.com/apache/spark/commit/af7eb714ab9916a628859682b8cbf9c4c2396029).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class ChunkBuffer[T: ClassTag](parameters: ChunkParameters)`
   * `class ExternalOrderingAppendOnlyMap[K, V, C](`



[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-11 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66594423
  
I removed my changes to the `join` methods. Now the PR only adds new `skewedJoin`
methods, and users need to call them explicitly.

> Other DSLs on top of Spark core like Pig, Hive, and Scalding.

A great point I never thought of.
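
For reference, a minimal usage sketch of the explicit API this PR proposes (never
merged; assumes an existing SparkContext `sc` and Spark 1.x pair-RDD implicits):

    import org.apache.spark.SparkContext._  // brings skewedJoin into scope via PairRDDFunctions

    val ratings = sc.parallelize(Seq((1, 4.0), (1, 5.0), (2, 3.0)))      // (bookId, rating)
    val titles  = sc.parallelize(Seq((1, "Popular Book"), (2, "Niche Book")))

    // Explicit opt-in: same shape as join, but the values of a hot key are
    // chunked and spilled to disk instead of being held in memory at once.
    val joined = ratings.skewedJoin(titles)  // RDD[(Int, (Double, String))]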



[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66593703
  
[Test build #24361 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24361/consoleFull) for PR 3505 at commit [`af7eb71`](https://github.com/apache/spark/commit/af7eb714ab9916a628859682b8cbf9c4c2396029).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-10 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66564746
  
+1 to different method names vs. immutable confs. To throw in my two cents, I do think
it's worthwhile to add this at the Spark core level. It probably doesn't make sense to
add logic to Spark core's join transformation to figure out the optimal join strategy.
But exposing different join transformations that implement different strategies is
useful in a couple of cases:
* Advanced users who deeply understand their access patterns and can decide which of
these to use directly.
* Other DSLs on top of Spark core like Pig, Hive, and Scalding.



[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-09 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66411889
  
Let me explain - Spark SQL is more than SQL. It is SQL + a DSL that will be improved
over time. I personally believe that, over time, the majority of Spark users will
interact directly with SchemaRDD instead, because it is optimized for structured data.
It is also much easier in Spark SQL to optimize based on the structure of the data.
Personally, I'm worried about pushing more and more stuff into Spark core itself,
because it is very hard to maintain and optimize for arbitrarily structured JVM objects.

Most of the code you have written here can be used directly in SchemaRDD. If we really
want to apply this in core itself, even without considering the maintenance burden, we
will need to find a way for this to be turned on and off easily (e.g. different method
names) rather than relying on immutable confs.



[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-09 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66411639
  
> The problem is that SparkConf is immutable once created, so in order to toggle this
on and off, a user would have to restart Spark.

I added this configuration because it's convenient for existing code. For new code, I
would encourage people to call `skewedJoin` directly if they need a skewed join, since
they are usually familiar with their data.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-09 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66411392
  
> Maybe a better place to do this is in SparkSQL?

It depends on whether this is a fundamental feature for Spark Core. IMO, it's better to
have a skewed join in Spark Core, since join is a very common operator for Spark Core
users.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-09 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66410973
  
The problem is that SparkConf is immutable once created, so in order to toggle this on
and off, a user would have to restart Spark. Maybe a better place to do this is in
SparkSQL?
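
Concretely, a sketch of the constraint (`spark.join.skew` here is the hypothetical flag
from this thread, not a real Spark setting):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().set("spark.join.skew", "true")
    val sc = new SparkContext(conf)
    // SparkContext.getConf returns a copy and the configuration cannot be changed
    // at runtime, so flipping the flag means tearing down and rebuilding the context.
    sc.getConf.set("spark.join.skew", "false")  // mutates only the returned copy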


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-09 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-66410552
  
ping @rxin


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-12-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3505#discussion_r21147642
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -481,9 +481,203 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
--- End diff ---

A better signature for `join` would be
`def join[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]`.
However, that is a breaking change, so I didn't change it.
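
A sketch of why the context bound is a breaking change (illustrative, not from the PR):

    import scala.reflect.ClassTag
    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD

    trait CurrentApi[K, V] {
      def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
    }

    trait DesiredApi[K, V] {
      // The context bound desugars to an extra implicit parameter,
      // (implicit evidence$1: ClassTag[W]), so the compiled method signature
      // changes and previously compiled callers no longer link.
      def join[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
    }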


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-11-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-64868013
  
[Test build #23932 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23932/consoleFull) for PR 3505 at commit [`80d68a2`](https://github.com/apache/spark/commit/80d68a2ed9979fbfd84f99a3bc34beb51ff449cd).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class ChunkBuffer[T: ClassTag](parameters: ChunkParameters)`
   * `class ExternalOrderingAppendOnlyMap[K, V, C](`



[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-11-28 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-64868018
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23932/
Test PASSed.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-11-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3505#issuecomment-64861854
  
[Test build #23932 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23932/consoleFull) for PR 3505 at commit [`80d68a2`](https://github.com/apache/spark/commit/80d68a2ed9979fbfd84f99a3bc34beb51ff449cd).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join

2014-11-27 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/3505

[SPARK-4644][Core] Implement skewed join

Skewed data is not rare. For example, a book recommendation site may have several books
which are liked by most of the users. Running ALS on such skewed data will raise an
OutOfMemory error if some book has too many users to fit into memory. To solve this, we
propose a skewed join implementation. [Design
Doc](https://issues.apache.org/jira/secure/attachment/12684140/Skewed%20Join%20Design%20Doc.pdf)
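
For illustration, a toy version of the skew described above (assumed data; `sc` is an
existing SparkContext, with pair-RDD implicits in scope):

    // One "hot" book dominates, so a hash join must materialize all of key 42's
    // values in a single task's memory at once.
    val likes = sc.parallelize(1 to 1000000).map(u => (42, u))          // hot key
      .union(sc.parallelize(1 to 1000).map(b => (b, b)))                // long tail
    val books = sc.parallelize((1 to 1000).map(b => (b, "Book " + b)) :+ (42 -> "Bestseller"))
    // likes.join(books) can hold a million values for key 42 in memory; the
    // proposed likes.skewedJoin(books) chunks and spills them instead.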

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark skewed-join

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3505.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3505


commit 80d68a2ed9979fbfd84f99a3bc34beb51ff449cd
Author: zsxwing 
Date:   2014-11-25T02:57:31Z

Implement skewed join



