Repository: spark Updated Branches: refs/heads/branch-1.3 b22d5b5f8 -> 5c63e0567
[SPARK-4795][Core] Redesign the "primitive type => Writable" implicit APIs to make them be activated automatically Try to redesign the "primitive type => Writable" implicit APIs to make them be activated automatically and without breaking binary compatibility. However, this PR will breaking the source compatibility if people use `xxxToXxxWritable` occasionally. See the unit test in `graphx`. Author: zsxwing <zsxw...@gmail.com> Closes #3642 from zsxwing/SPARK-4795 and squashes the following commits: 914b2d6 [zsxwing] Add implicit back to the Writables methods 0b9017f [zsxwing] Add some docs a0e8509 [zsxwing] Merge branch 'master' into SPARK-4795 39343de [zsxwing] Fix the unit test 64853af [zsxwing] Reorganize the rest 'implicit' methods in SparkContext (cherry picked from commit d37978d8aafef8a2e637687f3848ca0a8b935b33) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c63e056 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c63e056 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c63e056 Branch: refs/heads/branch-1.3 Commit: 5c63e056746171a331b27336ebf3b8dbb23a0952 Parents: b22d5b5 Author: zsxwing <zsxw...@gmail.com> Authored: Tue Feb 3 20:17:12 2015 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Tue Feb 3 20:17:18 2015 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 70 +++++++++++++++++++- .../WriteInputFormatTestDataGenerator.scala | 1 - .../main/scala/org/apache/spark/package.scala | 3 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 22 +++--- .../spark/rdd/SequenceFileRDDFunctions.scala | 49 ++++++++++---- .../test/scala/org/apache/spark/FileSuite.scala | 1 - .../org/apache/sparktest/ImplicitSuite.scala | 14 +++- .../spark/graphx/lib/ShortestPathsSuite.scala | 2 +- 8 files changed, 129 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5c63e056/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6a16a31..16c6fdb 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1749,8 +1749,14 @@ object SparkContext extends Logging { @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.3.0") def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( - rdd: RDD[(K, V)]) = + rdd: RDD[(K, V)]) = { + val kf = implicitly[K => Writable] + val vf = implicitly[V => Writable] + // Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it + implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf) + implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf) RDD.rddToSequenceFileRDDFunctions(rdd) + } @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.3.0") @@ -1767,20 +1773,35 @@ object SparkContext extends Logging { def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = RDD.numericRDDToDoubleRDDFunctions(rdd) - // Implicit conversions to common Writable types, for saveAsSequenceFile + // The following deprecated functions have already been moved to `object WritableFactory` to + // make the compiler find them automatically. They are still kept here for backward compatibility. + @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + + "kept here only for backward compatibility.", "1.3.0") implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i) + @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + + "kept here only for backward compatibility.", "1.3.0") implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l) + @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + + "kept here only for backward compatibility.", "1.3.0") implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f) + @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + + "kept here only for backward compatibility.", "1.3.0") implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d) + @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + + "kept here only for backward compatibility.", "1.3.0") implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b) + @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + + "kept here only for backward compatibility.", "1.3.0") implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob) + @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + + "kept here only for backward compatibility.", "1.3.0") implicit def stringToText(s: String): Text = new Text(s) private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]) @@ -2070,7 +2091,7 @@ object WritableConverter { new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) } - // The following implicit functions were in SparkContext before 1.2 and users had to + // The following implicit functions were in SparkContext before 1.3 and users had to // `import SparkContext._` to enable them. Now we move them here to make the compiler find // them automatically. However, we still keep the old functions in SparkContext for backward // compatibility and forward to the following functions directly. @@ -2103,3 +2124,46 @@ object WritableConverter { implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] = new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) } + +/** + * A class encapsulating how to convert some type T to Writable. It stores both the Writable class + * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion. + * The Writable class will be used in `SequenceFileRDDFunctions`. + */ +private[spark] class WritableFactory[T]( + val writableClass: ClassTag[T] => Class[_ <: Writable], + val convert: T => Writable) extends Serializable + +object WritableFactory { + + private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W) + : WritableFactory[T] = { + val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]] + new WritableFactory[T](_ => writableClass, convert) + } + + implicit def intWritableFactory: WritableFactory[Int] = + simpleWritableFactory(new IntWritable(_)) + + implicit def longWritableFactory: WritableFactory[Long] = + simpleWritableFactory(new LongWritable(_)) + + implicit def floatWritableFactory: WritableFactory[Float] = + simpleWritableFactory(new FloatWritable(_)) + + implicit def doubleWritableFactory: WritableFactory[Double] = + simpleWritableFactory(new DoubleWritable(_)) + + implicit def booleanWritableFactory: WritableFactory[Boolean] = + simpleWritableFactory(new BooleanWritable(_)) + + implicit def bytesWritableFactory: WritableFactory[Array[Byte]] = + simpleWritableFactory(new BytesWritable(_)) + + implicit def stringWritableFactory: WritableFactory[String] = + simpleWritableFactory(new Text(_)) + + implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] = + simpleWritableFactory(w => w) + +} http://git-wip-us.apache.org/repos/asf/spark/blob/5c63e056/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala index c0cbd28..cf289fb 100644 --- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala +++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala @@ -107,7 +107,6 @@ private[python] class WritableToDoubleArrayConverter extends Converter[Any, Arra * given directory (probably a temp directory) */ object WriteInputFormatTestDataGenerator { - import SparkContext._ def main(args: Array[String]) { val path = args(0) http://git-wip-us.apache.org/repos/asf/spark/blob/5c63e056/core/src/main/scala/org/apache/spark/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 5ad73c3..b6249b4 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -27,8 +27,7 @@ package org.apache * contains operations available only on RDDs of Doubles; and * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can * be saved as SequenceFiles. These operations are automatically available on any RDD of the right - * type (e.g. RDD[(Int, Int)] through implicit conversions except `saveAsSequenceFile`. You need to - * `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work. + * type (e.g. RDD[(Int, Int)] through implicit conversions. * * Java programmers should reference the [[org.apache.spark.api.java]] package * for Spark programming APIs in Java. http://git-wip-us.apache.org/repos/asf/spark/blob/5c63e056/core/src/main/scala/org/apache/spark/rdd/RDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 97aee58..fe55a51 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -25,11 +25,8 @@ import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus -import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text} import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.io.Text -import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ @@ -57,8 +54,7 @@ import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, Bernoulli * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that * can be saved as SequenceFiles. * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] - * through implicit conversions except `saveAsSequenceFile`. You need to - * `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work. + * through implicit. * * Internally, each RDD is characterized by five main properties: * @@ -1527,7 +1523,7 @@ abstract class RDD[T: ClassTag]( */ object RDD { - // The following implicit functions were in SparkContext before 1.2 and users had to + // The following implicit functions were in SparkContext before 1.3 and users had to // `import SparkContext._` to enable them. Now we move them here to make the compiler find // them automatically. However, we still keep the old functions in SparkContext for backward // compatibility and forward to the following functions directly. @@ -1541,9 +1537,15 @@ object RDD { new AsyncRDDActions(rdd) } - implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( - rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = { - new SequenceFileRDDFunctions(rdd) + implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)]) + (implicit kt: ClassTag[K], vt: ClassTag[V], + keyWritableFactory: WritableFactory[K], + valueWritableFactory: WritableFactory[V]) + : SequenceFileRDDFunctions[K, V] = { + implicit val keyConverter = keyWritableFactory.convert + implicit val valueConverter = valueWritableFactory.convert + new SequenceFileRDDFunctions(rdd, + keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt)) } implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)]) http://git-wip-us.apache.org/repos/asf/spark/blob/5c63e056/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 2b48916..059f896 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -30,13 +30,35 @@ import org.apache.spark.Logging * through an implicit conversion. Note that this can't be part of PairRDDFunctions because * we need more implicit parameters to convert our keys and values to Writable. * - * Import `org.apache.spark.SparkContext._` at the top of their program to use these functions. */ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag]( - self: RDD[(K, V)]) + self: RDD[(K, V)], + _keyWritableClass: Class[_ <: Writable], + _valueWritableClass: Class[_ <: Writable]) extends Logging with Serializable { + @deprecated("It's used to provide backward compatibility for pre 1.3.0.", "1.3.0") + def this(self: RDD[(K, V)]) { + this(self, null, null) + } + + private val keyWritableClass = + if (_keyWritableClass == null) { + // pre 1.3.0, we need to use Reflection to get the Writable class + getWritableClass[K]() + } else { + _keyWritableClass + } + + private val valueWritableClass = + if (_valueWritableClass == null) { + // pre 1.3.0, we need to use Reflection to get the Writable class + getWritableClass[V]() + } else { + _valueWritableClass + } + private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = { val c = { if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) { @@ -55,6 +77,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag c.asInstanceOf[Class[_ <: Writable]] } + /** * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key * and value types. If the key or value are Writable, then we use their classes directly; @@ -65,26 +88,28 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) { def anyToWritable[U <% Writable](u: U): Writable = u - val keyClass = getWritableClass[K] - val valueClass = getWritableClass[V] - val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass) - val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass) + // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and + // valueWritableClass at the compile time. To implement that, we need to add type parameters to + // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a + // breaking change. + val convertKey = self.keyClass != keyWritableClass + val convertValue = self.valueClass != valueWritableClass - logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + - valueClass.getSimpleName + ")" ) + logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," + + valueWritableClass.getSimpleName + ")" ) val format = classOf[SequenceFileOutputFormat[Writable, Writable]] val jobConf = new JobConf(self.context.hadoopConfiguration) if (!convertKey && !convertValue) { - self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec) + self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (!convertKey && convertValue) { self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile( - path, keyClass, valueClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (convertKey && !convertValue) { self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile( - path, keyClass, valueClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (convertKey && convertValue) { self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile( - path, keyClass, valueClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, jobConf, codec) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5c63e056/core/src/test/scala/org/apache/spark/FileSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 5e24196..7acd27c 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} import org.scalatest.FunSuite -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.{NewHadoopRDD, HadoopRDD} import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/5c63e056/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala index 4918e2d..daa795a 100644 --- a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala +++ b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala @@ -44,13 +44,21 @@ class ImplicitSuite { } def testRddToSequenceFileRDDFunctions(): Unit = { - // TODO eliminating `import intToIntWritable` needs refactoring SequenceFileRDDFunctions. - // That will be a breaking change. - import org.apache.spark.SparkContext.intToIntWritable val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD rdd.saveAsSequenceFile("/a/test/path") } + def testRddToSequenceFileRDDFunctionsWithWritable(): Unit = { + val rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)] + = mockRDD + rdd.saveAsSequenceFile("/a/test/path") + } + + def testRddToSequenceFileRDDFunctionsWithBytesArray(): Unit = { + val rdd: org.apache.spark.rdd.RDD[(Int, Array[Byte])] = mockRDD + rdd.saveAsSequenceFile("/a/test/path") + } + def testRddToOrderedRDDFunctions(): Unit = { val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD rdd.sortByKey() http://git-wip-us.apache.org/repos/asf/spark/blob/5c63e056/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala index 265827b..f2c38e7 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala @@ -40,7 +40,7 @@ class ShortestPathsSuite extends FunSuite with LocalSparkContext { val graph = Graph.fromEdgeTuples(edges, 1) val landmarks = Seq(1, 4).map(_.toLong) val results = ShortestPaths.run(graph, landmarks).vertices.collect.map { - case (v, spMap) => (v, spMap.mapValues(_.get)) + case (v, spMap) => (v, spMap.mapValues(i => i)) } assert(results.toSet === shortestPaths) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org