I am not running locally. The Spark master is: "spark://<machine name>:7077"
On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > What is the Spark master that you are using. Use local[4], not local > if you are running locally. > > On Mon, Nov 10, 2014 at 3:01 PM, Something Something > <mailinglist...@gmail.com> wrote: > > I am embarrassed to admit but I can't get a basic 'word count' to work > under > > Kafka/Spark streaming. My code looks like this. I don't see any word > > counts in console output. Also, don't see any output in UI. Needless to > > say, I am newbie in both 'Spark' as well as 'Kafka'. > > > > Please help. Thanks. > > > > Here's the code: > > > > public static void main(String[] args) { > > if (args.length < 4) { > > System.err.println("Usage: JavaKafkaWordCount <zkQuorum> > <group> > > <topics> <numThreads>"); > > System.exit(1); > > } > > > > // StreamingExamples.setStreamingLogLevels(); > > // SparkConf sparkConf = new > > SparkConf().setAppName("JavaKafkaWordCount"); > > > > // Location of the Spark directory > > String sparkHome = "/opt/mapr/spark/spark-1.0.2/"; > > > > // URL of the Spark cluster > > String sparkUrl = "spark://mymachine:7077"; > > > > // Location of the required JAR files > > String jarFiles = > > > "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar"; > > > > SparkConf sparkConf = new SparkConf(); > > sparkConf.setAppName("JavaKafkaWordCount"); > > sparkConf.setJars(new String[]{jarFiles}); > > sparkConf.setMaster(sparkUrl); > > sparkConf.set("spark.ui.port", "2348"); > > sparkConf.setSparkHome(sparkHome); > > > > Map<String, String> kafkaParams = new HashMap<String, String>(); > > kafkaParams.put("zookeeper.connect", "myedgenode:2181"); > > kafkaParams.put("group.id", "1"); > > kafkaParams.put("metadata.broker.list", "myedgenode:9092"); > > kafkaParams.put("serializer.class", > > "kafka.serializer.StringEncoder"); > > kafkaParams.put("request.required.acks", "1"); > > > > // Create the context with a 1 second batch size > > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, > new > > Duration(2000)); > > > > int numThreads = Integer.parseInt(args[3]); > > Map<String, Integer> topicMap = new HashMap<String, Integer>(); > > String[] topics = args[2].split(","); > > for (String topic: topics) { > > topicMap.put(topic, numThreads); > > } > > > > // JavaPairReceiverInputDStream<String, String> messages = > > // KafkaUtils.createStream(jssc, args[0], args[1], > topicMap); > > JavaPairDStream<String, String> messages = > > KafkaUtils.createStream(jssc, > > String.class, > > String.class, > > StringDecoder.class, > > StringDecoder.class, > > kafkaParams, > > topicMap, > > StorageLevel.MEMORY_ONLY_SER()); > > > > > > JavaDStream<String> lines = messages.map(new > Function<Tuple2<String, > > String>, String>() { > > @Override > > public String call(Tuple2<String, String> tuple2) { > > return tuple2._2(); > > } > > }); > > > > JavaDStream<String> words = lines.flatMap(new > > FlatMapFunction<String, String>() { > > @Override > > public Iterable<String> call(String x) { > > return Lists.newArrayList(SPACE.split(x)); > > } > > }); > > > > JavaPairDStream<String, Integer> wordCounts = words.mapToPair( > > new PairFunction<String, String, Integer>() { > > @Override > > public Tuple2<String, Integer> call(String s) { > > return new Tuple2<String, Integer>(s, 1); > > } > > }).reduceByKey(new Function2<Integer, Integer, > Integer>() { > > @Override > > public Integer call(Integer i1, Integer i2) { > > return i1 + i2; > > } > > }); > > > > wordCounts.print(); > > jssc.start(); > > jssc.awaitTermination(); > > >