Repository: spark
Updated Branches:
  refs/heads/master 89562a172 -> 0eabea8a0


[SPARK-9043] Serialize key, value and combiner classes in ShuffleDependency

ShuffleManager implementations are currently not given type information for
the key, value and combiner classes. Serialization of shuffle objects relies
on objects implementing java.io.Serializable, with methods defined for
reading/writing the object, or alternatively on Kryo, which uses reflection.

Serialization systems like Avro, Thrift and Protobuf generate classes with
zero-argument constructors and explicit schema information
(e.g. IndexedRecords in Avro have get, put and getSchema methods).

By serializing the key, value and combiner class names in ShuffleDependency,
shuffle implementations will have access to schema information when
registerShuffle() is called.
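
For illustration only (not part of this patch), a schema-aware ShuffleManager
could use the new fields roughly as follows. shuffleSchemas is a hypothetical
helper, and because the fields are private[spark] such code would have to live
under the org.apache.spark namespace:

  import org.apache.avro.Schema
  import org.apache.avro.generic.IndexedRecord
  import org.apache.spark.ShuffleDependency

  // Avro-generated classes have zero-argument constructors and a getSchema method,
  // so the runtime class name alone is enough to recover the schema.
  def shuffleSchemas(dep: ShuffleDependency[_, _, _]): (Schema, Schema, Option[Schema]) = {
    def schemaFor(className: String): Schema =
      Class.forName(className).newInstance().asInstanceOf[IndexedRecord].getSchema
    (schemaFor(dep.keyClassName),
      schemaFor(dep.valueClassName),
      dep.combinerClassName.map(schemaFor))
  }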

Author: Matt Massie <mas...@cs.berkeley.edu>

Closes #7403 from massie/shuffle-classtags.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0eabea8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0eabea8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0eabea8a

Branch: refs/heads/master
Commit: 0eabea8a058ad60411c1384930ba12c1c638f5f1
Parents: 89562a1
Author: Matt Massie <mas...@cs.berkeley.edu>
Authored: Thu Sep 10 17:24:33 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Sep 10 17:24:33 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/bagel/Bagel.scala    |  2 +-
 .../scala/org/apache/spark/Dependency.scala     | 11 ++-
 .../org/apache/spark/api/java/JavaPairRDD.scala |  2 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  5 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 90 +++++++++++++++++---
 .../org/apache/spark/rdd/ShuffledRDD.scala      |  4 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |  8 +-
 .../org/apache/spark/CheckpointSuite.scala      |  2 +-
 .../spark/shuffle/ShuffleDependencySuite.scala  | 67 +++++++++++++++
 9 files changed, 168 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
----------------------------------------------------------------------
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index ef0bb2a..4e6b768 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -78,7 +78,7 @@ object Bagel extends Logging {
       val startTime = System.currentTimeMillis
 
       val aggregated = agg(verts, aggregator)
-      val combinedMsgs = msgs.combineByKey(
+      val combinedMsgs = msgs.combineByKeyWithClassTag(
         combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
       val grouped = combinedMsgs.groupWith(verts)
       val superstep_ = superstep  // Create a read-only copy of superstep for capture in closure

http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/core/src/main/scala/org/apache/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index cfeeb39..9aafc9e 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
@@ -65,7 +67,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
  */
 @DeveloperApi
-class ShuffleDependency[K, V, C](
+class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient private val _rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
     val serializer: Option[Serializer] = None,
@@ -76,6 +78,13 @@ class ShuffleDependency[K, V, C](
 
   override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
 
+  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
+  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
+  // Note: It's possible that the combiner class tag is null, if the combineByKey
+  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
+  private[spark] val combinerClassName: Option[String] =
+    Option(reflect.classTag[C]).map(_.runtimeClass.getName)
+
   val shuffleId: Int = _rdd.context.newShuffleId()
 
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
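
To make the new fields concrete, here is a minimal sketch (hypothetical,
REPL-style; the vals are private[spark], so this is illustrative only, and it
assumes an existing SparkContext sc):

  import org.apache.spark.{HashPartitioner, ShuffleDependency}
  import org.apache.spark.rdd.RDD

  val pairs: RDD[(String, java.lang.Integer)] =
    sc.parallelize(Seq(("a", Integer.valueOf(1))))
  val dep = new ShuffleDependency[String, java.lang.Integer, java.lang.Long](
    pairs, new HashPartitioner(2))
  dep.keyClassName       // "java.lang.String"
  dep.valueClassName     // "java.lang.Integer"
  dep.combinerClassName  // Some("java.lang.Long"); None when the ClassTag[C] is null,
                         // as with the old combineByKey overloads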

http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index fb78797..8344f63 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -239,7 +239,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       mapSideCombine: Boolean,
       serializer: Serializer): JavaPairRDD[K, C] = {
       implicit val ctag: ClassTag[C] = fakeClassTag
-    fromRDD(rdd.combineByKey(
+    fromRDD(rdd.combineByKeyWithClassTag(
       createCombiner,
       mergeValue,
       mergeCombiners,

http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 9c617fc..7bad749 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,6 +22,7 @@ import scala.language.existentials
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
@@ -74,7 +75,9 @@ private[spark] class CoGroupPartition(
  * @param part partitioner used to partition the shuffle output
  */
 @DeveloperApi
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
+class CoGroupedRDD[K: ClassTag](
+    @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
+    part: Partitioner)
   extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
 
   // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).

http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4e5f2e8..c59f0d4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -57,7 +57,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   with SparkHadoopMapReduceUtil
   with Serializable
 {
+
   /**
+   * :: Experimental ::
    * Generic function to combine the elements for each key using a custom set of aggregation
    * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
    * Note that V and C can be different -- for example, one might group an RDD of type
@@ -70,12 +72,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * In addition, users can control the partitioning of the output RDD, and whether to perform
    * map-side aggregation (if a mapper can produce multiple items with the same key).
    */
-  def combineByKey[C](createCombiner: V => C,
+  @Experimental
+  def combineByKeyWithClassTag[C](
+      createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
       partitioner: Partitioner,
       mapSideCombine: Boolean = true,
-      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
+      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
     require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
     if (keyClass.isArray) {
       if (mapSideCombine) {
@@ -103,13 +107,50 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Simplified version of combineByKey that hash-partitions the output RDD.
+   * Generic function to combine the elements for each key using a custom set of aggregation
+   * functions. This method is here for backward compatibility. It does not provide combiner
+   * classtag information to the shuffle.
+   *
+   * @see [[combineByKeyWithClassTag]]
    */
-  def combineByKey[C](createCombiner: V => C,
+  def combineByKey[C](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true,
+      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
+      partitioner, mapSideCombine, serializer)(null)
+  }
+
+  /**
+   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
+   * This method is here for backward compatibility. It does not provide combiner
+   * classtag information to the shuffle.
+   *
+   * @see [[combineByKeyWithClassTag]]
+   */
+  def combineByKey[C](
+      createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
       numPartitions: Int): RDD[(K, C)] = self.withScope {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
+  }
+
+  /**
+   * :: Experimental ::
+   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
+   */
+  @Experimental
+  def combineByKeyWithClassTag[C](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
+      new HashPartitioner(numPartitions))
   }
 
   /**
@@ -133,7 +174,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     // We will clean the combiner closure later in `combineByKey`
     val cleanedSeqOp = self.context.clean(seqOp)
-    combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
+    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
+      cleanedSeqOp, combOp, partitioner)
   }
 
   /**
@@ -182,7 +224,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
 
     val cleanedFunc = self.context.clean(func)
-    combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
+    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
+      cleanedFunc, cleanedFunc, partitioner)
   }
 
   /**
@@ -268,7 +311,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * "combiner" in MapReduce.
    */
   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
-    combineByKey[V]((v: V) => v, func, func, partitioner)
+    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
   }
 
   /**
@@ -392,7 +435,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       h1
     }
 
-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
+    combineByKeyWithClassTag(createHLL, mergeValueHLL, mergeHLL, partitioner)
+      .mapValues(_.cardinality())
   }
 
   /**
@@ -466,7 +510,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val createCombiner = (v: V) => CompactBuffer(v)
     val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
     val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
-    val bufs = combineByKey[CompactBuffer[V]](
+    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
     bufs.asInstanceOf[RDD[(K, Iterable[V])]]
   }
@@ -565,12 +609,30 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
+   * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
+   * existing partitioner/parallelism level. This method is here for backward compatibility. It
+   * does not provide combiner classtag information to the shuffle.
+   *
+   * @see [[combineByKeyWithClassTag]]
+   */
+  def combineByKey[C](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
+  }
+
+  /**
+   * :: Experimental ::
+   * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.
    */
-  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
-    : RDD[(K, C)] = self.withScope {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
+  @Experimental
+  def combineByKeyWithClassTag[C](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
   }
 
   /**
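
As a usage sketch (hypothetical example, not part of this diff): existing
combineByKey calls compile unchanged but leave combinerClassName empty, while
callers can switch to combineByKeyWithClassTag to let the shuffle see the
combiner class (assuming an existing SparkContext sc):

  val words = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

  // Old API: still works, but the resulting ShuffleDependency has
  // combinerClassName == None because a null ClassTag is passed through.
  val oldStyle = words.combineByKey(
    (v: Int) => List(v),
    (c: List[Int], v: Int) => v :: c,
    (c1: List[Int], c2: List[Int]) => c1 ::: c2)

  // New API: the implicit ClassTag[List[Int]] is captured, so the dependency
  // records combinerClassName == Some("scala.collection.immutable.List").
  val withTag = words.combineByKeyWithClassTag(
    (v: Int) => List(v),
    (c: List[Int], v: Int) => v :: c,
    (c1: List[Int], c2: List[Int]) => c1 ::: c2)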

http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 2dc47f9..cb15d91 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.rdd
 
+import scala.reflect.ClassTag
+
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.serializer.Serializer
@@ -37,7 +39,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  */
 // TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs
 @DeveloperApi
-class ShuffledRDD[K, V, C](
+class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient var prev: RDD[_ <: Product2[K, V]],
     part: Partitioner)
   extends RDD[(K, C)](prev.context, Nil) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 9a4fa30..25ec685 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -63,15 +63,17 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
   }
 
   override def getDependencies: Seq[Dependency[_]] = {
-    Seq(rdd1, rdd2).map { rdd =>
+    def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
+      : Dependency[_] = {
       if (rdd.partitioner == Some(part)) {
         logDebug("Adding one-to-one dependency with " + rdd)
         new OneToOneDependency(rdd)
       } else {
         logDebug("Adding shuffle dependency with " + rdd)
-        new ShuffleDependency(rdd, part, serializer)
+        new ShuffleDependency[T1, T2, Any](rdd, part, serializer)
       }
     }
+    Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
   }
 
   override def getPartitions: Array[Partition] = {
@@ -105,7 +107,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
         seq
       }
     }
-    def integrate(depNum: Int, op: Product2[K, V] => Unit) = {
+    def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
       dependencies(depNum) match {
         case oneToOneDependency: OneToOneDependency[_] =>
           val dependencyPartition = partition.narrowDeps(depNum).get.split

http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d343bb9..4d70bfe 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -483,7 +483,7 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
 object CheckpointSuite {
   // This is a custom cogroup function that does not use mapValues like
   // the PairRDDFunctions.cogroup()
-  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
+  def cogroup[K: ClassTag, V: ClassTag](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
     : RDD[(K, Array[Iterable[V]])] = {
     new CoGroupedRDD[K](
       Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),

http://git-wip-us.apache.org/repos/asf/spark/blob/0eabea8a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala
new file mode 100644
index 0000000..4d5f599
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle
+
+import org.apache.spark._
+
+case class KeyClass()
+
+case class ValueClass()
+
+case class CombinerClass()
+
+class ShuffleDependencySuite extends SparkFunSuite with LocalSparkContext {
+
+  val conf = new SparkConf(loadDefaults = false)
+
+  test("key, value, and combiner classes correct in shuffle dependency without 
aggregation") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 5, 4)
+      .map(key => (KeyClass(), ValueClass()))
+      .groupByKey()
+    val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+    assert(!dep.mapSideCombine, "Test requires that no map-side aggregator is defined")
+    assert(dep.keyClassName == classOf[KeyClass].getName)
+    assert(dep.valueClassName == classOf[ValueClass].getName)
+  }
+
+  test("key, value, and combiner classes available in shuffle dependency with 
aggregation") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 5, 4)
+      .map(key => (KeyClass(), ValueClass()))
+      .aggregateByKey(CombinerClass())({ case (a, b) => a }, { case (a, b) => a })
+    val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+    assert(dep.mapSideCombine && dep.aggregator.isDefined, "Test requires 
map-side aggregation")
+    assert(dep.keyClassName == classOf[KeyClass].getName)
+    assert(dep.valueClassName == classOf[ValueClass].getName)
+    assert(dep.combinerClassName == Some(classOf[CombinerClass].getName))
+  }
+
+  test("combineByKey null combiner class tag handled correctly") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 5, 4)
+      .map(key => (KeyClass(), ValueClass()))
+      .combineByKey((v: ValueClass) => v,
+        (c: AnyRef, v: ValueClass) => c,
+        (c1: AnyRef, c2: AnyRef) => c1)
+    val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+    assert(dep.keyClassName == classOf[KeyClass].getName)
+    assert(dep.valueClassName == classOf[ValueClass].getName)
+    assert(dep.combinerClassName == None)
+  }
+
+}

