Hi, I am running a streaming word count program on a Spark standalone-mode cluster with four machines.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaStreamingWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");
  static transient Configuration conf;

  private JavaKafkaStreamingWordCount() {
  }

  public static void main(String[] args) {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaStreamingWordCount <zkQuorum> <group> <topics> <numThreads>");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");

    // 10-second batches; checkpointing is required by updateStateByKey.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
    jssc.checkpoint("hdfs://172.17.199.229:8020/spark/wordcountKafkaCheckpoint");

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");  // comma-separated topic list
    for (String topic : topics) {
      topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    // Drop the Kafka key; keep only the message body.
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> pairs = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        });

    // Running total per word: previous state (if any) plus this batch's counts.
    Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          @Override
          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            Integer newSum = state.isPresent() ? state.get() : 0;
            for (int temp : values) {
              newSum += temp;
            }
            return Optional.of(newSum);
          }
        };

    JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

    conf = new Configuration();
    runningCounts.saveAsNewAPIHadoopFiles(
        "hdfs://172.17.199.229:8020/spark/wordCountOutput/word", "stream",
        Text.class, Text.class,
        (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class,
        conf);
    // jssc.sparkContext().hadoopConfiguration();

    jssc.start();
    jssc.awaitTermination();
  }
}

This works fine on a single-node cluster, but when I run the same program on the four-node cluster it fails with the following error:
15/02/17 12:57:10 ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1362)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1170)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
        at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/02/17 12:57:10 INFO scheduler.DAGScheduler: Missing parents: List()

What am I doing wrong here?

Thanks and regards,
Shweta Jadhav
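PS: From the trace it looks like the checkpoint writer is trying to serialize the Configuration I pass into saveAsNewAPIHadoopFiles. One workaround I am considering is to do the save inside foreachRDD and construct the Configuration as a local variable inside the function, so it is never a field of anything Spark serializes into the checkpoint. A rough, untested sketch (it would also need imports for org.apache.spark.api.java.JavaPairRDD and org.apache.spark.streaming.Time, and the per-batch "word-<time>" file name is my own naming, not what saveAsNewAPIHadoopFiles would produce):

    runningCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Integer> rdd, Time time) {
        // The Configuration is a local variable created on the driver for each
        // batch, so it is never serialized with the DStream graph.
        Configuration hadoopConf = new Configuration();
        rdd.saveAsNewAPIHadoopFile(
            "hdfs://172.17.199.229:8020/spark/wordCountOutput/word-" + time.milliseconds(),
            Text.class, Text.class, TextOutputFormat.class, hadoopConf);
        return null;
      }
    });

Would that be a reasonable way to keep the Configuration out of the checkpoint, or am I missing something simpler?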