Yeah, it worked like a charm!! Thank you!

On Thu, Jan 22, 2015 at 2:28 PM, Sean Owen <so...@cloudera.com> wrote:
> First, as an aside, I am pretty sure you cannot reuse one Text and
> IntWritable object here. Spark does not necessarily finish with one's
> value before the next call(). Although it should not be directly related
> to the serialization problem, I suspect it is: your function is not
> serializable since it contains references to these cached writables. I
> think removing them fixes both problems.
>
> On Jan 22, 2015 9:42 AM, "Skanda" <skanda.ganapa...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm using the saveAsNewAPIHadoopFile API to write SequenceFiles, but I'm
>> getting the following runtime exception:
>>
>> *Exception in thread "main" org.apache.spark.SparkException: Task not serializable*
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>>     at org.apache.spark.rdd.RDD.map(RDD.scala:271)
>>     at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:102)
>>     at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:45)
>>     at XoanonKMeansV2.main(XoanonKMeansV2.java:125)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> *Caused by: java.io.NotSerializableException: org.apache.hadoop.io.IntWritable*
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>     ... 13 more
>>
>> Please find the code snippet below:
>>
>> joiningKeyPlusPredictedPoint.mapToPair(
>>         new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
>>             Text text = new Text();
>>             IntWritable intwritable = new IntWritable();
>>
>>             @Override
>>             public Tuple2<Text, IntWritable> call(
>>                     Tuple2<String, Integer> tuple) throws Exception {
>>                 text.set(tuple._1);
>>                 intwritable.set(tuple._2);
>>                 return new Tuple2<Text, IntWritable>(text, intwritable);
>>             }
>>         })
>>         *.saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
>>                 Text.class, IntWritable.class, SequenceFileOutputFormat.class);*
>>
>> Regards,
>> Skanda
>>
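Sean's diagnosis can be reproduced with plain JDK serialization, no Spark or Hadoop on the classpath: a serializable function object that captures a field referencing a non-serializable mutable holder fails exactly like the closure above, while a function that allocates a fresh object inside its body serializes fine. The sketch below is illustrative only; `FakeWritable` and `SerFunction` are made-up stand-ins for `IntWritable` and Spark's `PairFunction`, which behave analogously.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationDemo {

    // Stand-in for org.apache.hadoop.io.IntWritable: a mutable holder
    // that does NOT implement java.io.Serializable.
    static class FakeWritable {
        int value;
    }

    // Stand-in for Spark's PairFunction: anything assigned to this
    // interface must be Java-serializable, like a Spark closure.
    interface SerFunction extends Serializable {
        int apply(int x);
    }

    // Broken pattern: the function captures a cached, reused holder,
    // so serializing the function drags the non-serializable object along.
    static SerFunction brokenFn() {
        FakeWritable cached = new FakeWritable();
        return x -> {
            cached.value = x;
            return cached.value;
        };
    }

    // Fixed pattern: allocate the holder inside the function body,
    // so the closure captures nothing non-serializable.
    static SerFunction fixedFn() {
        return x -> {
            FakeWritable fresh = new FakeWritable();
            fresh.value = x;
            return fresh.value;
        };
    }

    // Round the object through Java serialization, as Spark's
    // ClosureCleaner.ensureSerializable effectively does.
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        try {
            serialize(brokenFn());
            System.out.println("broken closure: serialized OK (unexpected)");
        } catch (NotSerializableException e) {
            System.out.println("broken closure: " + e);
        }
        serialize(fixedFn());
        System.out.println("fresh-object closure: serialized OK");
    }
}
```

Applied to the snippet in the thread, the fix is to drop the two cached fields and build `new Text(tuple._1)` and `new IntWritable(tuple._2)` inside call(); that also sidesteps the object-reuse hazard Sean mentions, since Spark may not be done with one pair's values before the next call().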