In my test application, I create a DStream by connecting to a socket. I then want to count the records in the DStream that match the keys of a reference RDD. Below is the Java code for my application.
======
public class TestSparkStreaming {
    public static void main(String[] args) {
        // Function to make a pair from a String
        class StringToPair implements PairFunction<String, String, String> {
            String value_;

            StringToPair(String value) {
                value_ = value;
            }

            @Override
            public Tuple2<String, String> call(String arg0) throws Exception {
                return new Tuple2<String, String>(arg0, value_);
            }
        }

        JavaStreamingContext jssc = new JavaStreamingContext("local",
                "TestSparkStreaming", new Duration(1000));
        JavaReceiverInputDStream<String> networkevents =
                jssc.socketTextStream("localhost", 9999);

        // Pair each input line with "world"
        JavaPairDStream<String, String> streamEvents =
                networkevents.mapToPair(new StringToPair("world"));

        // Construct the "hello" -> "spark" pair for input lines to join with
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        List<String> list = Arrays.asList("hello");
        JavaRDD<String> reference = sc.parallelize(list);
        final JavaPairRDD<String, String> referenceData =
                reference.mapToPair(new StringToPair("spark"));

        class MatchInputLine implements PairFunction<Tuple2<String, String>, String, Long> {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                final String inputKey = t._1;
                final String inputValue = t._2;
                final List<String> ret = referenceData.lookup(inputKey);
                return new Tuple2<String, Long>(inputKey,
                        new Long((ret != null) ? ret.size() : 0));
            }
        }

        // Construct an output DStream if matched
        JavaPairDStream<String, Long> joinedStream =
                streamEvents.mapToPair(new MatchInputLine());

        // Count the output
        class Count implements Function2<Long, Long, Long> {
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }
        JavaPairDStream<String, Long> aggregatedJoinedStream =
                joinedStream.reduceByKey(new Count());

        // Print the output
        aggregatedJoinedStream.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
======
I'm testing on Windows in local mode (Spark 1.0.0).
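To make the intended logic concrete, here is a plain-Java sketch (no Spark; the class, method names, and TreeMap are mine, purely for illustration) of what I expect each one-second batch to produce: each input line is paired with "world", looked up against the reference data ("hello" -> "spark"), and emitted as a (line, matchCount) pair, with counts merged per key.

```java
import java.util.*;

// Plain-Java sketch (no Spark) of what one batch of the pipeline
// above is meant to produce.
public class MatchSketch {
    static Map<String, Long> runBatch(List<String> batch) {
        // Stand-in for the referenceData pair RDD: "hello" -> "spark"
        Map<String, String> referenceData = new HashMap<>();
        referenceData.put("hello", "spark");

        // TreeMap only so the printed result has a deterministic key order
        Map<String, Long> joined = new TreeMap<>();
        for (String line : batch) {
            // Stand-in for referenceData.lookup(inputKey).size()
            long matches = referenceData.containsKey(line) ? 1L : 0L;
            // Stand-in for reduceByKey(new Count()): merge counts per key
            joined.merge(line, matches, Long::sum);
        }
        return joined;
    }

    public static void main(String[] args) {
        System.out.println(runBatch(Arrays.asList("hello", "goodbye")));
        // prints {goodbye=0, hello=1}
    }
}
```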
After starting the socket server (the "nc" program mentioned in Spark's documentation) and submitting the packaged jar to Spark, I expected to see output when I typed "hello" in. However, I don't see any output. I see the messages below in the console where I submitted the jar.
======
14/06/18 18:17:48 INFO JobScheduler: Added jobs for time 1403086668000 ms
14/06/18 18:17:48 INFO MemoryStore: ensureFreeSpace(12) called with curMem=0, maxMem=1235327385
14/06/18 18:17:48 INFO MemoryStore: Block input-0-1403086668400 stored as bytes to memory (size 12.0 B, free 1178.1 MB)
14/06/18 18:17:48 INFO BlockManagerInfo: Added input-0-1403086668400 in memory on PEK-WKST68449:60769 (size: 12.0 B, free: 1178.1 MB)
14/06/18 18:17:48 INFO BlockManagerMaster: Updated info of block input-0-1403086668400
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to [PEK-WKST68449/10.101.3.75:60769]
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from [PEK-WKST68449/10.101.3.75]
14/06/18 18:17:48 INFO SendingConnection: Connected to [PEK-WKST68449/10.101.3.75:60769], 1 messages pending
14/06/18 18:17:48 WARN BlockManager: Block input-0-1403086668400 already exists on this machine; not re-adding it
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to [/127.0.0.1:60789]
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from [127.0.0.1/127.0.0.1]
14/06/18 18:17:48 INFO SendingConnection: Connected to [/127.0.0.1:60789], 1 messages pending
14/06/18 18:17:48 INFO BlockGenerator: Pushed block input-0-1403086668400
14/06/18 18:17:49 INFO ReceiverTracker: Stream 0 received 1 blocks
14/06/18 18:17:49 INFO JobScheduler: Added jobs for time 1403086669000 ms
======
I also see one "Waiting Batches" entry in Spark's monitoring UI; I'm not sure whether that's related to the problem. Can you suggest what might be wrong? I guess this is a basic question about the reduce function. I will appreciate any help, thank you!
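Since my doubt is about the reduce step, here is my understanding of reduceByKey semantics as a minimal plain-Java simulation (again no Spark; the helper name and use of BiFunction in place of Spark's Function2 are my own, just to illustrate the merge behavior I expect from the Count class):

```java
import java.util.*;
import java.util.function.BiFunction;

// Minimal plain-Java simulation of reduceByKey: values sharing a key are
// merged pairwise with the supplied function, which is what the Count
// class (v1 + v2) is meant to do in the streaming job above.
public class ReduceByKeySketch {
    static Map<String, Long> reduceByKey(List<Map.Entry<String, Long>> pairs,
                                         BiFunction<Long, Long, Long> f) {
        // TreeMap only for deterministic print order
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, Long> p : pairs) {
            // First value for a key is stored; later values are merged via f
            out.merge(p.getKey(), p.getValue(), f);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> pairs = Arrays.asList(
                Map.entry("hello", 1L),
                Map.entry("hello", 1L),
                Map.entry("world", 1L));
        System.out.println(reduceByKey(pairs, (v1, v2) -> v1 + v2));
        // prints {hello=2, world=1}
    }
}
```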
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-print-a-derived-DStream-after-reduceByKey-tp7837.html Sent from the Apache Spark User List mailing list archive at Nabble.com.