Repository: spark
Updated Branches:
  refs/heads/master 3871d94a6 -> 0e2405490


http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java 
b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
new file mode 100644
index 0000000..8d24104
--- /dev/null
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -0,0 +1,2008 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.streaming;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.JavaCheckpointTestUtils;
+import org.apache.spark.streaming.JavaTestUtils;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.Seconds;
+import org.apache.spark.streaming.StreamingContextState;
+import org.apache.spark.streaming.StreamingContextSuite;
+import org.apache.spark.streaming.Time;
+import scala.Tuple2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+import com.google.common.collect.Sets;
+
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.util.LongAccumulator;
+import org.apache.spark.util.Utils;
+
+// The test suite itself is Serializable so that anonymous Function 
implementations can be
+// serialized, as an alternative to converting these anonymous classes to 
static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite extends LocalJavaStreamingContext implements 
Serializable {
+
+  public static void equalIterator(Iterator<?> a, Iterator<?> b) {
+    while (a.hasNext() && b.hasNext()) {
+      Assert.assertEquals(a.next(), b.next());
+    }
+    Assert.assertEquals(a.hasNext(), b.hasNext());
+  }
+
+  public static void equalIterable(Iterable<?> a, Iterable<?> b) {
+      equalIterator(a.iterator(), b.iterator());
+  }
+
+  @Test
+  public void testInitialization() {
+    Assert.assertNotNull(ssc.sparkContext());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testContextState() {
+    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
+    Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaTestUtils.attachTestOutputStream(stream);
+    Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
+    ssc.start();
+    Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
+    ssc.stop();
+    Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCount() {
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,2,3,4),
+        Arrays.asList(3,4,5),
+        Arrays.asList(3));
+
+    List<List<Long>> expected = Arrays.asList(
+        Arrays.asList(4L),
+        Arrays.asList(3L),
+        Arrays.asList(1L));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<Long> count = stream.count();
+    JavaTestUtils.attachTestOutputStream(count);
+    List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMap() {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("hello", "world"),
+        Arrays.asList("goodnight", "moon"));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(5,5),
+        Arrays.asList(9,4));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<Integer> letterCount = stream.map(new Function<String, 
Integer>() {
+        @Override
+        public Integer call(String s) {
+          return s.length();
+        }
+    });
+    JavaTestUtils.attachTestOutputStream(letterCount);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testWindow() {
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,2,3),
+        Arrays.asList(4,5,6),
+        Arrays.asList(7,8,9));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(1,2,3),
+        Arrays.asList(4,5,6,1,2,3),
+        Arrays.asList(7,8,9,4,5,6),
+        Arrays.asList(7,8,9));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<Integer> windowed = stream.window(new Duration(2000));
+    JavaTestUtils.attachTestOutputStream(windowed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testWindowWithSlideDuration() {
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,2,3),
+        Arrays.asList(4,5,6),
+        Arrays.asList(7,8,9),
+        Arrays.asList(10,11,12),
+        Arrays.asList(13,14,15),
+        Arrays.asList(16,17,18));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(1,2,3,4,5,6),
+        Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12),
+        Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
+        Arrays.asList(13,14,15,16,17,18));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<Integer> windowed = stream.window(new Duration(4000), new 
Duration(2000));
+    JavaTestUtils.attachTestOutputStream(windowed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFilter() {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("giants", "dodgers"),
+        Arrays.asList("yankees", "red sox"));
+
+    List<List<String>> expected = Arrays.asList(
+        Arrays.asList("giants"),
+        Arrays.asList("yankees"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<String> filtered = stream.filter(new Function<String, 
Boolean>() {
+      @Override
+      public Boolean call(String s) {
+        return s.contains("a");
+      }
+    });
+    JavaTestUtils.attachTestOutputStream(filtered);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRepartitionMorePartitions() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+      Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    JavaDStream<Integer> stream =
+        JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+    JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> 
repartitioned =
+        stream.repartition(4);
+    JavaTestUtils.attachTestOutputStream(repartitioned);
+    List<List<List<Integer>>> result = 
JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
+    Assert.assertEquals(2, result.size());
+    for (List<List<Integer>> rdd : result) {
+      Assert.assertEquals(4, rdd.size());
+      Assert.assertEquals(
+        10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + 
rdd.get(3).size());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRepartitionFewerPartitions() {
+    List<List<Integer>> inputData = Arrays.asList(
+      Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+      Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    JavaDStream<Integer> stream =
+        JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+    JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> 
repartitioned =
+        stream.repartition(2);
+    JavaTestUtils.attachTestOutputStream(repartitioned);
+    List<List<List<Integer>>> result = 
JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
+    Assert.assertEquals(2, result.size());
+    for (List<List<Integer>> rdd : result) {
+      Assert.assertEquals(2, rdd.size());
+      Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGlom() {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("giants", "dodgers"),
+        Arrays.asList("yankees", "red sox"));
+
+    List<List<List<String>>> expected = Arrays.asList(
+        Arrays.asList(Arrays.asList("giants", "dodgers")),
+        Arrays.asList(Arrays.asList("yankees", "red sox")));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<List<String>> glommed = stream.glom();
+    JavaTestUtils.attachTestOutputStream(glommed);
+    List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMapPartitions() {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("giants", "dodgers"),
+        Arrays.asList("yankees", "red sox"));
+
+    List<List<String>> expected = Arrays.asList(
+        Arrays.asList("GIANTSDODGERS"),
+        Arrays.asList("YANKEESRED SOX"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<String> mapped = stream.mapPartitions(
+        new FlatMapFunction<Iterator<String>, String>() {
+          @Override
+          public Iterator<String> call(Iterator<String> in) {
+            StringBuilder out = new StringBuilder();
+            while (in.hasNext()) {
+              out.append(in.next().toUpperCase(Locale.ENGLISH));
+            }
+            return Arrays.asList(out.toString()).iterator();
+          }
+        });
+    JavaTestUtils.attachTestOutputStream(mapped);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  private static class IntegerSum implements Function2<Integer, Integer, 
Integer> {
+    @Override
+    public Integer call(Integer i1, Integer i2) {
+      return i1 + i2;
+    }
+  }
+
+  private static class IntegerDifference implements Function2<Integer, 
Integer, Integer> {
+    @Override
+    public Integer call(Integer i1, Integer i2) {
+      return i1 - i2;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReduce() {
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,2,3),
+        Arrays.asList(4,5,6),
+        Arrays.asList(7,8,9));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(6),
+        Arrays.asList(15),
+        Arrays.asList(24));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<Integer> reduced = stream.reduce(new IntegerSum());
+    JavaTestUtils.attachTestOutputStream(reduced);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReduceByWindowWithInverse() {
+    testReduceByWindow(true);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReduceByWindowWithoutInverse() {
+    testReduceByWindow(false);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void testReduceByWindow(boolean withInverse) {
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,2,3),
+        Arrays.asList(4,5,6),
+        Arrays.asList(7,8,9));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(6),
+        Arrays.asList(21),
+        Arrays.asList(39),
+        Arrays.asList(24));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<Integer> reducedWindowed;
+    if (withInverse) {
+      reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+                                              new IntegerDifference(),
+                                              new Duration(2000),
+                                              new Duration(1000));
+    } else {
+      reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+                                              new Duration(2000), new 
Duration(1000));
+    }
+    JavaTestUtils.attachTestOutputStream(reducedWindowed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testQueueStream() {
+    ssc.stop();
+    // Create a new JavaStreamingContext without checkpointing
+    SparkConf conf = new SparkConf()
+        .setMaster("local[2]")
+        .setAppName("test")
+        .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+    ssc = new JavaStreamingContext(conf, new Duration(1000));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(1,2,3),
+        Arrays.asList(4,5,6),
+        Arrays.asList(7,8,9));
+
+    JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
+    JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3));
+    JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6));
+    JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9));
+
+    Queue<JavaRDD<Integer>> rdds = new LinkedList<>();
+    rdds.add(rdd1);
+    rdds.add(rdd2);
+    rdds.add(rdd3);
+
+    JavaDStream<Integer> stream = ssc.queueStream(rdds);
+    JavaTestUtils.attachTestOutputStream(stream);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testTransform() {
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,2,3),
+        Arrays.asList(4,5,6),
+        Arrays.asList(7,8,9));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(3,4,5),
+        Arrays.asList(6,7,8),
+        Arrays.asList(9,10,11));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<Integer> transformed = stream.transform(
+      new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+        @Override
+        public JavaRDD<Integer> call(JavaRDD<Integer> in) {
+          return in.map(new Function<Integer, Integer>() {
+            @Override
+            public Integer call(Integer i) {
+              return i + 2;
+            }
+          });
+        }
+      });
+
+    JavaTestUtils.attachTestOutputStream(transformed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testVariousTransform() {
+    // tests whether all variations of transform can be called from Java
+
+    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+
+    List<List<Tuple2<String, Integer>>> pairInputData =
+        Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(
+        JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+    stream.transform(
+        new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+          @Override
+          public JavaRDD<Integer> call(JavaRDD<Integer> in) {
+            return null;
+          }
+        }
+    );
+
+    stream.transform(
+      new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
+        @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) 
{
+          return null;
+        }
+      }
+    );
+
+    stream.transformToPair(
+        new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
+          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> 
in) {
+            return null;
+          }
+        }
+    );
+
+    stream.transformToPair(
+        new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
+          @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> 
in, Time time) {
+            return null;
+          }
+        }
+    );
+
+    pairStream.transform(
+        new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
+          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> 
in) {
+            return null;
+          }
+        }
+    );
+
+    pairStream.transform(
+        new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
+          @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> 
in, Time time) {
+            return null;
+          }
+        }
+    );
+
+    pairStream.transformToPair(
+        new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, 
String>>() {
+          @Override public JavaPairRDD<String, String> 
call(JavaPairRDD<String, Integer> in) {
+            return null;
+          }
+        }
+    );
+
+    pairStream.transformToPair(
+        new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, 
String>>() {
+          @Override public JavaPairRDD<String, String> 
call(JavaPairRDD<String, Integer> in,
+                                                            Time time) {
+            return null;
+          }
+        }
+    );
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testTransformWith() {
+    List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("california", "dodgers"),
+            new Tuple2<>("new york", "yankees")),
+        Arrays.asList(
+            new Tuple2<>("california", "sharks"),
+            new Tuple2<>("new york", "rangers")));
+
+    List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("california", "giants"),
+            new Tuple2<>("new york", "mets")),
+        Arrays.asList(
+            new Tuple2<>("california", "ducks"),
+            new Tuple2<>("new york", "islanders")));
+
+
+    List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = 
Arrays.asList(
+        Sets.newHashSet(
+            new Tuple2<>("california",
+                         new Tuple2<>("dodgers", "giants")),
+            new Tuple2<>("new york",
+                         new Tuple2<>("yankees", "mets"))),
+        Sets.newHashSet(
+            new Tuple2<>("california",
+                         new Tuple2<>("sharks", "ducks")),
+            new Tuple2<>("new york",
+                         new Tuple2<>("rangers", "islanders"))));
+
+    JavaDStream<Tuple2<String, String>> stream1 = 
JavaTestUtils.attachTestInputStream(
+        ssc, stringStringKVStream1, 1);
+    JavaPairDStream<String, String> pairStream1 = 
JavaPairDStream.fromJavaDStream(stream1);
+
+    JavaDStream<Tuple2<String, String>> stream2 = 
JavaTestUtils.attachTestInputStream(
+        ssc, stringStringKVStream2, 1);
+    JavaPairDStream<String, String> pairStream2 = 
JavaPairDStream.fromJavaDStream(stream2);
+
+    JavaPairDStream<String, Tuple2<String, String>> joined = 
pairStream1.transformWithToPair(
+        pairStream2,
+        new Function3<
+            JavaPairRDD<String, String>,
+            JavaPairRDD<String, String>,
+            Time,
+            JavaPairRDD<String, Tuple2<String, String>>>() {
+          @Override
+          public JavaPairRDD<String, Tuple2<String, String>> call(
+              JavaPairRDD<String, String> rdd1,
+              JavaPairRDD<String, String> rdd2,
+              Time time) {
+            return rdd1.join(rdd2);
+          }
+        }
+    );
+
+    JavaTestUtils.attachTestOutputStream(joined);
+    List<List<Tuple2<String, Tuple2<String, String>>>> result = 
JavaTestUtils.runStreams(ssc, 2, 2);
+    List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = 
new ArrayList<>();
+    for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
+      unorderedResult.add(Sets.newHashSet(res));
+    }
+
+    Assert.assertEquals(expected, unorderedResult);
+  }
+
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testVariousTransformWith() {
+    // tests whether all variations of transformWith can be called from Java
+
+    List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+    List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, 
inputData1, 1);
+    JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, 
inputData2, 1);
+
+    List<List<Tuple2<String, Integer>>> pairInputData1 =
+        Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
+    List<List<Tuple2<Double, Character>>> pairInputData2 =
+        Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
+    JavaPairDStream<String, Integer> pairStream1 = 
JavaPairDStream.fromJavaDStream(
+        JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+    JavaPairDStream<Double, Character> pairStream2 = 
JavaPairDStream.fromJavaDStream(
+        JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+    stream1.transformWith(
+        stream2,
+        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, 
JavaRDD<Double>>() {
+          @Override
+          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> 
rdd2, Time time) {
+            return null;
+          }
+        }
+    );
+
+    stream1.transformWith(
+        pairStream1,
+        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, 
JavaRDD<Double>>() {
+          @Override
+          public JavaRDD<Double> call(JavaRDD<Integer> rdd1, 
JavaPairRDD<String, Integer> rdd2,
+                                      Time time) {
+            return null;
+          }
+        }
+    );
+
+    stream1.transformWithToPair(
+        stream2,
+        new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, 
JavaPairRDD<Double, Double>>() {
+          @Override
+          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, 
JavaRDD<String> rdd2,
+                                                  Time time) {
+            return null;
+          }
+        }
+    );
+
+    stream1.transformWithToPair(
+        pairStream1,
+        new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time,
+          JavaPairRDD<Double, Double>>() {
+          @Override
+          public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1,
+                                                  JavaPairRDD<String, Integer> 
rdd2,
+                                                  Time time) {
+            return null;
+          }
+        }
+    );
+
+    pairStream1.transformWith(
+        stream2,
+        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, 
JavaRDD<Double>>() {
+          @Override
+          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, 
JavaRDD<String> rdd2,
+                                      Time time) {
+            return null;
+          }
+        }
+    );
+
+    pairStream1.transformWith(
+        pairStream1,
+        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, 
Integer>, Time,
+          JavaRDD<Double>>() {
+          @Override
+          public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1,
+                                      JavaPairRDD<String, Integer> rdd2,
+                                      Time time) {
+            return null;
+          }
+        }
+    );
+
+    pairStream1.transformWithToPair(
+        stream2,
+        new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time,
+          JavaPairRDD<Double, Double>>() {
+          @Override
+          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> 
rdd1,
+                                                  JavaRDD<String> rdd2,
+                                                  Time time) {
+            return null;
+          }
+        }
+    );
+
+    pairStream1.transformWithToPair(
+        pairStream2,
+        new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, 
Character>, Time,
+          JavaPairRDD<Double, Double>>() {
+          @Override
+          public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> 
rdd1,
+                                                  JavaPairRDD<Double, 
Character> rdd2,
+                                                  Time time) {
+            return null;
+          }
+        }
+    );
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testStreamingContextTransform(){
+    List<List<Integer>> stream1input = Arrays.asList(
+        Arrays.asList(1),
+        Arrays.asList(2)
+    );
+
+    List<List<Integer>> stream2input = Arrays.asList(
+        Arrays.asList(3),
+        Arrays.asList(4)
+    );
+
+    List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
+        Arrays.asList(new Tuple2<>(1, "x")),
+        Arrays.asList(new Tuple2<>(2, "y"))
+    );
+
+    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = 
Arrays.asList(
+        Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
+        Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
+    );
+
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, 
stream1input, 1);
+    JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, 
stream2input, 1);
+    JavaPairDStream<Integer, String> pairStream1 = 
JavaPairDStream.fromJavaDStream(
+        JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
+
+    List<JavaDStream<?>> listOfDStreams1 = 
Arrays.<JavaDStream<?>>asList(stream1, stream2);
+
+    // This is just to test whether this transform to JavaStream compiles
+    ssc.transform(
+      listOfDStreams1,
+      new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
+        @Override
+        public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
+          Assert.assertEquals(2, listOfRDDs.size());
+          return null;
+        }
+      }
+    );
+
+    List<JavaDStream<?>> listOfDStreams2 =
+        Arrays.<JavaDStream<?>>asList(stream1, stream2, 
pairStream1.toJavaDStream());
+
+    JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = 
ssc.transformToPair(
+      listOfDStreams2,
+      new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, 
Tuple2<Integer, String>>>() {
+        @Override
+        public JavaPairRDD<Integer, Tuple2<Integer, String>> 
call(List<JavaRDD<?>> listOfRDDs,
+                                                                  Time time) {
+          Assert.assertEquals(3, listOfRDDs.size());
+          JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
+          JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
+          JavaRDD<Tuple2<Integer, String>> rdd3 =
+            (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
+          JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+          PairFunction<Integer, Integer, Integer> mapToTuple =
+            new PairFunction<Integer, Integer, Integer>() {
+            @Override
+            public Tuple2<Integer, Integer> call(Integer i) {
+              return new Tuple2<>(i, i);
+            }
+          };
+          return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
+        }
+      }
+    );
+    JavaTestUtils.attachTestOutputStream(transformed2);
+    List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
+      JavaTestUtils.runStreams(ssc, 2, 2);
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFlatMap() {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("go", "giants"),
+        Arrays.asList("boo", "dodgers"),
+        Arrays.asList("athletics"));
+
+    List<List<String>> expected = Arrays.asList(
+        Arrays.asList("g","o","g","i","a","n","t","s"),
+        Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
+        Arrays.asList("a","t","h","l","e","t","i","c","s"));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream<String> flatMapped = stream.flatMap(new 
FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split("(?!^)")).iterator();
+      }
+    });
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testForeachRDD() {
+    final LongAccumulator accumRdd = ssc.sparkContext().sc().longAccumulator();
+    final LongAccumulator accumEle = ssc.sparkContext().sc().longAccumulator();
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,1,1),
+        Arrays.asList(1,1,1));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+    stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
+      @Override
+      public void call(JavaRDD<Integer> rdd) {
+        accumRdd.add(1);
+        rdd.foreach(new VoidFunction<Integer>() {
+          @Override
+          public void call(Integer i) {
+            accumEle.add(1);
+          }
+        });
+      }
+    });
+
+    // This is a test to make sure foreachRDD(VoidFunction2) can be called 
from Java
+    stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
+      @Override
+      public void call(JavaRDD<Integer> rdd, Time time) {
+      }
+    });
+
+    JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(2, accumRdd.value().intValue());
+    Assert.assertEquals(6, accumEle.value().intValue());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairFlatMap() {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("giants"),
+        Arrays.asList("dodgers"),
+        Arrays.asList("athletics"));
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>(6, "g"),
+            new Tuple2<>(6, "i"),
+            new Tuple2<>(6, "a"),
+            new Tuple2<>(6, "n"),
+            new Tuple2<>(6, "t"),
+            new Tuple2<>(6, "s")),
+        Arrays.asList(
+            new Tuple2<>(7, "d"),
+            new Tuple2<>(7, "o"),
+            new Tuple2<>(7, "d"),
+            new Tuple2<>(7, "g"),
+            new Tuple2<>(7, "e"),
+            new Tuple2<>(7, "r"),
+            new Tuple2<>(7, "s")),
+        Arrays.asList(
+            new Tuple2<>(9, "a"),
+            new Tuple2<>(9, "t"),
+            new Tuple2<>(9, "h"),
+            new Tuple2<>(9, "l"),
+            new Tuple2<>(9, "e"),
+            new Tuple2<>(9, "t"),
+            new Tuple2<>(9, "i"),
+            new Tuple2<>(9, "c"),
+            new Tuple2<>(9, "s")));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
+      new PairFlatMapFunction<String, Integer, String>() {
+        @Override
+        public Iterator<Tuple2<Integer, String>> call(String in) {
+          List<Tuple2<Integer, String>> out = new ArrayList<>();
+          for (String letter: in.split("(?!^)")) {
+            out.add(new Tuple2<>(in.length(), letter));
+          }
+          return out.iterator();
+        }
+      });
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 
3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testUnion() {
+    List<List<Integer>> inputData1 = Arrays.asList(
+        Arrays.asList(1,1),
+        Arrays.asList(2,2),
+        Arrays.asList(3,3));
+
+    List<List<Integer>> inputData2 = Arrays.asList(
+        Arrays.asList(4,4),
+        Arrays.asList(5,5),
+        Arrays.asList(6,6));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(1,1,4,4),
+        Arrays.asList(2,2,5,5),
+        Arrays.asList(3,3,6,6));
+
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, 
inputData1, 2);
+    JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, 
inputData2, 2);
+
+    JavaDStream<Integer> unioned = stream1.union(stream2);
+    JavaTestUtils.attachTestOutputStream(unioned);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  /*
+   * Performs an order-invariant comparison of lists representing two RDD 
streams. This allows
+   * us to account for ordering variation within individual RDD's which occurs 
during windowing.
+   */
+  public static <T> void assertOrderInvariantEquals(
+      List<List<T>> expected, List<List<T>> actual) {
+    List<Set<T>> expectedSets = new ArrayList<>();
+    for (List<T> list: expected) {
+      expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
+    }
+    List<Set<T>> actualSets = new ArrayList<>();
+    for (List<T> list: actual) {
+      actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
+    }
+    Assert.assertEquals(expectedSets, actualSets);
+  }
+
+
+  // PairDStream Functions
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairFilter() {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("giants", "dodgers"),
+        Arrays.asList("yankees", "red sox"));
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<>("giants", 6)),
+        Arrays.asList(new Tuple2<>("yankees", 7)));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
+        new PairFunction<String, String, Integer>() {
+          @Override
+          public Tuple2<String, Integer> call(String in) {
+            return new Tuple2<>(in, in.length());
+          }
+        });
+
+    JavaPairDStream<String, Integer> filtered = pairStream.filter(
+        new Function<Tuple2<String, Integer>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, Integer> in) {
+        return in._1().contains("a");
+      }
+    });
+    JavaTestUtils.attachTestOutputStream(filtered);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  private final List<List<Tuple2<String, String>>> stringStringKVStream = 
Arrays.asList(
+      Arrays.asList(new Tuple2<>("california", "dodgers"),
+                    new Tuple2<>("california", "giants"),
+                    new Tuple2<>("new york", "yankees"),
+                    new Tuple2<>("new york", "mets")),
+      Arrays.asList(new Tuple2<>("california", "sharks"),
+                    new Tuple2<>("california", "ducks"),
+                    new Tuple2<>("new york", "rangers"),
+                    new Tuple2<>("new york", "islanders")));
+
+  @SuppressWarnings("unchecked")
+  private final List<List<Tuple2<String, Integer>>> stringIntKVStream = 
Arrays.asList(
+      Arrays.asList(
+          new Tuple2<>("california", 1),
+          new Tuple2<>("california", 3),
+          new Tuple2<>("new york", 4),
+          new Tuple2<>("new york", 1)),
+      Arrays.asList(
+          new Tuple2<>("california", 5),
+          new Tuple2<>("california", 5),
+          new Tuple2<>("new york", 3),
+          new Tuple2<>("new york", 1)));
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairMap() { // Maps pair -> pair of different type
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>(1, "california"),
+            new Tuple2<>(3, "california"),
+            new Tuple2<>(4, "new york"),
+            new Tuple2<>(1, "new york")),
+        Arrays.asList(
+            new Tuple2<>(5, "california"),
+            new Tuple2<>(5, "california"),
+            new Tuple2<>(3, "new york"),
+            new Tuple2<>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
+        new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+          @Override
+          public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
+            return in.swap();
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairMapPartitions() { // Maps pair -> pair of different type
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>(1, "california"),
+            new Tuple2<>(3, "california"),
+            new Tuple2<>(4, "new york"),
+            new Tuple2<>(1, "new york")),
+        Arrays.asList(
+            new Tuple2<>(5, "california"),
+            new Tuple2<>(5, "california"),
+            new Tuple2<>(3, "new york"),
+            new Tuple2<>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
+        new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, 
String>() {
+          @Override
+          public Iterator<Tuple2<Integer, String>> 
call(Iterator<Tuple2<String, Integer>> in) {
+            List<Tuple2<Integer, String>> out = new LinkedList<>();
+            while (in.hasNext()) {
+              Tuple2<String, Integer> next = in.next();
+              out.add(next.swap());
+            }
+            return out.iterator();
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairMap2() { // Maps pair -> single
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Integer>> expected = Arrays.asList(
+            Arrays.asList(1, 3, 4, 1),
+            Arrays.asList(5, 5, 3, 1));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+    JavaDStream<Integer> reversed = pairStream.map(
+        new Function<Tuple2<String, Integer>, Integer>() {
+          @Override
+          public Integer call(Tuple2<String, Integer> in) {
+            return in._2();
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
+    List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("hi", 1),
+            new Tuple2<>("ho", 2)),
+        Arrays.asList(
+            new Tuple2<>("hi", 1),
+            new Tuple2<>("ho", 2)));
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>(1, "h"),
+            new Tuple2<>(1, "i"),
+            new Tuple2<>(2, "h"),
+            new Tuple2<>(2, "o")),
+        Arrays.asList(
+            new Tuple2<>(1, "h"),
+            new Tuple2<>(1, "i"),
+            new Tuple2<>(2, "h"),
+            new Tuple2<>(2, "o")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+        JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
+        new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
+          @Override
+          public Iterator<Tuple2<Integer, String>> call(Tuple2<String, 
Integer> in) {
+            List<Tuple2<Integer, String>> out = new LinkedList<>();
+            for (Character s : in._1().toCharArray()) {
+              out.add(new Tuple2<>(in._2(), s.toString()));
+            }
+            return out.iterator();
+          }
+        });
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairGroupByKey() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("california", Arrays.asList("dodgers", "giants")),
+            new Tuple2<>("new york", Arrays.asList("yankees", "mets"))),
+        Arrays.asList(
+            new Tuple2<>("california", Arrays.asList("sharks", "ducks")),
+            new Tuple2<>("new york", Arrays.asList("rangers", "islanders"))));
+
+    JavaDStream<Tuple2<String, String>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Iterable<String>> grouped = 
pairStream.groupByKey();
+    JavaTestUtils.attachTestOutputStream(grouped);
+    List<List<Tuple2<String, Iterable<String>>>> result = 
JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected.size(), result.size());
+    Iterator<List<Tuple2<String, Iterable<String>>>> resultItr = 
result.iterator();
+    Iterator<List<Tuple2<String, List<String>>>> expectedItr = 
expected.iterator();
+    while (resultItr.hasNext() && expectedItr.hasNext()) {
+      Iterator<Tuple2<String, Iterable<String>>> resultElements = 
resultItr.next().iterator();
+      Iterator<Tuple2<String, List<String>>> expectedElements = 
expectedItr.next().iterator();
+      while (resultElements.hasNext() && expectedElements.hasNext()) {
+        Tuple2<String, Iterable<String>> resultElement = resultElements.next();
+        Tuple2<String, List<String>> expectedElement = expectedElements.next();
+        Assert.assertEquals(expectedElement._1(), resultElement._1());
+        equalIterable(expectedElement._2(), resultElement._2());
+      }
+      Assert.assertEquals(resultElements.hasNext(), 
expectedElements.hasNext());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairReduceByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("california", 4),
+            new Tuple2<>("new york", 5)),
+        Arrays.asList(
+            new Tuple2<>("california", 10),
+            new Tuple2<>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream = 
JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new 
IntegerSum());
+
+    JavaTestUtils.attachTestOutputStream(reduced);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCombineByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("california", 4),
+            new Tuple2<>("new york", 5)),
+        Arrays.asList(
+            new Tuple2<>("california", 10),
+            new Tuple2<>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream = 
JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
+        new Function<Integer, Integer>() {
+          @Override
+          public Integer call(Integer i) {
+            return i;
+          }
+        }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
+
+    JavaTestUtils.attachTestOutputStream(combined);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCountByValue() {
+    List<List<String>> inputData = Arrays.asList(
+      Arrays.asList("hello", "world"),
+      Arrays.asList("hello", "moon"),
+      Arrays.asList("hello"));
+
+    List<List<Tuple2<String, Long>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("hello", 1L),
+            new Tuple2<>("world", 1L)),
+        Arrays.asList(
+            new Tuple2<>("hello", 1L),
+            new Tuple2<>("moon", 1L)),
+        Arrays.asList(
+            new Tuple2<>("hello", 1L)));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaPairDStream<String, Long> counted = stream.countByValue();
+    JavaTestUtils.attachTestOutputStream(counted);
+    List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 
3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGroupByKeyAndWindow() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<>("california", Arrays.asList(1, 3)),
+        new Tuple2<>("new york", Arrays.asList(1, 4))
+      ),
+      Arrays.asList(
+        new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)),
+        new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4))
+      ),
+      Arrays.asList(
+        new Tuple2<>("california", Arrays.asList(5, 5)),
+        new Tuple2<>("new york", Arrays.asList(1, 3))
+      )
+    );
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Iterable<Integer>> groupWindowed =
+        pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(groupWindowed);
+    List<List<Tuple2<String, List<Integer>>>> result = 
JavaTestUtils.runStreams(ssc, 3, 3);
+
+    Assert.assertEquals(expected.size(), result.size());
+    for (int i = 0; i < result.size(); i++) {
+      Assert.assertEquals(convert(expected.get(i)), convert(result.get(i)));
+    }
+  }
+
+  private static Set<Tuple2<String, HashSet<Integer>>>
+    convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+    List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>();
+    for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
+      newListOfTuples.add(convert(tuple));
+    }
+    return new HashSet<>(newListOfTuples);
+  }
+
+  private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, 
List<Integer>> tuple) {
+    return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReduceByKeyAndWindow() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", 4),
+                      new Tuple2<>("new york", 5)),
+        Arrays.asList(new Tuple2<>("california", 14),
+                      new Tuple2<>("new york", 9)),
+        Arrays.asList(new Tuple2<>("california", 10),
+                      new Tuple2<>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduceWindowed =
+        pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), 
new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(reduceWindowed);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 
3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testUpdateStateByKey() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", 4),
+                      new Tuple2<>("new york", 5)),
+        Arrays.asList(new Tuple2<>("california", 14),
+                      new Tuple2<>("new york", 9)),
+        Arrays.asList(new Tuple2<>("california", 14),
+                      new Tuple2<>("new york", 9)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
+        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
+          @Override
+          public Optional<Integer> call(List<Integer> values, 
Optional<Integer> state) {
+            int out = 0;
+            if (state.isPresent()) {
+              out += state.get();
+            }
+            for (Integer v : values) {
+              out += v;
+            }
+            return Optional.of(out);
+          }
+        });
+    JavaTestUtils.attachTestOutputStream(updated);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 
3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testUpdateStateByKeyWithInitial() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<Tuple2<String, Integer>> initial = Arrays.asList(
+        new Tuple2<>("california", 1),
+            new Tuple2<>("new york", 2));
+
+    JavaRDD<Tuple2<String, Integer>> tmpRDD = 
ssc.sparkContext().parallelize(initial);
+    JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD(tmpRDD);
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", 5),
+                      new Tuple2<>("new york", 7)),
+        Arrays.asList(new Tuple2<>("california", 15),
+                      new Tuple2<>("new york", 11)),
+        Arrays.asList(new Tuple2<>("california", 15),
+                      new Tuple2<>("new york", 11)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
+        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
+          @Override
+          public Optional<Integer> call(List<Integer> values, 
Optional<Integer> state) {
+            int out = 0;
+            if (state.isPresent()) {
+              out += state.get();
+            }
+            for (Integer v : values) {
+              out += v;
+            }
+            return Optional.of(out);
+          }
+        }, new HashPartitioner(1), initialRDD);
+    JavaTestUtils.attachTestOutputStream(updated);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 
3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReduceByKeyAndWindowWithInverse() {
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", 4),
+                      new Tuple2<>("new york", 5)),
+        Arrays.asList(new Tuple2<>("california", 14),
+                      new Tuple2<>("new york", 9)),
+        Arrays.asList(new Tuple2<>("california", 10),
+                      new Tuple2<>("new york", 4)));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+      JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, Integer> reduceWindowed =
+        pairStream.reduceByKeyAndWindow(new IntegerSum(), new 
IntegerDifference(),
+                                        new Duration(2000), new 
Duration(1000));
+    JavaTestUtils.attachTestOutputStream(reduceWindowed);
+    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 
3, 3);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCountByValueAndWindow() {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("hello", "world"),
+        Arrays.asList("hello", "moon"),
+        Arrays.asList("hello"));
+
+    List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
+        Sets.newHashSet(
+            new Tuple2<>("hello", 1L),
+            new Tuple2<>("world", 1L)),
+        Sets.newHashSet(
+            new Tuple2<>("hello", 2L),
+            new Tuple2<>("world", 1L),
+            new Tuple2<>("moon", 1L)),
+        Sets.newHashSet(
+            new Tuple2<>("hello", 2L),
+            new Tuple2<>("moon", 1L)));
+
+    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<String, Long> counted =
+      stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
+    JavaTestUtils.attachTestOutputStream(counted);
+    List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 
3);
+    List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>();
+    for (List<Tuple2<String, Long>> res: result) {
+      unorderedResult.add(Sets.newHashSet(res));
+    }
+
+    Assert.assertEquals(expected, unorderedResult);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairTransform() {
+    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>(3, 5),
+            new Tuple2<>(1, 5),
+            new Tuple2<>(4, 5),
+            new Tuple2<>(2, 5)),
+        Arrays.asList(
+            new Tuple2<>(2, 5),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(4, 5),
+            new Tuple2<>(1, 5)));
+
+    List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>(1, 5),
+            new Tuple2<>(2, 5),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(4, 5)),
+        Arrays.asList(
+            new Tuple2<>(1, 5),
+            new Tuple2<>(2, 5),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(4, 5)));
+
+    JavaDStream<Tuple2<Integer, Integer>> stream = 
JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<Integer, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
+        new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, 
Integer>>() {
+          @Override
+          public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, 
Integer> in) {
+            return in.sortByKey();
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(sorted);
+    List<List<Tuple2<Integer, Integer>>> result = 
JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPairToNormalRDDTransform() {
+    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>(3, 5),
+            new Tuple2<>(1, 5),
+            new Tuple2<>(4, 5),
+            new Tuple2<>(2, 5)),
+        Arrays.asList(
+            new Tuple2<>(2, 5),
+            new Tuple2<>(3, 5),
+            new Tuple2<>(4, 5),
+            new Tuple2<>(1, 5)));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(3,1,4,2),
+        Arrays.asList(2,3,4,1));
+
+    JavaDStream<Tuple2<Integer, Integer>> stream = 
JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<Integer, Integer> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaDStream<Integer> firstParts = pairStream.transform(
+        new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
+          @Override
+          public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) {
+            return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
+              @Override
+              public Integer call(Tuple2<Integer, Integer> in2) {
+                return in2._1();
+              }
+            });
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(firstParts);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMapValues() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, String>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", "DODGERS"),
+                      new Tuple2<>("california", "GIANTS"),
+                      new Tuple2<>("new york", "YANKEES"),
+                      new Tuple2<>("new york", "METS")),
+        Arrays.asList(new Tuple2<>("california", "SHARKS"),
+                      new Tuple2<>("california", "DUCKS"),
+                      new Tuple2<>("new york", "RANGERS"),
+                      new Tuple2<>("new york", "ISLANDERS")));
+
+    JavaDStream<Tuple2<String, String>> stream = 
JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<String, String> mapped = pairStream.mapValues(new 
Function<String, String>() {
+      @Override
+      public String call(String s) {
+        return s.toUpperCase(Locale.ENGLISH);
+      }
+    });
+
+    JavaTestUtils.attachTestOutputStream(mapped);
+    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFlatMapValues() {
+    List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+    List<List<Tuple2<String, String>>> expected = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", "dodgers1"),
+                      new Tuple2<>("california", "dodgers2"),
+                      new Tuple2<>("california", "giants1"),
+                      new Tuple2<>("california", "giants2"),
+                      new Tuple2<>("new york", "yankees1"),
+                      new Tuple2<>("new york", "yankees2"),
+                      new Tuple2<>("new york", "mets1"),
+                      new Tuple2<>("new york", "mets2")),
+        Arrays.asList(new Tuple2<>("california", "sharks1"),
+                      new Tuple2<>("california", "sharks2"),
+                      new Tuple2<>("california", "ducks1"),
+                      new Tuple2<>("california", "ducks2"),
+                      new Tuple2<>("new york", "rangers1"),
+                      new Tuple2<>("new york", "rangers2"),
+                      new Tuple2<>("new york", "islanders1"),
+                      new Tuple2<>("new york", "islanders2")));
+
+    JavaDStream<Tuple2<String, String>> stream = 
JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<String, String> pairStream = 
JavaPairDStream.fromJavaDStream(stream);
+
+
+    JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
+        new Function<String, Iterable<String>>() {
+          @Override
+          public Iterable<String> call(String in) {
+            List<String> out = new ArrayList<>();
+            out.add(in + "1");
+            out.add(in + "2");
+            return out;
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 
2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCoGroup() {
+    List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", "dodgers"),
+                      new Tuple2<>("new york", "yankees")),
+        Arrays.asList(new Tuple2<>("california", "sharks"),
+                      new Tuple2<>("new york", "rangers")));
+
+    List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", "giants"),
+                      new Tuple2<>("new york", "mets")),
+        Arrays.asList(new Tuple2<>("california", "ducks"),
+                      new Tuple2<>("new york", "islanders")));
+
+
+    List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = 
Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("california",
+                         new Tuple2<>(Arrays.asList("dodgers"), 
Arrays.asList("giants"))),
+            new Tuple2<>("new york",
+                         new Tuple2<>(Arrays.asList("yankees"), 
Arrays.asList("mets")))),
+        Arrays.asList(
+            new Tuple2<>("california",
+                         new Tuple2<>(Arrays.asList("sharks"), 
Arrays.asList("ducks"))),
+            new Tuple2<>("new york",
+                         new Tuple2<>(Arrays.asList("rangers"), 
Arrays.asList("islanders")))));
+
+
+    JavaDStream<Tuple2<String, String>> stream1 = 
JavaTestUtils.attachTestInputStream(
+        ssc, stringStringKVStream1, 1);
+    JavaPairDStream<String, String> pairStream1 = 
JavaPairDStream.fromJavaDStream(stream1);
+
+    JavaDStream<Tuple2<String, String>> stream2 = 
JavaTestUtils.attachTestInputStream(
+        ssc, stringStringKVStream2, 1);
+    JavaPairDStream<String, String> pairStream2 = 
JavaPairDStream.fromJavaDStream(stream2);
+
+    JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> 
grouped =
+        pairStream1.cogroup(pairStream2);
+    JavaTestUtils.attachTestOutputStream(grouped);
+    List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> 
result =
+        JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected.size(), result.size());
+    Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> 
resultItr =
+        result.iterator();
+    Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> 
expectedItr =
+        expected.iterator();
+    while (resultItr.hasNext() && expectedItr.hasNext()) {
+      Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> 
resultElements =
+          resultItr.next().iterator();
+      Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> 
expectedElements =
+          expectedItr.next().iterator();
+      while (resultElements.hasNext() && expectedElements.hasNext()) {
+        Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> 
resultElement =
+            resultElements.next();
+        Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement =
+            expectedElements.next();
+        Assert.assertEquals(expectedElement._1(), resultElement._1());
+        equalIterable(expectedElement._2()._1(), resultElement._2()._1());
+        equalIterable(expectedElement._2()._2(), resultElement._2()._2());
+      }
+      Assert.assertEquals(resultElements.hasNext(), 
expectedElements.hasNext());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testJoin() {
+    List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", "dodgers"),
+                      new Tuple2<>("new york", "yankees")),
+        Arrays.asList(new Tuple2<>("california", "sharks"),
+                      new Tuple2<>("new york", "rangers")));
+
+    List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", "giants"),
+                      new Tuple2<>("new york", "mets")),
+        Arrays.asList(new Tuple2<>("california", "ducks"),
+                      new Tuple2<>("new york", "islanders")));
+
+
+    List<List<Tuple2<String, Tuple2<String, String>>>> expected = 
Arrays.asList(
+        Arrays.asList(
+            new Tuple2<>("california",
+                         new Tuple2<>("dodgers", "giants")),
+            new Tuple2<>("new york",
+                         new Tuple2<>("yankees", "mets"))),
+        Arrays.asList(
+            new Tuple2<>("california",
+                         new Tuple2<>("sharks", "ducks")),
+            new Tuple2<>("new york",
+                         new Tuple2<>("rangers", "islanders"))));
+
+
+    JavaDStream<Tuple2<String, String>> stream1 = 
JavaTestUtils.attachTestInputStream(
+        ssc, stringStringKVStream1, 1);
+    JavaPairDStream<String, String> pairStream1 = 
JavaPairDStream.fromJavaDStream(stream1);
+
+    JavaDStream<Tuple2<String, String>> stream2 = 
JavaTestUtils.attachTestInputStream(
+        ssc, stringStringKVStream2, 1);
+    JavaPairDStream<String, String> pairStream2 = 
JavaPairDStream.fromJavaDStream(stream2);
+
+    JavaPairDStream<String, Tuple2<String, String>> joined = 
pairStream1.join(pairStream2);
+    JavaTestUtils.attachTestOutputStream(joined);
+    List<List<Tuple2<String, Tuple2<String, String>>>> result = 
JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testLeftOuterJoin() {
+    List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", "dodgers"),
+                      new Tuple2<>("new york", "yankees")),
+        Arrays.asList(new Tuple2<>("california", "sharks") ));
+
+    List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+        Arrays.asList(new Tuple2<>("california", "giants") ),
+        Arrays.asList(new Tuple2<>("new york", "islanders") )
+
+    );
+
+    List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), 
Arrays.asList(1L));
+
+    JavaDStream<Tuple2<String, String>> stream1 = 
JavaTestUtils.attachTestInputStream(
+        ssc, stringStringKVStream1, 1);
+    JavaPairDStream<String, String> pairStream1 = 
JavaPairDStream.fromJavaDStream(stream1);
+
+    JavaDStream<Tuple2<String, String>> stream2 = 
JavaTestUtils.attachTestInputStream(
+        ssc, stringStringKVStream2, 1);
+    JavaPairDStream<String, String> pairStream2 = 
JavaPairDStream.fromJavaDStream(stream2);
+
+    JavaPairDStream<String, Tuple2<String, Optional<String>>> joined =
+        pairStream1.leftOuterJoin(pairStream2);
+    JavaDStream<Long> counted = joined.count();
+    JavaTestUtils.attachTestOutputStream(counted);
+    List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCheckpointMasterRecovery() throws InterruptedException {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("this", "is"),
+        Arrays.asList("a", "test"),
+        Arrays.asList("counting", "letters"));
+
+    List<List<Integer>> expectedInitial = Arrays.asList(
+        Arrays.asList(4,2));
+    List<List<Integer>> expectedFinal = Arrays.asList(
+        Arrays.asList(1,4),
+        Arrays.asList(8,7));
+
+    File tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
+    ssc.checkpoint(tempDir.getAbsolutePath());
+
+    JavaDStream<String> stream = 
JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Integer> letterCount = stream.map(new Function<String, 
Integer>() {
+      @Override
+      public Integer call(String s) {
+        return s.length();
+      }
+    });
+    JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
+    List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expectedInitial, initialResult);
+    Thread.sleep(1000);
+    ssc.stop();
+
+    ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
+    // Tweak to take into consideration that the last batch before failure
+    // will be re-processed after recovery
+    List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 
2, 3);
+    assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+    ssc.stop();
+    Utils.deleteRecursively(tempDir);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testContextGetOrCreate() throws InterruptedException {
+    ssc.stop();
+
+    final SparkConf conf = new SparkConf()
+        .setMaster("local[2]")
+        .setAppName("test")
+        .set("newContext", "true");
+
+    File emptyDir = Files.createTempDir();
+    emptyDir.deleteOnExit();
+    StreamingContextSuite contextSuite = new StreamingContextSuite();
+    String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
+    String checkpointDir = contextSuite.createValidCheckpoint();
+
+    // Function to create JavaStreamingContext without any output operations
+    // (used to detect the new context)
+    final AtomicBoolean newContextCreated = new AtomicBoolean(false);
+    Function0<JavaStreamingContext> creatingFunc = new 
Function0<JavaStreamingContext>() {
+      @Override
+      public JavaStreamingContext call() {
+        newContextCreated.set(true);
+        return new JavaStreamingContext(conf, Seconds.apply(1));
+      }
+    };
+
+    newContextCreated.set(false);
+    ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), 
creatingFunc);
+    Assert.assertTrue("new context not created", newContextCreated.get());
+    ssc.stop();
+
+    newContextCreated.set(false);
+    ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, 
creatingFunc,
+        new Configuration(), true);
+    Assert.assertTrue("new context not created", newContextCreated.get());
+    ssc.stop();
+
+    newContextCreated.set(false);
+    ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+        new Configuration());
+    Assert.assertTrue("old context not recovered", !newContextCreated.get());
+    ssc.stop();
+
+    newContextCreated.set(false);
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
+        new Configuration());
+    Assert.assertTrue("old context not recovered", !newContextCreated.get());
+    ssc.stop();
+  }
+
+  /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCheckpointofIndividualStream() throws InterruptedException {
+    List<List<String>> inputData = Arrays.asList(
+        Arrays.asList("this", "is"),
+        Arrays.asList("a", "test"),
+        Arrays.asList("counting", "letters"));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(4,2),
+        Arrays.asList(1,4),
+        Arrays.asList(8,7));
+
+    JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+    JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+      @Override
+      public Integer call(String s) {
+        return s.length();
+      }
+    });
+    JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
+
+    letterCount.checkpoint(new Duration(1000));
+
+    List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 
3);
+    assertOrderInvariantEquals(expected, result1);
+  }
+  */
+
+  // Input stream tests. These mostly just test that we can instantiate a 
given InputStream with
+  // Java arguments and assign it to a JavaDStream without producing type 
errors. Testing of the
+  // InputStream functionality is deferred to the existing Scala tests.
+  @Test
+  public void testSocketTextStream() {
+    ssc.socketTextStream("localhost", 12345);
+  }
+
+  @Test
+  public void testSocketString() {
+    ssc.socketStream(
+      "localhost",
+      12345,
+      new Function<InputStream, Iterable<String>>() {
+        @Override
+        public Iterable<String> call(InputStream in) throws IOException {
+          List<String> out = new ArrayList<>();
+          try (BufferedReader reader = new BufferedReader(
+              new InputStreamReader(in, StandardCharsets.UTF_8))) {
+            for (String line; (line = reader.readLine()) != null;) {
+              out.add(line);
+            }
+          }
+          return out;
+        }
+      },
+      StorageLevel.MEMORY_ONLY());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testTextFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"spark");
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaDStream<String> input = ssc.textFileStream(testDir.toString());
+    JavaTestUtils.attachTestOutputStream(input);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"spark");
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
+      testDir.toString(),
+      LongWritable.class,
+      Text.class,
+      TextInputFormat.class,
+      new Function<Path, Boolean>() {
+        @Override
+        public Boolean call(Path v1) {
+          return Boolean.TRUE;
+        }
+      },
+      true);
+
+    JavaDStream<String> test = inputStream.map(
+      new Function<Tuple2<LongWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<LongWritable, Text> v1) {
+          return v1._2().toString();
+        }
+    });
+
+    JavaTestUtils.attachTestOutputStream(test);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testRawSocketStream() {
+    ssc.rawSocketStream("localhost", 12345);
+  }
+
+  private static List<List<String>> fileTestPrepare(File testDir) throws 
IOException {
+    File existingFile = new File(testDir, "0");
+    Files.write("0\n", existingFile, StandardCharsets.UTF_8);
+    Assert.assertTrue(existingFile.setLastModified(1000));
+    Assert.assertEquals(1000, existingFile.lastModified());
+    return Arrays.asList(Arrays.asList("0"));
+  }
+
+  @SuppressWarnings("unchecked")
+  // SPARK-5795: no logic assertions, just testing that intended API 
invocations compile
+  private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) {
+    pds.saveAsNewAPIHadoopFiles(
+        "", "", LongWritable.class, Text.class,
+        org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+    pds.saveAsHadoopFiles(
+        "", "", LongWritable.class, Text.class,
+        org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+    // Checks that a previous common workaround for this API still compiles
+    pds.saveAsNewAPIHadoopFiles(
+        "", "", LongWritable.class, Text.class,
+        (Class) 
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+    pds.saveAsHadoopFiles(
+        "", "", LongWritable.class, Text.class,
+        (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to