Repository: spark
Updated Branches:
  refs/heads/master b14ede789 -> 181ec5030


http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2c7ff87..ac451d1 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,24 +17,25 @@
 
 package org.apache.spark.streaming.api.java
 
-import java.util.{List => JList}
 import java.lang.{Long => JLong}
+import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => 
JFunction2, Function3 => JFunction3}
-import org.apache.spark.Partitioner
+import com.google.common.base.Optional
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
-import org.apache.spark.storage.StorageLevel
-import com.google.common.base.Optional
+import org.apache.spark.Partitioner
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
+import org.apache.spark.api.java.JavaPairRDD._
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => 
JFunction2}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 
 /**
@@ -54,7 +55,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /** Return a new DStream containing only the elements that satisfy a 
predicate. */
   def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
-    dstream.filter((x => f(x).booleanValue()))
+    dstream.filter((x => f.call(x).booleanValue()))
 
   /** Persist RDDs of this DStream with the default storage level 
(MEMORY_ONLY_SER) */
   def cache(): JavaPairDStream[K, V] = dstream.cache()
@@ -168,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       mergeCombiners: JFunction2[C, C, C],
       partitioner: Partitioner
     ): JavaPairDStream[K, C] = {
-    implicit val cm: ClassTag[C] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    implicit val cm: ClassTag[C] = fakeClassTag
     dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, 
partitioner)
   }
 
@@ -184,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       partitioner: Partitioner,
       mapSideCombine: Boolean
     ): JavaPairDStream[K, C] = {
-    implicit val cm: ClassTag[C] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    implicit val cm: ClassTag[C] = fakeClassTag
     dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, 
partitioner, mapSideCombine)
   }
 
@@ -279,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       DStream's batching interval
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration
     ):JavaPairDStream[K, V] = {
@@ -299,7 +298,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param numPartitions  Number of partitions of each RDD in the new DStream.
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int
@@ -320,7 +319,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                    DStream.
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner
@@ -345,8 +344,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       DStream's batching interval
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
-      invReduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
+      invReduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration
     ): JavaPairDStream[K, V] = {
@@ -374,8 +373,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       set this to null if you do not want to filter
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
-      invReduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
+      invReduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int,
@@ -412,8 +411,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       set this to null if you do not want to filter
    */
   def reduceByKeyAndWindow(
-      reduceFunc: Function2[V, V, V],
-      invReduceFunc: Function2[V, V, V],
+      reduceFunc: JFunction2[V, V, V],
+      invReduceFunc: JFunction2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner,
@@ -453,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    */
   def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], 
Optional[S]])
   : JavaPairDStream[K, S] = {
-    implicit val cm: ClassTag[S] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+    implicit val cm: ClassTag[S] = fakeClassTag
     dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
   }
 
@@ -471,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
       numPartitions: Int)
   : JavaPairDStream[K, S] = {
-    implicit val cm: ClassTag[S] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+    implicit val cm: ClassTag[S] = fakeClassTag
     dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), 
numPartitions)
   }
 
@@ -490,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
       partitioner: Partitioner
   ): JavaPairDStream[K, S] = {
-    implicit val cm: ClassTag[S] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
+    implicit val cm: ClassTag[S] = fakeClassTag
     dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), 
partitioner)
   }
 
@@ -501,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * 'this' DStream without changing the key.
    */
   def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
-    implicit val cm: ClassTag[U] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
+    implicit val cm: ClassTag[U] = fakeClassTag
     dstream.mapValues(f)
   }
 
@@ -524,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * of partitions.
    */
   def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], 
JList[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), 
seqAsJavaList((t._2))))
   }
 
