Here's a simple working version.
import com.google.common.collect.Lists; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; import java.util.HashMap; import java.util.Map; /** * Created by akhld on 11/11/14. */ public class KafkaWordcount { public static void main(String[] args) { // Location of the Spark directory String sparkHome = "/home/akhld/mobi/localcluster/spark-1"; // URL of the Spark cluster String sparkUrl = "spark://akhldz:7077"; // Location of the required JAR files String jarFiles = "/home/akhld/mobi/temp/kafkwc.jar,/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar,/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar,/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar,/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar"; SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("JavaKafkaWordCount"); sparkConf.setJars(new String[]{jarFiles}); sparkConf.setMaster(sparkUrl); sparkConf.setSparkHome(sparkHome); //These are the minimal things that are required *Map<String, Integer> topicMap = new HashMap<String, Integer>();* * topicMap.put("test", 1);* * String kafkaGroup = "groups";* * String zkQuorum = "localhost:2181";* // Create the context with a 1 second batch size JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, kafkaGroup, topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(x.split(" ")); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); } } [image: Inline image 1] Thanks Best Regards On Tue, Nov 11, 2014 at 5:37 AM, Something Something < mailinglist...@gmail.com> wrote: > I am not running locally. The Spark master is: > > "spark://<machine name>:7077" > > > > On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> What is the Spark master that you are using. Use local[4], not local >> if you are running locally. >> >> On Mon, Nov 10, 2014 at 3:01 PM, Something Something >> <mailinglist...@gmail.com> wrote: >> > I am embarrassed to admit but I can't get a basic 'word count' to work >> under >> > Kafka/Spark streaming. My code looks like this. I don't see any word >> > counts in console output. Also, don't see any output in UI. Needless >> to >> > say, I am newbie in both 'Spark' as well as 'Kafka'. >> > >> > Please help. Thanks. >> > >> > Here's the code: >> > >> > public static void main(String[] args) { >> > if (args.length < 4) { >> > System.err.println("Usage: JavaKafkaWordCount <zkQuorum> >> <group> >> > <topics> <numThreads>"); >> > System.exit(1); >> > } >> > >> > // StreamingExamples.setStreamingLogLevels(); >> > // SparkConf sparkConf = new >> > SparkConf().setAppName("JavaKafkaWordCount"); >> > >> > // Location of the Spark directory >> > String sparkHome = "/opt/mapr/spark/spark-1.0.2/"; >> > >> > // URL of the Spark cluster >> > String sparkUrl = "spark://mymachine:7077"; >> > >> > // Location of the required JAR files >> > String jarFiles = >> > >> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar"; >> > >> > SparkConf sparkConf = new SparkConf(); >> > sparkConf.setAppName("JavaKafkaWordCount"); >> > sparkConf.setJars(new String[]{jarFiles}); >> > sparkConf.setMaster(sparkUrl); >> > sparkConf.set("spark.ui.port", "2348"); >> > sparkConf.setSparkHome(sparkHome); >> > >> > Map<String, String> kafkaParams = new HashMap<String, String>(); >> > kafkaParams.put("zookeeper.connect", "myedgenode:2181"); >> > kafkaParams.put("group.id", "1"); >> > kafkaParams.put("metadata.broker.list", "myedgenode:9092"); >> > kafkaParams.put("serializer.class", >> > "kafka.serializer.StringEncoder"); >> > kafkaParams.put("request.required.acks", "1"); >> > >> > // Create the context with a 1 second batch size >> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, >> new >> > Duration(2000)); >> > >> > int numThreads = Integer.parseInt(args[3]); >> > Map<String, Integer> topicMap = new HashMap<String, Integer>(); >> > String[] topics = args[2].split(","); >> > for (String topic: topics) { >> > topicMap.put(topic, numThreads); >> > } >> > >> > // JavaPairReceiverInputDStream<String, String> messages = >> > // KafkaUtils.createStream(jssc, args[0], args[1], >> topicMap); >> > JavaPairDStream<String, String> messages = >> > KafkaUtils.createStream(jssc, >> > String.class, >> > String.class, >> > StringDecoder.class, >> > StringDecoder.class, >> > kafkaParams, >> > topicMap, >> > StorageLevel.MEMORY_ONLY_SER()); >> > >> > >> > JavaDStream<String> lines = messages.map(new >> Function<Tuple2<String, >> > String>, String>() { >> > @Override >> > public String call(Tuple2<String, String> tuple2) { >> > return tuple2._2(); >> > } >> > }); >> > >> > JavaDStream<String> words = lines.flatMap(new >> > FlatMapFunction<String, String>() { >> > @Override >> > public Iterable<String> call(String x) { >> > return Lists.newArrayList(SPACE.split(x)); >> > } >> > }); >> > >> > JavaPairDStream<String, Integer> wordCounts = words.mapToPair( >> > new PairFunction<String, String, Integer>() { >> > @Override >> > public Tuple2<String, Integer> call(String s) { >> > return new Tuple2<String, Integer>(s, 1); >> > } >> > }).reduceByKey(new Function2<Integer, Integer, >> Integer>() { >> > @Override >> > public Integer call(Integer i1, Integer i2) { >> > return i1 + i2; >> > } >> > }); >> > >> > wordCounts.print(); >> > jssc.start(); >> > jssc.awaitTermination(); >> > >> > >