Hello,

I've successfully built a very simple Spark Streaming application in Java,
based on the Scala HdfsWordCount example at
https://github.com/apache/spark/blob/branch-1.1/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala

When I submit this application to my local Spark, it waits for a file to be
written to a given directory, and when I create that file it successfully
prints the number of words. I terminate the application by pressing Ctrl+C.

Now I've tried to write a very basic unit test for this functionality, but
the test does not print the same information, that is, the number of
words.

What am I missing?

Below is the unit test file, and after that I've also included the code
snippet that shows the countWords method:

=========================================================================
StarterAppTest.java
=========================================================================
import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }


    Assert.assertTrue(true);

  }

}
=========================================================================

This test compiles and starts to run. Spark Streaming prints a lot of
diagnostic messages on the console, but the calls to wordCounts.print()
do not print anything, whereas in StarterApp.java itself they do.

I've also tried adding ssc.awaitTermination(); after ssc.start(), but
nothing changed in that respect. After that I also tried to create a new
file in the directory that this Spark Streaming application was watching,
but this time it gave an error.
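
For reference, the Spark Streaming programming guide says that files must
be created in the monitored directory by atomically moving or renaming
them into it. Below is a minimal sketch of writing the test file that way,
using only java.nio. The class name AtomicFileDrop and the method dropFile
are hypothetical and not part of StarterApp, and the sketch assumes the
watched directory has a parent directory on the same filesystem. Whether
this is related to the missing output in the test is not established here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of StarterApp.
class AtomicFileDrop {

  // Writes the content to a staging directory next to watchedDir, then
  // moves the finished file into watchedDir in a single atomic step.
  static Path dropFile(Path watchedDir, String name, String content) {
    try {
      // Assumption: watchedDir has a parent on the same filesystem.
      Path parent = watchedDir.toAbsolutePath().getParent();
      Path stagingDir = Files.createTempDirectory(parent, "staging");
      Path staged = stagingDir.resolve(name);
      Files.write(staged, content.getBytes(StandardCharsets.UTF_8));

      Path target = watchedDir.resolve(name);
      // Throws AtomicMoveNotSupportedException if the move cannot be atomic.
      Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE);

      Files.delete(stagingDir);
      return target;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In the test this could be called as
AtomicFileDrop.dropFile(tempDir.toPath(), "tmp.txt", "...") after
ssc.start(), in place of the PrintWriter block.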

For completeness, below is the countWords method:

================================================================================
  public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<>(s, 1);
          }
        }).reduceByKey((i1, i2) -> i1 + i2);

    return wordCounts;
  }
================================================================================
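
To make the expected result concrete, here is a minimal plain-Java sketch
of the same split-and-count logic applied to the single line that the test
writes, without any Spark classes. The class name WordCountSketch is
hypothetical, and SPACE is assumed to be a pattern matching a single
space, since its definition is not shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-alone illustration, not part of StarterApp.
class WordCountSketch {

  // Assumption: mirrors the SPACE constant used by countWords.
  private static final Pattern SPACE = Pattern.compile(" ");

  // Splits one line on single spaces and counts each resulting token.
  static Map<String, Integer> countWords(String line) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (String word : SPACE.split(line)) {
      counts.merge(word, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // prints {8-Dec-2014:=1, Emre=3, Ergin=3}
    System.out.println(countWords("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin"));
  }
}
```

These are the three pairs I would expect wordCounts.print() to show for
the batch that picks up tmp.txt, although the order of the pairs in the
Spark output may differ.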



Kind regards
Emre Sevinç
