
I've successfully built a very simple Spark Streaming application in Java
that is based on the HdfsCount example in Scala at

When I submit this application to my local Spark, it waits for a file to be
written to a given directory, and when I create that file it successfully
prints the number of words. I terminate the application by pressing Ctrl+C.

Now I've tried to create a very basic unit test for this functionality, but
the test does not print the same information, namely the number of words.

What am I missing?

Below is the unit test file, and after that I've also included the code
snippet that shows the countWords method:

import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.junit.*;

import java.io.*;

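public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
  }

  @After
  public void tearDown() {
    // Stop the context so that it does not leak into other tests.
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
  }

  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      // The two right-hand sides below are inferred from the signature of
      // countWords and from the directory-watching behaviour described above.
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      // Close the writer so that the line is actually flushed to disk.
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      Assert.fail("Could not write the test file: " + e.getMessage());
    } catch (UnsupportedEncodingException e) {
      Assert.fail("Could not write the test file: " + e.getMessage());
    }
  }
}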

This test compiles and starts to run. Spark Streaming prints a lot of
diagnostic messages on the console, but the calls to wordCounts.print()
do not print anything, whereas in StarterApp.java itself they do.

I've also added ssc.awaitTermination(); after ssc.start() but nothing
changed in that respect. After that I've also tried to create a new file in
the directory that this Spark Streaming application was checking but this
time it gave an error.

For completeness, below is the countWords method:

public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
    // Split every line into words.
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
    });

    // Pair each word with 1, then sum the counts per word.
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
              @Override
              public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
            }).reduceByKey((i1, i2) -> i1 + i2);

    return wordCounts;
}
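
To make the expected result concrete, here is a minimal plain-Java sketch
(no Spark involved) of the same split-and-count logic applied to the line
that the test writes. It assumes that SPACE is a pattern matching a single
space, as in the Spark word-count examples; the class and method names are
hypothetical and are not part of StarterApp.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper, not part of StarterApp: counts the words of one line
// without Spark, so the expected result of countWords can be checked by hand.
class WordCountSketch {

  // Assumption: SPACE in StarterApp is a single-space pattern.
  private static final Pattern SPACE = Pattern.compile(" ");

  static Map<String, Integer> count(String line) {
    // TreeMap keeps the words sorted, which makes the output deterministic.
    Map<String, Integer> counts = new TreeMap<>();
    for (String word : SPACE.split(line)) {
      counts.merge(word, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // prints {8-Dec-2014:=1, Emre=3, Ergin=3}
    System.out.println(count("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin"));
  }
}
```

So for that batch I would expect wordCounts.print() to show "8-Dec-2014:"
once and "Emre" and "Ergin" three times each, though not necessarily in
that order.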

Kind regards
Emre Sevinç
