[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing closed the pull request at: https://github.com/apache/spark/pull/3505 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-83306062 OK. Closing it.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-82703356 Let's close this pull request for now. I think we should revisit this topic in the future to implement sort-based join and aggregation for DataFrames and SQL. We can reuse some of the code from here then.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-75148021 @rxin @zsxwing what is the status of this PR?
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/3505#discussion_r22775200 --- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala ---
```diff
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.{ObjectOutputStream, IOException}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.ShuffleHandle
+import org.apache.spark.util.Utils
+import org.apache.spark.util.collection._
+
+private[spark] sealed trait JoinType[K, L, R, PAIR <: Product2[_, _]] extends Serializable {
+
+  def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]): Iterator[(K, PAIR)]
+
+}
+
+private[spark] object JoinType {
+
+  def inner[K, L, R] = new JoinType[K, L, R, (L, R)] {
+
+    override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) =
+      i flatMap {
+        case (key, pair) => {
+          if (pair._1.size < pair._2.size) {
+            yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (p1, p2))
+          } else {
+            yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (p1, p2))
+          }
+        }
+      }
+  }
+
+  def leftOuter[K, L, R] = new JoinType[K, L, R, (L, Option[R])] {
+
+    override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) =
+      i flatMap {
+        case (key, pair) => {
+          if (pair._2.size == 0) {
+            for (chunk <- pair._1.iterator;
+                 v <- chunk
+            ) yield (key, (v, None)): (K, (L, Option[R]))
+          }
+          else if (pair._1.size < pair._2.size) {
+            yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (p1, Some(p2)))
+          } else {
+            yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (p1, Some(p2)))
+          }
+        }
+      }
+  }
+
+  def rightOuter[K, L, R] = new JoinType[K, L, R, (Option[L], R)] {
+
+    override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) =
+      i flatMap {
+        case (key, pair) => {
+          if (pair._1.size == 0) {
+            for (chunk <- pair._2.iterator;
+                 v <- chunk
+            ) yield (key, (None, v)): (K, (Option[L], R))
+          }
+          else if (pair._1.size < pair._2.size) {
+            yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (Some(p1), p2))
+          } else {
+            yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (Some(p1), p2))
+          }
+        }
+      }
+  }
+
+  def fullOuter[K, L, R] = new JoinType[K, L, R, (Option[L], Option[R])] {
+
+    override def flatten(i: Iterator[(K, (Iterable[Chunk[L]], Iterable[Chunk[R]]))]) =
+      i flatMap {
+        case (key, pair) => {
+          if (pair._1.size == 0) {
+            for (chunk <- pair._2.iterator;
+                 v <- chunk
+            ) yield (key, (None, Some(v))): (K, (Option[L], Option[R]))
+          }
+          else if (pair._2.size == 0) {
+            for (chunk <- pair._1.iterator;
+                 v <- chunk
+            ) yield (key, (Some(v), None)): (K, (Option[L], Option[R]))
+          }
+          else if (pair._1.size < pair._2.size) {
+            yieldPair(pair._1, pair._2, key, (p1: L, p2: R) => (Some(p1), Some(p2)))
+          } else {
+            yieldPair(pair._2, pair._1, key, (p2: R, p1: L) => (Some(p1), Some(p2)))
+          }
+        }
+      }
+  }
+
+  private def yieldPair[K, OUT, IN, PAIR <: Product2[_, _]](
+      outer: Iterable[Chunk[OUT]], inner: Iterable[Chunk[IN]], key: K, toPair: (OUT, IN) => PAIR) =
+    for (
+      outerChunk <- outer.iterator;
+      innerChunk <- inn
```
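The `flatten`/`yieldPair` logic quoted above (truncated in this archive) pairs up the values of each key chunk by chunk. The following is a minimal, self-contained sketch of that idea, not the PR's code: it assumes a chunk is simply an in-memory `Vector`, and the names `ChunkedJoinSketch`, `chunkedPairs`, `innerJoin`, and `leftOuterJoin` are hypothetical. As in `JoinType.inner`, the side with fewer chunks drives the outer loop, presumably so that the other side is re-scanned as few times as possible when chunks live on disk.

```scala
object ChunkedJoinSketch {
  // Hypothetical stand-in for the PR's Chunk[T]: an in-memory block of values.
  type Chunk[T] = Vector[T]

  // Emits every (outer value, inner value) combination for one key. The inner
  // side's iterator is recreated for each outer chunk, i.e. it is re-scanned.
  def chunkedPairs[K, OUT, IN, PAIR](
      outer: Iterable[Chunk[OUT]],
      inner: Iterable[Chunk[IN]],
      key: K,
      toPair: (OUT, IN) => PAIR): Iterator[(K, PAIR)] =
    for {
      outerChunk <- outer.iterator
      innerChunk <- inner.iterator
      o <- outerChunk.iterator
      i <- innerChunk.iterator
    } yield (key, toPair(o, i))

  // Inner join for a single key: the side with fewer chunks is the outer loop,
  // and toPair restores the (left, right) order when the sides are swapped.
  def innerJoin[K, L, R](
      key: K,
      left: Iterable[Chunk[L]],
      right: Iterable[Chunk[R]]): Iterator[(K, (L, R))] =
    if (left.size < right.size) chunkedPairs(left, right, key, (l: L, r: R) => (l, r))
    else chunkedPairs(right, left, key, (r: R, l: L) => (l, r))

  // Left outer join for a single key: with no right-hand chunks, every left
  // value is emitted with None, mirroring the `pair._2.size == 0` branch.
  def leftOuterJoin[K, L, R](
      key: K,
      left: Iterable[Chunk[L]],
      right: Iterable[Chunk[R]]): Iterator[(K, (L, Option[R]))] =
    if (right.isEmpty) left.iterator.flatMap(_.iterator).map(l => (key, (l, Option.empty[R])))
    else if (left.size < right.size) chunkedPairs(left, right, key, (l: L, r: R) => (l, Some(r)))
    else chunkedPairs(right, left, key, (r: R, l: L) => (l, Some(r)))
}
```

For example, joining key `"k"` with left chunks `Vector(Vector(1, 2), Vector(3))` and a single right chunk `Vector("a")` yields the three pairs `(1, "a")`, `(2, "a")`, and `(3, "a")`, with the right side driving the outer loop because it has fewer chunks.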
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/3505#discussion_r22774569 --- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala --- @@ -0,0 +1,345 @@
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/3505#discussion_r22774231 --- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala --- @@ -0,0 +1,345 @@
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/3505#discussion_r22774069 --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
```diff
@@ -487,6 +487,196 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
   }
   /**
+   * ::Experimental::
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Performs a hash join across the cluster.
+   *
+   * It supports to join skewed data. If values of some key cannot be fit into memory, it will spill
+   * them to disk.
+   */
+  @Experimental
+  def skewedJoin[W: ClassTag](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
```
See the discussion above. I added `spark.join.skew=true` at first but removed it as per @rxin's and @sryza's suggestions.
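The doc comment in the hunk above states the contract: one `(k, (v1, v2))` tuple for every `(k, v1)` in `this` and `(k, v2)` in `other`, computed by a hash join. A minimal sketch of that contract on local Scala collections (no Spark and no spilling; `LocalHashJoin` is a hypothetical name) shows where skew bites: the build side keeps every value of a key in one in-memory group, which is the part the doc comment says `skewedJoin` would instead spill to disk when it does not fit.

```scala
object LocalHashJoin {
  // Same contract as the skewedJoin doc comment, on local collections: one
  // (k, (v1, v2)) tuple per (k, v1) in `left` and (k, v2) in `right`.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    // Build phase: group all right-hand values by key. With a heavily skewed key,
    // this single in-memory group is what may outgrow memory.
    val built: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Probe phase: stream the left side and emit one pair per matching right value.
    for {
      (k, v) <- left
      w <- built.getOrElse(k, Seq.empty[W])
    } yield (k, (v, w))
  }
}
```

Keys present on only one side (here `2` and `3` in the test inputs) produce no output, as expected for an inner join.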
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/3505#discussion_r22774042 --- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala --- @@ -0,0 +1,345 @@
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/3505#discussion_r22773920 --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala --- @@ -487,6 +487,196 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

How about we just keep the existing join API? If we want to use skewedJoin, we can set spark.join.skew=true, so applications written against the common join API can use skewedJoin without any changes. @zsxwing
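A minimal sketch of the flag-based proposal, under stated assumptions: `Conf` is a hypothetical immutable stand-in for `SparkConf` (not its real API), and `strategyFor` is a hypothetical helper that the existing `join` would consult. Because the configuration is fixed once built, every join that reads it resolves to the same strategy, which is the limitation @rxin raises about immutable confs.

```scala
object SkewFlagSketch {
  // Hypothetical immutable stand-in for SparkConf: settings are fixed at construction.
  final class Conf(settings: Map[String, String]) {
    def getBoolean(key: String, default: Boolean): Boolean =
      settings.get(key).map(_.trim.toBoolean).getOrElse(default)
  }

  sealed trait JoinStrategy
  case object HashJoin extends JoinStrategy
  case object SkewedJoin extends JoinStrategy

  // Flag-based dispatch: the unchanged `join` entry point would ask the conf which
  // strategy to run, so existing callers switch implementations without code changes.
  def strategyFor(conf: Conf): JoinStrategy =
    if (conf.getBoolean("spark.join.skew", default = false)) SkewedJoin else HashJoin
}
```

The convenience and the drawback are the same property: no call site changes, but also no way to pick a different strategy for one particular join without building a new configuration.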
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/3505#discussion_r22460587 --- Diff: core/src/main/scala/org/apache/spark/rdd/SkewedJoinRDD.scala --- @@ -0,0 +1,345 @@
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66603180 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24361/ Test PASSed.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66603178 [Test build #24361 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24361/consoleFull) for PR 3505 at commit [`af7eb71`](https://github.com/apache/spark/commit/af7eb714ab9916a628859682b8cbf9c4c2396029).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class ChunkBuffer[T: ClassTag](parameters: ChunkParameters)`
  * `class ExternalOrderingAppendOnlyMap[K, V, C](`
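The build report lists a new `ChunkBuffer[T: ClassTag](parameters: ChunkParameters)` class, but its body is not quoted in this thread. The following is a hypothetical, in-memory sketch of what a chunked value buffer could look like; `SimpleChunkBuffer` and its `chunkSize` parameter are assumptions, and nothing here spills to disk. Values are appended to the current chunk, which is sealed into an `Array` once full. The `ClassTag` context bound is what lets generic code allocate an `Array[T]`, which is presumably why the real class declares it.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Hypothetical sketch: groups appended values into fixed-size Array chunks.
// A spilling implementation could write each sealed chunk to disk instead.
final class SimpleChunkBuffer[T: ClassTag](chunkSize: Int) {
  require(chunkSize > 0, "chunkSize must be positive")

  private val sealedChunks = ArrayBuffer.empty[Array[T]]
  private val current = ArrayBuffer.empty[T]

  // Appends one value; seals the current chunk as soon as it reaches chunkSize.
  def +=(value: T): this.type = {
    current += value
    if (current.size == chunkSize) {
      sealedChunks += current.toArray // needs the ClassTag to build Array[T]
      current.clear()
    }
    this
  }

  // All chunks in insertion order, including a final partially filled one.
  def chunks: Seq[Array[T]] =
    if (current.isEmpty) sealedChunks.toList
    else (sealedChunks :+ current.toArray).toList
}
```

Appending `1` to `5` with `chunkSize = 2` produces the chunks `[1, 2]`, `[3, 4]`, and `[5]`.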
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66594423 I removed my changes to the `join` methods. Now the patch only adds new `skewedJoin` methods, and users need to call them explicitly.

> Other DSLs on top of Spark core like Pig, Hive, and Scalding.

A great point I hadn't thought of.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66593703 [Test build #24361 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24361/consoleFull) for PR 3505 at commit [`af7eb71`](https://github.com/apache/spark/commit/af7eb714ab9916a628859682b8cbf9c4c2396029).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66564746 +1 to different method names vs. immutable confs.

To throw in my two cents, I do think it's worthwhile to add this at the Spark core level. It probably doesn't make sense to add logic to Spark core's join transformation to figure out the optimal join strategy. But exposing different join transformations that implement different strategies is useful in a couple of cases:

* Advanced users who deeply understand their access patterns and can decide directly which of these to use.
* Other DSLs on top of Spark core like Pig, Hive, and Scalding.
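To illustrate the second case, here is a hypothetical planner of the kind a DSL layered on Spark core might run before each join: it looks at key frequencies on one side and names the explicit join method to call. The names `PerCallJoinChoice`, `largestKeyGroup`, `chooseJoin`, and `maxValuesPerKey` are assumptions for illustration; the point is only that the decision is made per join call rather than once per application through configuration.

```scala
object PerCallJoinChoice {
  // Largest number of records sharing one key; 0 for empty input.
  def largestKeyGroup[K](keys: Seq[K]): Int =
    if (keys.isEmpty) 0 else keys.groupBy(identity).values.map(_.size).max

  // Picks the method name to call for this particular join: the skew-tolerant
  // variant only when some key has more values than the threshold allows.
  def chooseJoin[K](keys: Seq[K], maxValuesPerKey: Int): String =
    if (largestKeyGroup(keys) > maxValuesPerKey) "skewedJoin" else "join"
}
```

Two joins in the same application can therefore take different paths, which a single immutable flag cannot express.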
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66411889 Let me explain: Spark SQL is more than SQL. It is SQL plus a DSL that will be improved over time. I personally believe that, over time, the majority of Spark users will interact directly with SchemaRDD instead, because it is optimized for structured data. It is also much easier in Spark SQL to optimize for the structure of the data. Personally, I'm worried about pushing more and more stuff into Spark core itself, because it is very hard to maintain and optimize for arbitrarily structured JVM objects. Most of the code you have written here can be used directly in SchemaRDD. If we really want to apply this in core itself, even without considering the maintenance burden, we will need a way to turn it on and off easily (e.g. different method names) rather than relying on immutable confs.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66411639

> The problem is that SparkConf is immutable once created, so in order to toggle this on and off, a user would have to restart Spark.

I added this configuration because it's convenient for existing code. For new code, I would encourage people to call `skewedJoin` directly if they need a skewed join, because they are usually familiar with their data.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66411392
> Maybe a better place to do this is in SparkSQL?

It depends on whether this is a fundamental feature for Spark Core. IMO, it's better to have a skewed join in Spark Core, since join is a very common operator for Spark Core users.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66410973 The problem is that SparkConf is immutable once created, so to toggle this on and off, a user would have to restart Spark. Maybe a better place to do this is in Spark SQL?
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-66410552 ping @rxin
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/3505#discussion_r21147642

    --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
    @@ -481,9 +481,203 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
        * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
        */
       def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
    --- End diff --

A better signature for `join` would be `def join[W: ClassTag](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]`. However, that would be a breaking change, so I haven't changed it.
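A context bound such as `W: ClassTag` is shorthand for an extra implicit `ClassTag[W]` parameter list. That extra parameter is why the change breaks compatibility: the compiled method gains a parameter, so already-compiled callers no longer link against it, and callers that are themselves generic in `W` must supply their own `ClassTag`. What the tag buys is the ability to allocate a correctly typed `Array[W]` (a primitive `int[]` for `Int`, for example), which is useful when buffering the values of a key. The sketch below uses plain Scala collections rather than RDDs; `bufferValues` and `bufferValuesExplicit` are illustrative names, not part of the pull request.

```scala
import scala.reflect.ClassTag

object ClassTagSketch {
  // `toArray` needs an implicit ClassTag for the element type. Without the
  // context bound, `def bufferValues[W](values: Seq[W]): Array[W] = values.toArray`
  // would not compile, because W is erased at runtime.
  def bufferValues[W: ClassTag](values: Seq[W]): Array[W] =
    values.toArray

  // The same method with the implicit parameter list written out. This extra
  // parameter is what changes the method's compiled (JVM) signature.
  def bufferValuesExplicit[W](values: Seq[W])(implicit ct: ClassTag[W]): Array[W] =
    bufferValues(values) // `ct` is in implicit scope and satisfies the bound
}
```

For a concrete element type the compiler supplies the tag automatically, so most call sites keep compiling unchanged; the breakage is limited to generic callers and to binaries built against the old signature.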
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-64868013 [Test build #23932 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23932/consoleFull) for PR 3505 at commit [`80d68a2`](https://github.com/apache/spark/commit/80d68a2ed9979fbfd84f99a3bc34beb51ff449cd).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class ChunkBuffer[T: ClassTag](parameters: ChunkParameters)`
  * `class ExternalOrderingAppendOnlyMap[K, V, C](`
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-64868018 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23932/
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/3505#issuecomment-64861854 [Test build #23932 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/23932/consoleFull) for PR 3505 at commit [`80d68a2`](https://github.com/apache/spark/commit/80d68a2ed9979fbfd84f99a3bc34beb51ff449cd). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4644][Core] Implement skewed join
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/3505

[SPARK-4644][Core] Implement skewed join

Skewed data is not rare. For example, a book recommendation site may have several books that are liked by most of its users. Running ALS on such skewed data will raise an OutOfMemoryError if some book has more users than can fit into memory. To solve this, we propose a skewed join implementation.

[Design Doc](https://issues.apache.org/jira/secure/attachment/12684140/Skewed%20Join%20Design%20Doc.pdf)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark skewed-join

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3505.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3505

commit 80d68a2ed9979fbfd84f99a3bc34beb51ff449cd
Author: zsxwing
Date: 2014-11-25T02:57:31Z

    Implement skewed join
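To make the failure mode in the description concrete: a plain hash join gathers all values of one key together, so a single "hot" book key pushes all of its user records through one task. The sketch below does not reproduce the design in the linked doc. It illustrates a different, widely used mitigation, key salting, on local Scala collections: rows of hot keys on the skewed side get a random salt in `[0, saltBuckets)`, and the matching rows on the other side are replicated once per salt, so each `(key, salt)` group holds roughly `1/saltBuckets` of the hot key's rows. `keyCounts`, `hotKeys`, `saltedJoin`, `threshold`, and `saltBuckets` are all hypothetical names.

```scala
import scala.util.Random

object SaltedJoinSketch {
  // Count values per key to spot skew (e.g. a book liked by most users).
  def keyCounts[K, V](rows: Seq[(K, V)]): Map[K, Int] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.size }

  // Keys with more than `threshold` values are treated as hot.
  def hotKeys[K, V](rows: Seq[(K, V)], threshold: Int): Set[K] =
    keyCounts(rows).collect { case (k, n) if n > threshold => k }.toSet

  // Join `big` (possibly skewed) with `small`, salting only the hot keys.
  def saltedJoin[K, V, W](
      big: Seq[(K, V)],
      small: Seq[(K, W)],
      hot: Set[K],
      saltBuckets: Int,
      rnd: Random): Seq[(K, (V, W))] = {
    // Each hot row on the big side lands in one random salt bucket.
    val saltedBig: Seq[((K, Int), V)] = big.map { case (k, v) =>
      val salt = if (hot(k)) rnd.nextInt(saltBuckets) else 0
      ((k, salt), v)
    }
    // Each hot row on the small side is copied into every salt bucket,
    // so every salted big-side row still finds all of its matches.
    val saltedSmall: Seq[((K, Int), W)] = small.flatMap { case (k, w) =>
      if (hot(k)) (0 until saltBuckets).map(s => ((k, s), w))
      else Seq(((k, 0), w))
    }
    // An ordinary hash join on the composite (key, salt) key.
    val index: Map[(K, Int), Seq[W]] =
      saltedSmall.groupBy(_._1).map { case (sk, ws) => sk -> ws.map(_._2) }
    for {
      (sk, v) <- saltedBig
      w <- index.getOrElse(sk, Seq.empty)
    } yield (sk._1, (v, w))
  }
}
```

The result is the same multiset of pairs as an unsalted join; the trade-off is that the other side's hot-key rows are shuffled `saltBuckets` times in exchange for smaller per-task groups on the skewed side.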