ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration

2015-02-17 Thread Jadhav Shweta
Hi,

I am running a streaming word count program on a Spark Standalone mode cluster
with four machines.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaStreamingWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");
    static transient Configuration conf;

    private JavaKafkaStreamingWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1));

        jssc.checkpoint("hdfs://172.17.199.229:8020/spark/wordcountKafkaCheckpoint");

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                        // Start from the previous state (if any) and add this batch's counts.
                        Integer newSum = state.isPresent() ? state.get() : 0;
                        for (int temp : values) {
                            newSum += temp;
                        }
                        return Optional.of(newSum);
                    }
                };

        JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
        conf = new Configuration();

        runningCounts.saveAsNewAPIHadoopFiles(
                "hdfs://172.17.199.229:8020/spark/wordCountOutput/word", "stream",
                Text.class, Text.class,
                (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class,
                conf);
        //jssc.sparkContext().hadoopConfiguration();
        jssc.start();
        jssc.awaitTermination();
    }
}

This works fine on a single-node cluster, but it gives the following error when
I try to run the same program on the four-node cluster.

15/02/17 12:57:10 ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
	at java.io.ObjectOutputStream.writeOrdinaryObject
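[The exception above is Java serialization refusing to write an object graph that reaches a non-serializable instance. The sketch below reproduces the same failure mode in isolation; the class names are hypothetical stand-ins for a Spark closure and Hadoop's Configuration, not the actual classes.]

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NotSerializableDemo {
    // Stand-in for org.apache.hadoop.conf.Configuration:
    // a class that does not implement Serializable.
    static class FakeConfiguration {
    }

    // Stand-in for a Spark closure: Serializable itself,
    // but it captures a non-serializable field.
    static class Closure implements Serializable {
        FakeConfiguration conf = new FakeConfiguration();
    }

    public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new Closure());
        } catch (NotSerializableException e) {
            // The exception message is the offending class name,
            // just like the Spark log line above.
            System.out.println("java.io.NotSerializableException: " + e.getMessage());
        }
    }
}
```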

Re: ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration

2015-02-17 Thread Sean Owen
Tip: to debug where the unserializable reference comes from, run with

-Dsun.io.serialization.extendeddebuginfo=true
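[For readers launching with spark-submit, one way to pass this JVM flag to both the driver and the executors could look like the following; the class and jar names are placeholders, not from the original thread:

    spark-submit \
      --class JavaKafkaStreamingWordCount \
      --driver-java-options "-Dsun.io.serialization.extendeddebuginfo=true" \
      --conf spark.executor.extraJavaOptions=-Dsun.io.serialization.extendeddebuginfo=true \
      wordcount.jar zkQuorum group topics 4

With the flag set, the NotSerializableException stack trace is extended with the chain of fields that led to the non-serializable object.]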

On Tue, Feb 17, 2015 at 10:20 AM, Jadhav Shweta jadhav.shw...@tcs.com wrote:
 15/02/17 12:57:10 ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
 java.io.NotSerializableException: org.apache.hadoop.conf.Configuration

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org