In my test application, I create a DStream by connecting to a socket. 
Then I want to count, for each batch, the records in the DStream whose
keys match a reference RDD. 
Below is the Java code for the application. 

====== 
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class TestSparkStreaming {

    public static void main(String[] args) {
        // Function to pair a String with a fixed value
        class StringToPair implements PairFunction<String, String, String> {
            String value_;

            StringToPair(String value) {
                value_ = value;
            }

            @Override
            public Tuple2<String, String> call(String arg0) throws Exception {
                return new Tuple2<String, String>(arg0, value_);
            }
        }

        JavaStreamingContext jssc =
                new JavaStreamingContext("local", "TestSparkStreaming", new Duration(1000));
        JavaReceiverInputDStream<String> networkevents =
                jssc.socketTextStream("localhost", 9999);

        // Pair each input line with "world"
        JavaPairDStream<String, String> streamEvents =
                networkevents.mapToPair(new StringToPair("world"));

        // Construct the "hello" -> "spark" pair for input lines to join with
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        List<String> list = Arrays.asList("hello");
        JavaRDD<String> reference = sc.parallelize(list);
        final JavaPairRDD<String, String> referenceData =
                reference.mapToPair(new StringToPair("spark"));

        // Map each input line to (key, number of matches in referenceData)
        class MatchInputLine
                implements PairFunction<Tuple2<String, String>, String, Long> {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t)
                    throws Exception {
                final String inputKey = t._1;
                final String inputValue = t._2;
                final List<String> ret = referenceData.lookup(inputKey);
                return new Tuple2<String, Long>(inputKey,
                        new Long((ret != null) ? ret.size() : 0));
            }
        }

        // Construct an output DStream of the match counts
        JavaPairDStream<String, Long> joinedStream =
                streamEvents.mapToPair(new MatchInputLine());

        // Sum the match counts per key
        class Count implements Function2<Long, Long, Long> {
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }
        JavaPairDStream<String, Long> aggregatedJoinedStream =
                joinedStream.reduceByKey(new Count());

        // Print the output
        aggregatedJoinedStream.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
====== 
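
To make the intended data flow concrete, this is what I expect to happen for a
single input line (my expectation, not observed output):

======
// Expected flow when the line "hello" arrives on the socket:
// networkevents                 : "hello"
// streamEvents  (mapToPair)     : ("hello", "world")
// referenceData                 : ("hello", "spark")
// referenceData.lookup("hello") : ["spark"], so size 1
// joinedStream                  : ("hello", 1)
// aggregatedJoinedStream        : ("hello", 1)   (reduceByKey, single pair)
// count().print()               : should print 1 for that batch
======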

I'm testing on Windows in local mode (Spark 1.0.0). After I start the socket
server (the "nc" program mentioned in Spark's documentation) and submit the
packaged jar to Spark, I expect to see output when I type "hello" in. 
However, I didn't see any output. Instead I saw the messages below in the
console where I submitted the jar. 

====== 
14/06/18 18:17:48 INFO JobScheduler: Added jobs for time 1403086668000 ms 
14/06/18 18:17:48 INFO MemoryStore: ensureFreeSpace(12) called with curMem=0, maxMem=1235327385 
14/06/18 18:17:48 INFO MemoryStore: Block input-0-1403086668400 stored as bytes to memory (size 12.0 B, free 1178.1 MB) 
14/06/18 18:17:48 INFO BlockManagerInfo: Added input-0-1403086668400 in memory on PEK-WKST68449:60769 (size: 12.0 B, free: 1178.1 MB) 
14/06/18 18:17:48 INFO BlockManagerMaster: Updated info of block input-0-1403086668400 
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to [PEK-WKST68449/10.101.3.75:60769] 
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from [PEK-WKST68449/10.101.3.75] 
14/06/18 18:17:48 INFO SendingConnection: Connected to [PEK-WKST68449/10.101.3.75:60769], 1 messages pending 
14/06/18 18:17:48 WARN BlockManager: Block input-0-1403086668400 already exists on this machine; not re-adding it 
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to [/127.0.0.1:60789] 
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from [127.0.0.1/127.0.0.1] 
14/06/18 18:17:48 INFO SendingConnection: Connected to [/127.0.0.1:60789], 1 messages pending
14/06/18 18:17:48 INFO BlockGenerator: Pushed block input-0-1403086668400 
14/06/18 18:17:49 INFO ReceiverTracker: Stream 0 received 1 blocks 
14/06/18 18:17:49 INFO JobScheduler: Added jobs for time 1403086669000 ms 
====== 
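
In case it matters, this is roughly how I drive the test; the jar name below
is a placeholder, and the nc invocation is the one from Spark's streaming
guide:

======
REM terminal 1: start the socket server
nc -lk 9999

REM terminal 2: submit the packaged jar (jar name is a placeholder)
bin\spark-submit.cmd --class TestSparkStreaming test-spark-streaming.jar
======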

I see one "Waiting Batches" in Spark's monitoring UI. I'm not sure if that's
related with the problem. 

Can you suggest about the problem? I guess this is a basic question about
reduce function.
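To rule out the reduce function itself, below is a minimal standalone sketch
of the same Count logic applied to a plain JavaPairRDD (the class name and the
sample data are made up for illustration); I would expect it to print
[(hello,2)]:

======
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

public class ReduceByKeyCheck {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local").setAppName("ReduceByKeyCheck");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Two ("hello", 1) pairs should reduce to a single ("hello", 2)
        JavaPairRDD<String, Long> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Long>("hello", 1L),
                new Tuple2<String, Long>("hello", 1L)));

        // Same reduce logic as the Count class above
        JavaPairRDD<String, Long> summed = pairs.reduceByKey(
                new Function2<Long, Long, Long>() {
                    @Override
                    public Long call(Long v1, Long v2) throws Exception {
                        return v1 + v2;
                    }
                });

        System.out.println(summed.collect()); // expect [(hello,2)]
        sc.stop();
    }
}
======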
I would appreciate any help, thank you! 