@@ -537,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       numPartitions: Int
     ): JavaPairDStream[K, (JList[V], JList[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.cogroup(other.dstream, numPartitions)
            .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
   }
@@ -551,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
     ): JavaPairDStream[K, (JList[V], JList[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.cogroup(other.dstream, partitioner)
            .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
   }
@@ -562,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Hash partitioning is used to generate the RDDs with Spark's default 
number of partitions.
    */
   def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.join(other.dstream)
   }
 
@@ -572,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Hash partitioning is used to generate the RDDs with `numPartitions` 
partitions.
    */
   def join[W](other: JavaPairDStream[K, W], numPartitions: Int): 
JavaPairDStream[K, (V, W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.join(other.dstream, numPartitions)
   }
 
@@ -585,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
     ): JavaPairDStream[K, (V, W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     dstream.join(other.dstream, partitioner)
   }
 
@@ -596,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * number of partitions.
    */
   def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, 
Optional[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.leftOuterJoin(other.dstream)
     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
   }
@@ -611,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       numPartitions: Int
     ): JavaPairDStream[K, (V, Optional[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
   }
@@ -625,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
     ): JavaPairDStream[K, (V, Optional[W])] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
     joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
   }
@@ -652,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       numPartitions: Int
     ): JavaPairDStream[K, (Optional[V], W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
@@ -667,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
     ): JavaPairDStream[K, (Optional[V], W)] = {
-    implicit val cm: ClassTag[W] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+    implicit val cm: ClassTag[W] = fakeClassTag
     val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
     joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
   }
@@ -748,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     new JavaDStream[(K, V)](dstream)
   }
 
-  override val classTag: ClassTag[(K, V)] =
-    implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+  override val classTag: ClassTag[(K, V)] = fakeClassTag
 }
 
 object JavaPairDStream {
@@ -758,10 +741,8 @@ object JavaPairDStream {
   }
 
   def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, 
V] = {
-    implicit val cmk: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
+    implicit val cmk: ClassTag[K] = fakeClassTag
+    implicit val cmv: ClassTag[V] = fakeClassTag
     new JavaPairDStream[K, V](dstream.dstream)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b082bb0..c48d754 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -187,7 +187,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
       converter: JFunction[InputStream, java.lang.Iterable[T]],
       storageLevel: StorageLevel)
   : JavaDStream[T] = {
-    def fn = (x: InputStream) => converter.apply(x).toIterator
+    def fn = (x: InputStream) => converter.call(x).toIterator
     implicit val cmt: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     ssc.socketStream(hostname, port, fn, storageLevel)
@@ -431,7 +431,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * In the transform function, convert the JavaRDD corresponding to that 
JavaDStream to
    * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
    */
-  def transform[K, V](
+  def transformToPair[K, V](
       dstreams: JList[JavaDStream[_]],
       transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]]
     ): JavaPairDStream[K, V] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java 
b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 54a0791..e93bf18 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -247,14 +247,14 @@ public class JavaAPISuite extends 
LocalJavaStreamingContext implements Serializa
     Assert.assertEquals(expected, result);
   }
 
