Hi,

I am running a streaming word count program on a Spark standalone cluster
with four machines.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaStreamingWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");
    static transient Configuration conf;

    private JavaKafkaStreamingWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

        // Checkpointing is required by updateStateByKey below.
        jssc.checkpoint("hdfs://172.17.199.229:8020/spark/wordcountKafkaCheckpoint");

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });

        // Add the counts from the current batch to the running total kept in state.
        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                        Integer newSum = state.isPresent() ? state.get() : 0;
                        for (int value : values) {
                            newSum += value;
                        }
                        return Optional.of(newSum);
                    }
                };

        JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

        conf = new Configuration();
        runningCounts.saveAsNewAPIHadoopFiles(
                "hdfs://172.17.199.229:8020/spark/wordCountOutput/word", "stream",
                Text.class, Text.class,
                (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class,
                conf);
        //jssc.sparkContext().hadoopConfiguration();
        jssc.start();
        jssc.awaitTermination();
    }
}
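
For context, the commented-out jssc.sparkContext().hadoopConfiguration() line near the end is left over from an experiment: instead of constructing a fresh Configuration, I also thought about reusing the one the driver's JavaSparkContext already holds, roughly like the sketch below (a sketch only, not the exact code I ran; same output path, just a different source for the configuration). I suspect either way the Configuration still ends up inside the checkpointed graph:

        // Variant I was considering: reuse the driver's Hadoop configuration
        // instead of newing one up.
        Configuration hadoopConf = jssc.sparkContext().hadoopConfiguration();
        runningCounts.saveAsNewAPIHadoopFiles(
                "hdfs://172.17.199.229:8020/spark/wordCountOutput/word", "stream",
                Text.class, Text.class,
                (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class,
                hadoopConf);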

This works fine on a one-node cluster, but it gives the following error when I
run the same program on the full cluster.

15/02/17 12:57:10 ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1362)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1170)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:438)
        at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/02/17 12:57:10 INFO scheduler.DAGScheduler: Missing parents: List()
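
If I read the trace correctly, the failure happens while the checkpoint writer serializes the DStreamGraph, and the Configuration passed to saveAsNewAPIHadoopFiles is pulled in with it. One workaround I was considering is a small Serializable holder that writes the Configuration through the Writable interface it already implements; the SerializableConfiguration class below is my own sketch, not a Spark or Hadoop API, and I am not sure it even applies here since saveAsNewAPIHadoopFiles still takes a plain Configuration:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;

// Hypothetical helper (my own class, not a Spark/Hadoop API): lets a
// Hadoop Configuration pass through Java serialization by delegating
// to the Writable methods that Configuration already implements.
public class SerializableConfiguration implements Serializable {
    private transient Configuration conf;

    public SerializableConfiguration(Configuration conf) {
        this.conf = conf;
    }

    public Configuration get() {
        return conf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        conf.write(out); // Configuration implements Writable
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        conf = new Configuration(false);
        conf.readFields(in);
    }
}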

What am I doing wrong here?


Thanks and regards
Shweta Jadhav
