Dear all,

I have implemented a simple Spark Streaming application that performs a windowed
word-count job. However, its latency seems extremely high compared with running
exactly the same job in Storm. The source code is attached below:

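import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public final class MyKafkaWordcountMain {

    public static void main(String[] args) throws Exception {
        String zkClient = "computer:2181";
        String kafkaGroup = "group1";
        String kafkaTopic = "topic1";
        int numThreads = 1;

        // All three durations are given in milliseconds.
        int batchDuration = Integer.parseInt(args[0]);
        int windowDuration = Integer.parseInt(args[1]);
        int slideDuration = Integer.parseInt(args[2]);

        SparkConf sparkConf = new SparkConf().setAppName("MyKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(batchDuration));
        // Checkpointing must be enabled for the inverse-function form of
        // reduceByKeyAndWindow used below.
        jssc.checkpoint("file:///temp/");

        // Receive (key, message) pairs from Kafka via ZooKeeper.
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(kafkaTopic, numThreads);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zkClient, kafkaGroup, topicMap);

        // Keep only the message payload.
        JavaDStream<String> lines = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

        // Print the number of lines contained in each input RDD.
        lines.count().map(new Function<Long, Long>() {
            private long totalCount = 0;
            // Initialised once, when this function object is created on the driver.
            private long startTime = System.currentTimeMillis();

            @Override
            public Long call(Long in) throws Exception {
                long endTime = System.currentTimeMillis();
                String mystring = "=======input=======\nstartTime="
                        + startTime
                        + ", endTime=" + endTime
                        + ", elapsedTime=" + (endTime - startTime) * 1.0 / 1000
                        + "sec, input count=" + in
                        + "\n======================\n";
                // Runs inside a task, so this goes to the stdout of the process running it.
                System.out.println(mystring);
                return in;
            }
        }).print();

        // Split each line into words.
        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String x) throws Exception {
                        String[] tokens = x.split(" ");
                        // Optional normalisation (disabled):
                        // for (int i = 0; i < tokens.length; i++) {
                        //     tokens[i] = tokens[i].replaceAll("[^A-Za-z0-9]", "").toLowerCase();
                        // }
                        return Arrays.asList(tokens);
                    }
                });

        // Pair every word with an initial count of 1.
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });

        // Windowed count: add counts entering the window, subtract counts leaving it.
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKeyAndWindow(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;   // reduce function
                    }
                },
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 - i2;   // inverse reduce function
                    }
                },
                new Duration(windowDuration), new Duration(slideDuration));
        wordCounts.checkpoint(new Duration(100000));

        wordCounts.count().map(new Function<Long, Long>() {
            // Initialised once, when this function object is created on the driver.
            private long startTime = System.currentTimeMillis();

            @Override
            public Long call(Long in) throws Exception {
                long endTime = System.currentTimeMillis();
                String mystring = "=====output=======\nstartTime="
                        + startTime
                        + ", endTime=" + endTime
                        + ", elapsedTime=" + (endTime - startTime) * 1.0 / 1000
                        + "sec\n======================\n";
                System.out.println(mystring);
                return in;
            }
        }).print();

        jssc.start();
        jssc.awaitTermination();
    }
}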
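The reduceByKeyAndWindow call above passes both a reduce function (i1 + i2) and an inverse reduce function (i1 - i2), so each window's counts are computed incrementally from the previous window's: the counts of the data entering the window are added and the counts of the data leaving it are subtracted. The sketch below illustrates that update on plain Java maps, without Spark. The class IncrementalWindowSketch and its methods are hypothetical names, not part of Spark's API, and the sketch ignores partitioning, checkpointing and scheduling entirely.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spark-free illustration of the incremental window update
// described for reduceByKeyAndWindow(reduceFunc, invReduceFunc, window, slide).
final class IncrementalWindowSketch {

    private IncrementalWindowSketch() {}

    // Word counts for a single batch: pair each word with 1, then combine
    // equal keys with the reduce function (i1 + i2).
    static Map<String, Integer> countBatch(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    // next window = previous window
    //             + counts of the batch entering the window (reduce: i1 + i2)
    //             - counts of the batch leaving the window  (inverse: i1 - i2)
    // Inputs are not modified. Keys whose count falls to 0 stay in the result.
    static Map<String, Integer> slide(Map<String, Integer> window,
                                      Map<String, Integer> entering,
                                      Map<String, Integer> leaving) {
        Map<String, Integer> next = new HashMap<>(window);
        entering.forEach((word, n) -> next.put(word, next.getOrDefault(word, 0) + n));
        leaving.forEach((word, n) -> next.put(word, next.getOrDefault(word, 0) - n));
        return next;
    }
}
```

For example, with a window two batches long, batches b1 = [a, b] and b2 = [a] give {a=2, b=1}; when b3 = [b, c] slides in and b1 slides out, the result is {a=1, b=1, c=1}, the same as counting b2 and b3 directly. Because each window's result depends on the previous window's result, the lineage of wordCounts keeps growing, which is why Spark requires checkpointing for this form of the operation.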

I am wondering whether there is anything wrong with my source code, or whether my
method of measuring the latency is problematic. Thanks.
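On the measurement itself: in both count().map(...) functions, startTime is an instance field, so it is assigned once, when the anonymous Function is constructed on the driver, and not at the start of each batch. Tasks receive the function in serialized form, and Java deserialization restores the stored field value instead of re-running the initializer, so endTime - startTime keeps growing from batch to batch regardless of how quickly each batch is processed. The Spark-free sketch below reproduces that pattern; StartTimeSketch, TimedFunction and ship are hypothetical names, and plain Java serialization stands in, roughly, for shipping a closure to a task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, Spark-free sketch of the timing pattern in the
// count().map(...) functions: startTime is fixed when the object is built.
final class StartTimeSketch {

    private StartTimeSketch() {}

    // Stand-in for the anonymous Function<Long, Long>; clock values are
    // passed in explicitly so the behaviour is deterministic.
    static final class TimedFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long startTime;   // set once, at construction

        TimedFunction(long createdAtMillis) {
            this.startTime = createdAtMillis;
        }

        // Corresponds to the printed elapsedTime, in milliseconds.
        long elapsedMillis(long nowMillis) {
            return nowMillis - startTime;
        }
    }

    // Serialize and deserialize the function, roughly what happens when a
    // closure is shipped to a task. Deserialization restores the stored
    // startTime; it does not reset it.
    static TimedFunction ship(TimedFunction f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TimedFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If a function created at t = 0 ms is shipped afresh for batches finishing at 2000, 4000 and 6000 ms, it reports 2000, 4000 and 6000 ms, i.e. time since creation rather than per-batch latency. Spark Streaming also records its own per-batch timings in BatchInfo (schedulingDelay, processingDelay and totalDelay), which a StreamingListener receives in onBatchCompleted; those figures may be a more direct basis for a comparison with Storm.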

Regards,
Yingjun




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/abnormal-latency-when-running-Spark-Streaming-tp7315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