-  private class IntegerSum extends Function2<Integer, Integer, Integer> {
+  private class IntegerSum implements Function2<Integer, Integer, Integer> {
     @Override
     public Integer call(Integer i1, Integer i2) throws Exception {
       return i1 + i2;
     }
   }
 
-  private class IntegerDifference extends Function2<Integer, Integer, Integer> 
{
+  private class IntegerDifference implements Function2<Integer, Integer, 
Integer> {
     @Override
     public Integer call(Integer i1, Integer i2) throws Exception {
       return i1 - i2;
@@ -392,7 +392,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
       }
     );
 
-    JavaPairDStream<String, Integer> transformed3 = stream.transform(
+    JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(
         new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
           @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> 
in) throws Exception {
             return null;
@@ -400,7 +400,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         }
     );
 
-    JavaPairDStream<String, Integer> transformed4 = stream.transform(
+    JavaPairDStream<String, Integer> transformed4 = stream.transformToPair(
         new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
           @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> 
in, Time time) throws Exception {
             return null;
@@ -424,7 +424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         }
     );
 
-    JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+    JavaPairDStream<String, String> pairTransformed3 = 
pairStream.transformToPair(
         new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, 
String>>() {
           @Override public JavaPairRDD<String, String> 
call(JavaPairRDD<String, Integer> in) throws Exception {
             return null;
@@ -432,7 +432,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         }
     );
 
-    JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+    JavaPairDStream<String, String> pairTransformed4 = 
pairStream.transformToPair(
         new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, 
String>>() {
           @Override public JavaPairRDD<String, String> 
call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
             return null;
@@ -482,7 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         ssc, stringStringKVStream2, 1);
     JavaPairDStream<String, String> pairStream2 = 
JavaPairDStream.fromJavaDStream(stream2);
 
-    JavaPairDStream<String, Tuple2<String, String>> joined = 
pairStream1.transformWith(
+    JavaPairDStream<String, Tuple2<String, String>> joined = 
pairStream1.transformWithToPair(
         pairStream2,
         new Function3<
             JavaPairRDD<String, String>,
@@ -551,7 +551,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         }
     );
 
-    JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+    JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair(
         stream2,
         new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, 
JavaPairRDD<Double, Double>>() {
           @Override
@@ -561,7 +561,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         }
     );
 
-    JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+    JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair(
         pairStream1,
         new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, 
JavaPairRDD<Double, Double>>() {
           @Override
@@ -591,7 +591,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         }
     );
 
-    JavaPairDStream<Double, Double> pairTransformed3 = 
pairStream1.transformWith(
+    JavaPairDStream<Double, Double> pairTransformed3 = 
pairStream1.transformWithToPair(
         stream2,
         new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, 
JavaPairRDD<Double, Double>>() {
           @Override
@@ -601,7 +601,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         }
     );
 
-    JavaPairDStream<Double, Double> pairTransformed4 = 
pairStream1.transformWith(
+    JavaPairDStream<Double, Double> pairTransformed4 = 
pairStream1.transformWithToPair(
         pairStream2,
         new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, 
Character>, Time, JavaPairRDD<Double, Double>>() {
           @Override
@@ -656,7 +656,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
     List<JavaDStream<?>> listOfDStreams2 =
         Arrays.<JavaDStream<?>>asList(stream1, stream2, 
pairStream1.toJavaDStream());
 
-    JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = 
ssc.transform(
+    JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = 
ssc.transformToPair(
       listOfDStreams2,
       new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, 
Tuple2<Integer, String>>>() {
         public JavaPairRDD<Integer, Tuple2<Integer, String>> 
call(List<JavaRDD<?>> listOfRDDs, Time time) {
@@ -671,7 +671,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
               return new Tuple2<Integer, Integer>(i, i);
             }
           };
-          return rdd1.union(rdd2).map(mapToTuple).join(prdd3);
+          return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
         }
       }
     );
@@ -742,17 +742,17 @@ public class JavaAPISuite extends 
LocalJavaStreamingContext implements Serializa
             new Tuple2<Integer, String>(9, "s")));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
-    JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
-        new PairFlatMapFunction<String, Integer, String>() {
-          @Override
-          public Iterable<Tuple2<Integer, String>> call(String in) throws 
Exception {
-            List<Tuple2<Integer, String>> out = Lists.newArrayList();
-            for (String letter: in.split("(?!^)")) {
-              out.add(new Tuple2<Integer, String>(in.length(), letter));
-            }
-            return out;
+    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
+      new PairFlatMapFunction<String, Integer, String>() {
+        @Override
+        public Iterable<Tuple2<Integer, String>> call(String in) throws 
Exception {
+          List<Tuple2<Integer, String>> out = Lists.newArrayList();
+          for (String letter: in.split("(?!^)")) {
+            out.add(new Tuple2<Integer, String>(in.length(), letter));
           }
-        });
+          return out;
+        }
+      });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 
3, 3);
 
@@ -816,7 +816,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
         Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
 
     JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
-    JavaPairDStream<String, Integer> pairStream = stream.map(
+    JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
         new PairFunction<String, String, Integer>() {
           @Override
           public Tuple2<String, Integer> call(String in) throws Exception {
@@ -880,7 +880,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
 
     JavaDStream<Tuple2<String, Integer>> stream = 
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.map(
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
         new PairFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
           public Tuple2<Integer, String> call(Tuple2<String, Integer> in) 
throws Exception {
@@ -913,7 +913,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
 
     JavaDStream<Tuple2<String, Integer>> stream = 
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
         new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, 
String>() {
           @Override
           public Iterable<Tuple2<Integer, String>> 
call(Iterator<Tuple2<String, Integer>> in) throws Exception {
@@ -983,7 +983,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream =
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
-    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
         new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
           public Iterable<Tuple2<Integer, String>> call(Tuple2<String, 
Integer> in) throws Exception {
@@ -1228,7 +1228,8 @@ public class JavaAPISuite extends 
LocalJavaStreamingContext implements Serializa
     JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
 
     JavaPairDStream<String, Integer> reduceWindowed =
-        pairStream.reduceByKeyAndWindow(new IntegerSum(), new 
IntegerDifference(), new Duration(2000), new Duration(1000));
+        pairStream.reduceByKeyAndWindow(new IntegerSum(), new 
IntegerDifference(),
+          new Duration(2000), new Duration(1000));
     JavaTestUtils.attachTestOutputStream(reduceWindowed);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 
3, 3);
 
@@ -1300,7 +1301,7 @@ public class JavaAPISuite extends 
LocalJavaStreamingContext implements Serializa
         ssc, inputData, 1);
     JavaPairDStream<Integer, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
         new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, 
Integer>>() {
           @Override
           public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, 
Integer> in) throws Exception {
@@ -1632,7 +1633,8 @@ public class JavaAPISuite extends 
LocalJavaStreamingContext implements Serializa
 
   @Test
   public void testSocketString() {
-    class Converter extends Function<InputStream, Iterable<String>> {
+  
+    class Converter implements Function<InputStream, Iterable<String>> {
       public Iterable<String> call(InputStream in) throws IOException {
         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
         List<String> out = new ArrayList<String>();

Reply via email to