Repository: spark Updated Branches: refs/heads/master b14ede789 -> 181ec5030
http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 2c7ff87..ac451d1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -17,24 +17,25 @@ package org.apache.spark.streaming.api.java -import java.util.{List => JList} import java.lang.{Long => JLong} +import java.util.{List => JList} import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import org.apache.spark.streaming._ -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3} -import org.apache.spark.Partitioner +import com.google.common.base.Optional +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.conf.Configuration -import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD} -import org.apache.spark.storage.StorageLevel -import com.google.common.base.Optional +import org.apache.spark.Partitioner +import org.apache.spark.api.java.{JavaPairRDD, JavaUtils} +import org.apache.spark.api.java.JavaPairRDD._ +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.PairRDDFunctions +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream /** @@ -54,7 +55,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = - dstream.filter((x => f(x).booleanValue())) + dstream.filter((x => f.call(x).booleanValue())) /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def cache(): JavaPairDStream[K, V] = dstream.cache() @@ -168,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner ): JavaPairDStream[K, C] = { - implicit val cm: ClassTag[C] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] + implicit val cm: ClassTag[C] = fakeClassTag dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) } @@ -184,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( partitioner: Partitioner, mapSideCombine: Boolean ): JavaPairDStream[K, C] = { - implicit val cm: ClassTag[C] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] + implicit val cm: ClassTag[C] = fakeClassTag dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine) } @@ -279,7 +278,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * DStream's batching interval */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration ):JavaPairDStream[K, V] = { @@ -299,7 +298,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param numPartitions Number of partitions of each RDD in the new DStream. */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration, numPartitions: Int @@ -320,7 +319,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * DStream. */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner @@ -345,8 +344,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * DStream's batching interval */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], - invReduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], + invReduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration ): JavaPairDStream[K, V] = { @@ -374,8 +373,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * set this to null if you do not want to filter */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], - invReduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], + invReduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration, numPartitions: Int, @@ -412,8 +411,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * set this to null if you do not want to filter */ def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], - invReduceFunc: Function2[V, V, V], + reduceFunc: JFunction2[V, V, V], + invReduceFunc: JFunction2[V, V, V], windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner, @@ -453,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( */ def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]) : JavaPairDStream[K, S] = { - implicit val cm: ClassTag[S] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] + implicit val cm: ClassTag[S] = fakeClassTag dstream.updateStateByKey(convertUpdateStateFunction(updateFunc)) } @@ -471,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], numPartitions: Int) : JavaPairDStream[K, S] = { - implicit val cm: ClassTag[S] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] + implicit val cm: ClassTag[S] = fakeClassTag dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) } @@ -490,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { - implicit val cm: ClassTag[S] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] + implicit val cm: ClassTag[S] = fakeClassTag dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) } @@ -501,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * 'this' DStream without changing the key. */ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { - implicit val cm: ClassTag[U] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] + implicit val cm: ClassTag[U] = fakeClassTag dstream.mapValues(f) } @@ -524,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * of partitions. */ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -537,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream, numPartitions) .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -551,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream, partitioner) .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } @@ -562,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.join(other.dstream) } @@ -572,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.join(other.dstream, numPartitions) } @@ -585,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag dstream.join(other.dstream, partitioner) } @@ -596,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * number of partitions. */ def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.leftOuterJoin(other.dstream) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -611,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -625,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} } @@ -652,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], numPartitions: Int ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -667,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( other: JavaPairDStream[K, W], partitioner: Partitioner ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassTag[W] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + implicit val cm: ClassTag[W] = fakeClassTag val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} } @@ -748,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( new JavaDStream[(K, V)](dstream) } - override val classTag: ClassTag[(K, V)] = - implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]] + override val classTag: ClassTag[(K, V)] = fakeClassTag } object JavaPairDStream { @@ -758,10 +741,8 @@ object JavaPairDStream { } def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { - implicit val cmk: ClassTag[K] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val cmv: ClassTag[V] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + implicit val cmk: ClassTag[K] = fakeClassTag + implicit val cmv: ClassTag[V] = fakeClassTag new JavaPairDStream[K, V](dstream.dstream) } http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index b082bb0..c48d754 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -187,7 +187,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { converter: JFunction[InputStream, java.lang.Iterable[T]], storageLevel: StorageLevel) : JavaDStream[T] = { - def fn = (x: InputStream) => converter.apply(x).toIterator + def fn = (x: InputStream) => converter.call(x).toIterator implicit val cmt: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] ssc.socketStream(hostname, port, fn, storageLevel) @@ -431,7 +431,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * In the transform function, convert the JavaRDD corresponding to that JavaDStream to * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD(). */ - def transform[K, V]( + def transformToPair[K, V]( dstreams: JList[JavaDStream[_]], transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]] ): JavaPairDStream[K, V] = { http://git-wip-us.apache.org/repos/asf/spark/blob/181ec503/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 54a0791..e93bf18 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -247,14 +247,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } - private class IntegerSum extends Function2<Integer, Integer, Integer> { + private class IntegerSum implements Function2<Integer, Integer, Integer> { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } } - private class IntegerDifference extends Function2<Integer, Integer, Integer> { + private class IntegerDifference implements Function2<Integer, Integer, Integer> { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 - i2; @@ -392,7 +392,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, Integer> transformed3 = stream.transform( + JavaPairDStream<String, Integer> transformed3 = stream.transformToPair( new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { return null; @@ -400,7 +400,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, Integer> transformed4 = stream.transform( + JavaPairDStream<String, Integer> transformed4 = stream.transformToPair( new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { return null; @@ -424,7 +424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, String> pairTransformed3 = pairStream.transform( + JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair( new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { return null; @@ -432,7 +432,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, String> pairTransformed4 = pairStream.transform( + JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair( new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { return null; @@ -482,7 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, stringStringKVStream2, 1); JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair( pairStream2, new Function3< JavaPairRDD<String, String>, @@ -551,7 +551,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> transformed3 = stream1.transformWith( + JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair( stream2, new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -561,7 +561,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> transformed4 = stream1.transformWith( + JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair( pairStream1, new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -591,7 +591,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith( + JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair( stream2, new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -601,7 +601,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith( + JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair( pairStream2, new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -656,7 +656,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<JavaDStream<?>> listOfDStreams2 = Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); - JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform( + JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( listOfDStreams2, new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() { public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) { @@ -671,7 +671,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa return new Tuple2<Integer, Integer>(i, i); } }; - return rdd1.union(rdd2).map(mapToTuple).join(prdd3); + return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); } } ); @@ -742,17 +742,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<Integer, String>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<Integer,String> flatMapped = stream.flatMap( - new PairFlatMapFunction<String, Integer, String>() { - @Override - public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<Integer, String>(in.length(), letter)); - } - return out; + JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair( + new PairFlatMapFunction<String, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { + List<Tuple2<Integer, String>> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2<Integer, String>(in.length(), letter)); } - }); + return out; + } + }); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -816,7 +816,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = stream.map( + JavaPairDStream<String, Integer> pairStream = stream.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String in) throws Exception { @@ -880,7 +880,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.map( + JavaPairDStream<Integer, String> reversed = pairStream.mapToPair( new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { @@ -913,7 +913,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions( + JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair( new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception { @@ -983,7 +983,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap( + JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair( new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception { @@ -1228,7 +1228,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), + new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1300,7 +1301,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, Integer> sorted = pairStream.transform( + JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair( new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() { @Override public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { @@ -1632,7 +1633,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testSocketString() { - class Converter extends Function<InputStream, Iterable<String>> { + + class Converter implements Function<InputStream, Iterable<String>> { public Iterable<String> call(InputStream in) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); List<String> out = new ArrayList<String>();
