I am embarrassed to admit but I can't get a basic 'word count' to work under Kafka/Spark streaming. My code looks like this. I don't see any word counts in console output. Also, don't see any output in UI. Needless to say, I am newbie in both 'Spark' as well as 'Kafka'.
Please help. Thanks. Here's the code: public static void main(String[] args) { if (args.length < 4) { System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); System.exit(1); } // StreamingExamples.setStreamingLogLevels(); // SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); // Location of the Spark directory String sparkHome = "/opt/mapr/spark/spark-1.0.2/"; // URL of the Spark cluster String sparkUrl = "spark://mymachine:7077"; // Location of the required JAR files String jarFiles = "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar"; SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("JavaKafkaWordCount"); sparkConf.setJars(new String[]{jarFiles}); sparkConf.setMaster(sparkUrl); sparkConf.set("spark.ui.port", "2348"); sparkConf.setSparkHome(sparkHome); Map<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("zookeeper.connect", "myedgenode:2181"); kafkaParams.put("group.id", "1"); kafkaParams.put("metadata.broker.list", "myedgenode:9092"); kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); kafkaParams.put("request.required.acks", "1"); // Create the context with a 1 second batch size JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); int numThreads = Integer.parseInt(args[3]); Map<String, Integer> topicMap = new HashMap<String, Integer>(); String[] topics = args[2].split(","); for (String topic: topics) { topicMap.put(topic, numThreads); } // JavaPairReceiverInputDStream<String, String> messages = // KafkaUtils.createStream(jssc, args[0], args[1], topicMap); JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER()); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination();