Hi,
I am running a streaming word count program on a Spark Standalone mode cluster
with four machines.
public final class JavaKafkaStreamingWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");
    static transient Configuration conf;

    private JavaKafkaStreamingWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1));
        jssc.checkpoint("hdfs://172.17.199.229:8020/spark/wordcountKafkaCheckpoint");

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(
                new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                Integer newSum = 0;
                if (state.isPresent()) {
                    if (values.size() != 0) {
                        newSum = state.get();
                        for (int temp : values) {
                            newSum += temp;
                        }
                    } else {
                        newSum = state.get();
                    }
                } else {
                    if (values.size() != 0) {
                        for (int temp : values) {
                            newSum += temp;
                        }
                    }
                }
                return Optional.of(newSum);
            }
        };

        JavaPairDStream<String, Integer> runningCounts =
                pairs.updateStateByKey(updateFunction);

        conf = new Configuration();
        runningCounts.saveAsNewAPIHadoopFiles(
                "hdfs://172.17.199.229:8020/spark/wordCountOutput/word",
                "stream", Text.class, Text.class,
                (Class<? extends OutputFormat<?, ?>>) TextOutputFormat.class, conf);
        //jssc.sparkContext().hadoopConfiguration();

        jssc.start();
        jssc.awaitTermination();
    }
}
This works fine in a one-node cluster, but it gives the following error when I
try to run the same code on the multi-node cluster:
15/02/17 12:57:10 ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
    at java.io.ObjectOutputStream.writeSerialData(Obj