Hi,

https://github.com/databricks/spark-perf/tree/master/streaming-tests/src/main/scala/streaming/perf contains some performance tests for streaming. That repo has examples of how to generate synthetic files during a test, so you may find code snippets there that you can reuse.
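As a rough idea of what generating a synthetic file for a file-based stream could look like in plain Java, here is a minimal sketch. It is not taken from the spark-perf repo; the class name SyntheticFileDrop and its drop method are hypothetical. It assumes the stream is watching a directory and that files should appear there complete, so it writes to a staging directory first and then moves the finished file into the watched directory (both directories should be on the same file system for the atomic move to work).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical test helper; not part of Spark or spark-perf.
final class SyntheticFileDrop {

    private SyntheticFileDrop() {}

    /**
     * Writes the given lines to a temporary file in stagingDir, then moves the
     * finished file into watchedDir under fileName. Returns the final path.
     */
    static Path drop(Path stagingDir, Path watchedDir, String fileName, List<String> lines)
            throws IOException {
        // Write the complete content outside the watched directory first,
        // so a reader of watchedDir never sees a half-written file.
        Path staged = Files.createTempFile(stagingDir, "staged-", ".tmp");
        Files.write(staged, lines, StandardCharsets.UTF_8);

        Path target = watchedDir.resolve(fileName);
        try {
            return Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fallback for file systems without atomic moves; this path is
            // an ordinary move and no longer guaranteed to be atomic.
            return Files.move(staged, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

In a test, this would be called after the streaming context has been started, with watchedDir being the directory passed to textFileStream.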
Best,
Burak

----- Original Message -----
From: "Emre Sevinc" <emre.sev...@gmail.com>
To: user@spark.apache.org
Sent: Monday, December 8, 2014 2:36:41 AM
Subject: How can I make Spark Streaming count the words in a file in a unit test?

Hello,

I've successfully built a very simple Spark Streaming application in Java that is based on the HdfsWordCount example in Scala at https://github.com/apache/spark/blob/branch-1.1/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala .

When I submit this application to my local Spark, it waits for a file to be written to a given directory, and when I create that file it successfully prints the number of words. I terminate the application by pressing Ctrl+C.

Now I've tried to create a very basic unit test for this functionality, but in the test I was not able to print the same information, that is, the number of words. What am I missing?

Below is the unit test file, and after that I've also included the code snippet that shows the countWords method:

=========================================================================
StarterAppTest.java
=========================================================================
import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }

  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }

    Assert.assertTrue(true);
  }
}
=========================================================================

This test compiles and starts to run. Spark Streaming prints a lot of diagnostic messages on the console, but the calls to wordCounts.print() do not print anything, whereas in StarterApp.java itself they do.

I've also added ssc.awaitTermination(); after ssc.start(), but nothing changed in that respect. After that I also tried to create a new file in the directory that this Spark Streaming application was checking, but this time it gave an error.

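One possible explanation, offered as a guess rather than a diagnosis: print() only registers an output operation that runs once per batch after the context has started, and with a 3000 ms batch duration the test method above may return (and tearDown may stop the context) before the first batch fires. The sketch below illustrates the general pattern of waiting a bounded time for an asynchronous result in a test. It is a plain-Java simulation with no Spark in it; the scheduler stands in for a streaming batch, and the class AwaitBatchSketch and the value "(Emre,3)" are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java simulation of "wait for the first batch"; no Spark involved.
final class AwaitBatchSketch {

    private AwaitBatchSketch() {}

    /**
     * Schedules a fake "batch" that produces a result after batchDelayMillis,
     * then waits up to timeoutMillis for it. Returns the result, or null if
     * the wait timed out before the batch ran.
     */
    static String awaitFirstResult(long batchDelayMillis, long timeoutMillis)
            throws InterruptedException {
        AtomicReference<String> result = new AtomicReference<>();
        CountDownLatch firstBatch = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Stand-in for the streaming engine producing output on its own thread.
            scheduler.schedule(() -> {
                result.set("(Emre,3)"); // made-up value for illustration
                firstBatch.countDown();
            }, batchDelayMillis, TimeUnit.MILLISECONDS);

            // The test thread blocks here, but only for a bounded time.
            boolean arrived = firstBatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? result.get() : null;
        } finally {
            // Analogous to stopping the context in tearDown.
            scheduler.shutdownNow();
        }
    }
}
```

In the Spark test, the analogous step would presumably be a bounded wait (longer than one batch interval) between writing the file and the end of the test method, rather than an unbounded awaitTermination() placed before the file is written.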
For completeness, below is the countWords method:

================================================================================
public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {

  JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String x) {
      return Lists.newArrayList(SPACE.split(x));
    }
  });

  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      }).reduceByKey((i1, i2) -> i1 + i2);

  return wordCounts;
}
================================================================================

Kind regards

Emre Sevinç
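To replace the placeholder Assert.assertTrue(true) with a real check, the test needs an expected value to compare against. A minimal sketch of computing one in plain Java follows. It mirrors the split-then-count logic of countWords without any Spark types. The class name ReferenceWordCount is hypothetical, and it assumes SPACE is a single-space pattern, which the snippet above does not show.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper for computing expected counts in a test; no Spark involved.
final class ReferenceWordCount {

    // Assumption: SPACE in StarterApp is defined like this; its definition is not shown.
    private static final Pattern SPACE = Pattern.compile(" ");

    private ReferenceWordCount() {}

    /** Splits the line on single spaces and counts each token, sorted by token. */
    static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : SPACE.split(line)) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {8-Dec-2014:=1, Emre=3, Ergin=3}
        System.out.println(count("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin"));
    }
}
```

The resulting map could then be compared with whatever the streaming job produces for the same input line, once the test has a way to collect that output.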