Repository: spark Updated Branches: refs/heads/branch-1.3 0165e9d13 -> fef2267cd
SPARK-5795 [STREAMING] api.java.JavaPairDStream.saveAsNewAPIHadoopFiles may not friendly to java Revise JavaPairDStream API declaration on saveAs Hadoop methods, to allow it to be called directly as intended. CC tdas for review Author: Sean Owen <so...@cloudera.com> Closes #4608 from srowen/SPARK-5795 and squashes the following commits: 36f1ead [Sean Owen] Add code that shows compile problem and fix 036bd27 [Sean Owen] Revise JavaPairDStream API declaration on saveAs Hadoop methods, to allow it to be called directly as intended. (cherry picked from commit 8e25373ce72061d3b6a353259ec627606afa4a5f) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fef2267c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fef2267c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fef2267c Branch: refs/heads/branch-1.3 Commit: fef2267cd4299de412a50b18cfd5e97ea7e7d851 Parents: 0165e9d Author: Sean Owen <so...@cloudera.com> Authored: Mon Feb 16 19:32:31 2015 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Mon Feb 16 19:32:40 2015 +0000 ---------------------------------------------------------------------- .../streaming/api/java/JavaPairDStream.scala | 20 ++++++++++---------- .../apache/spark/streaming/JavaAPISuite.java | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fef2267c/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index de124cf..bd01789 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -726,7 +726,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) { + def saveAsHadoopFiles(prefix: String, suffix: String) { dstream.saveAsHadoopFiles(prefix, suffix) } @@ -734,12 +734,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsHadoopFiles( + def saveAsHadoopFiles[F <: OutputFormat[_, _]]( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], - outputFormatClass: Class[_ <: OutputFormat[_, _]]) { + outputFormatClass: Class[F]) { dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) } @@ -747,12 +747,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsHadoopFiles( + def saveAsHadoopFiles[F <: OutputFormat[_, _]]( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], - outputFormatClass: Class[_ <: OutputFormat[_, _]], + outputFormatClass: Class[F], conf: JobConf) { dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } @@ -761,7 +761,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) { + def saveAsNewAPIHadoopFiles(prefix: String, suffix: String) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix) } @@ -769,12 +769,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsNewAPIHadoopFiles( + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]]( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { + outputFormatClass: Class[F]) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) } @@ -782,12 +782,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ - def saveAsNewAPIHadoopFiles( + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]]( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + outputFormatClass: Class[F], conf: Configuration = new Configuration) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } http://git-wip-us.apache.org/repos/asf/spark/blob/fef2267c/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 2df8cf6..57302ff 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1828,4 +1828,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa return expected; } + + // SPARK-5795: no logic assertions, just testing that intended API invocations compile + private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) { + pds.saveAsNewAPIHadoopFiles( + "", "", LongWritable.class, Text.class, + org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); + pds.saveAsHadoopFiles( + "", "", LongWritable.class, Text.class, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class); + // Checks that a previous common workaround for this API still compiles + pds.saveAsNewAPIHadoopFiles( + "", "", LongWritable.class, Text.class, + (Class) org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); + pds.saveAsHadoopFiles( + "", "", LongWritable.class, Text.class, + (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org