Hi,

https://github.com/databricks/spark-perf/tree/master/streaming-tests/src/main/scala/streaming/perf
contains some performance tests for streaming. That repo has examples of
generating synthetic files during a test, so maybe you can find some code
snippets there that you can reuse.
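
For illustration, here is a minimal, untested sketch of that pattern in
Java. The helper name is made up; the idea is to write the file somewhere
else first and then move it into the monitored directory, so that
textFileStream sees it appear as a complete new file.

================================================================================
import java.io.File;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class SyntheticFileUtil {
  // Hypothetical helper: stage the file outside the monitored directory,
  // then move it in. textFileStream only picks up files that appear in
  // the directory, complete and newly created, after the context starts.
  public static void writeInputFile(File monitoredDir, String name, String contents)
      throws Exception {
    File staging = File.createTempFile("staging", ".txt");
    try (PrintWriter writer = new PrintWriter(staging, "UTF-8")) {
      writer.println(contents);
    }
    // ATOMIC_MOVE assumes source and target are on the same filesystem;
    // fall back to a copy-then-delete move otherwise.
    Files.move(staging.toPath(), new File(monitoredDir, name).toPath(),
        StandardCopyOption.ATOMIC_MOVE);
  }
}
================================================================================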

Best,
Burak

----- Original Message -----
From: "Emre Sevinc" <emre.sev...@gmail.com>
To: user@spark.apache.org
Sent: Monday, December 8, 2014 2:36:41 AM
Subject: How can I make Spark Streaming count the words in a file in a unit test?

Hello,

I've successfully built a very simple Spark Streaming application in Java
that is based on the HdfsCount example in Scala at
https://github.com/apache/spark/blob/branch-1.1/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
.

When I submit this application to my local Spark, it waits for a file to be
written to a given directory, and when I create that file it successfully
prints the number of words. I terminate the application by pressing Ctrl+C.
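
The driver part of StarterApp.java follows that example closely; roughly
the sketch below (the "local[2]" master and the 3-second batch interval
are placeholder details, not necessarily what the real application uses):

================================================================================
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StarterApp {
  public static void main(String[] args) throws Exception {
    // One streaming context with a 3-second batch interval.
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "StarterApp", new Duration(3000));
    // Watch the directory given on the command line for new files.
    JavaDStream<String> lines = ssc.textFileStream(args[0]);
    JavaPairDStream<String, Integer> wordCounts = new StarterApp().countWords(lines);
    wordCounts.print();
    ssc.start();
    ssc.awaitTermination(); // runs until interrupted, e.g. with Ctrl+C
  }

  // countWords(...) is included at the end of this message.
}
================================================================================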

Now I've tried to create a very basic unit test for this functionality, but
in the test I was not able to get the same information printed, that is,
the number of words.

What am I missing?

Below is the unit test file, and after that I've also included the code
snippet that shows the countWords method:

=========================================================================
StarterAppTest.java
=========================================================================
import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }


    Assert.assertTrue(true); // placeholder assertion

  }

}
=========================================================================

This test compiles and starts to run, and Spark Streaming prints a lot of
diagnostic messages on the console, but the calls to wordCounts.print()
do not print anything, whereas in StarterApp.java itself they do.

I've also added ssc.awaitTermination(); after ssc.start(), but nothing
changed in that respect. After that I also tried to create a new file in
the directory that this Spark Streaming application was watching, but this
time it gave an error.
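
For reference, here is a minimal sketch of how the counts could be made
observable to assertions instead of relying on print(): collect each
batch into a list via foreachRDD, start the context before writing the
file, and then wait past a batch interval. The list name, sleep time, and
final assertion are illustrative assumptions, not code from the actual
test:

================================================================================
import java.io.File;
import java.io.PrintWriter;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.junit.Assert;
import scala.Tuple2;

// Inside testCountWords(), after wordCounts has been built:
final List<Tuple2<String, Integer>> results = new CopyOnWriteArrayList<>();

// Gather every batch's counts into the list so the test can assert on them.
wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd) {
    results.addAll(rdd.collect());
    return null;
  }
});

ssc.start();

// Write the input file only after the context has started, so that the
// next 3-second batch scanning the directory can pick it up as new.
File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
writer.close();

// Give the stream a couple of batch intervals to process the file before
// asserting; a condition-based wait would be more robust than a sleep.
Thread.sleep(2 * 3000);
Assert.assertFalse(results.isEmpty());
================================================================================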

For completeness, below is the countWords method:

================================================================================
public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
              @Override
              public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
            }).reduceByKey((i1, i2) -> i1 + i2);

    return wordCounts;
  }
================================================================================



Kind regards
Emre Sevinç

