Dear all,

I have implemented a simple Spark Streaming application that performs a windowed word-count job. However, its latency seems extremely high compared with running exactly the same job in Storm. The source code follows:
public final class MyKafkaWordcountMain {

    public static void main(String[] args) throws Exception {
        String zkClient = "computer:2181";
        String kafkaGroup = "group1";
        String kafkaTopic = "topic1";
        int numThreads = 1;
        int batchDuration = Integer.valueOf(args[0]);
        int windowDuration = Integer.valueOf(args[1]);
        int slideDuration = Integer.valueOf(args[2]);

        SparkConf sparkConf = new SparkConf().setAppName("MyKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(batchDuration));
        // Checkpoint directory; note the "file:///" URI scheme.
        jssc.checkpoint("file:///temp/");

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(kafkaTopic, numThreads);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zkClient, kafkaGroup, topicMap);

        JavaDStream<String> lines = messages
                .map(new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

        // Print the number of lines contained in each input RDD.
        lines.count().map(new Function<Long, Long>() {
            private long totalCount = 0;
            private long startTime = System.currentTimeMillis();

            @Override
            public Long call(Long in) throws Exception {
                long endTime = System.currentTimeMillis();
                String mystring = "=======input=======\nstartTime=" + startTime
                        + ", endTime=" + endTime
                        + ", elapsedTime=" + (endTime - startTime) * 1.0 / 1000
                        + "sec, input count=" + in + "\n======================\n";
                System.out.println(mystring);
                return in;
            }
        }).print();

        JavaDStream<String> words = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String x) throws Exception {
                        String[] words = x.split(" ");
                        // for (String word : words) {
                        //     word = word.replaceAll("[^A-Za-z0-9]", "").toLowerCase();
                        // }
                        return Lists.newArrayList(words);
                    }
                });

        JavaPairDStream<String, Integer> wordCounts = words
                .mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                })
                .reduceByKeyAndWindow(
                        // "add" function for new data entering the window
                        new Function2<Integer, Integer, Integer>() {
                            @Override
                            public Integer call(Integer i1, Integer i2) {
                                return i1 + i2;
                            }
                        },
                        // inverse "subtract" function for old data leaving the window
                        new Function2<Integer, Integer, Integer>() {
                            @Override
                            public Integer call(Integer i1, Integer i2) {
                                return i1 - i2;
                            }
                        },
                        new Duration(windowDuration), new Duration(slideDuration));
        wordCounts.checkpoint(new Duration(100000));

        wordCounts.count().map(new Function<Long, Long>() {
            private long startTime = System.currentTimeMillis();

            @Override
            public Long call(Long in) throws Exception {
                long endTime = System.currentTimeMillis();
                String mystring = "=====output=======\nstartTime=" + startTime
                        + ", endTime=" + endTime
                        + ", elapsedTime=" + (endTime - startTime) * 1.0 / 1000
                        + "sec\n======================\n";
                System.out.println(mystring);
                return in;
            }
        }).print();

        jssc.start();
        jssc.awaitTermination();
    }
}

I am wondering: is there anything wrong with my source code, or is my method of measuring the latency problematic?
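One detail worth noting about the timing code above (a plain-Java observation, not Spark-specific): the `startTime` field of each anonymous `Function` is initialized once, when the function object is constructed during job setup, so every later `call()` reports the time elapsed since setup rather than the time taken by the current batch. A minimal sketch of the effect, using a hypothetical `ElapsedReporter` class and a stand-in `LongFn` interface:

```java
public class ElapsedReporter {
    // Stand-in for Spark's Function interface, to keep the sketch self-contained.
    interface LongFn {
        long call();
    }

    public static void main(String[] args) throws Exception {
        LongFn elapsed = new LongFn() {
            // Initialized once, at object-construction time -- just like the
            // startTime field in the streaming job's anonymous Functions.
            private final long startTime = System.currentTimeMillis();

            @Override
            public long call() {
                // Measures time since the object was created, not per invocation.
                return System.currentTimeMillis() - startTime;
            }
        };

        long first = elapsed.call();
        Thread.sleep(200);
        long second = elapsed.call();

        // The reported "elapsed time" keeps growing across calls, just as the
        // printed elapsedTime would keep growing across batches.
        System.out.println("first=" + first + " ms, second=" + second + " ms");
        if (second < first + 150) {
            throw new AssertionError("expected elapsed time to grow across calls");
        }
    }
}
```

Capturing the start timestamp inside `call()` (or inside a `foreachRDD` body), rather than in a field, would give a per-batch measurement instead.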
Thanks.

Regards,
Yingjun

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/abnormal-latency-when-running-Spark-Streaming-tp7315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.