First, as an aside: I'm pretty sure you cannot reuse a single Text and IntWritable object here, since Spark does not necessarily finish with one value before the next call(). Although that reuse shouldn't be directly related to the serialization problem, I suspect it is: your function is not serializable because it holds references to these cached Writables, and IntWritable does not implement java.io.Serializable. I think removing them (and creating the objects inside call() instead) fixes both problems.

On Jan 22, 2015 9:42 AM, "Skanda" <skanda.ganapa...@gmail.com> wrote:
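A minimal sketch of that fix follows. Note that Text, IntWritable, Tuple2, and PairFunction below are simplified local stand-ins for the real Hadoop/Scala/Spark types (so the sketch compiles without those jars on the classpath); the real fix has the same shape — drop the cached fields and allocate the Writables inside call(), so the anonymous function captures no non-serializable state and each invocation returns distinct objects:

```java
// Local stand-ins (hypothetical simplifications) for
// org.apache.hadoop.io.Text / IntWritable, scala.Tuple2, and Spark's
// PairFunction, so this sketch is self-contained.
class Text { String v; void set(String s) { v = s; } }
class IntWritable { int v; void set(int i) { v = i; } }
class Tuple2<A, B> { final A _1; final B _2; Tuple2(A a, B b) { _1 = a; _2 = b; } }
interface PairFunction<T, K, V> { Tuple2<K, V> call(T t) throws Exception; }

public class FreshWritables {
    // The fix: no Text/IntWritable fields on the function. With no
    // non-serializable state captured, the closure can be serialized, and
    // every call() returns freshly allocated objects.
    static final PairFunction<Tuple2<String, Integer>, Text, IntWritable> toWritables =
        tuple -> {
            Text text = new Text();                  // created per call, not cached
            IntWritable intwritable = new IntWritable();
            text.set(tuple._1);
            intwritable.set(tuple._2);
            return new Tuple2<>(text, intwritable);
        };

    public static void main(String[] args) throws Exception {
        Tuple2<Text, IntWritable> a = toWritables.call(new Tuple2<>("x", 1));
        Tuple2<Text, IntWritable> b = toWritables.call(new Tuple2<>("y", 2));
        // Distinct objects per call: a still holds "x"/1 after b was produced.
        System.out.println(a._1.v + a._2.v + " " + b._1.v + b._2.v); // prints "x1 y2"
    }
}
```

In the actual code only the body of the anonymous PairFunction changes; the saveAsNewAPIHadoopFile call stays exactly as it is.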
> Hi All,
>
> I'm using the saveAsNewAPIHadoopFile API to write SequenceFiles but I'm
> getting the following runtime exception:
>
> *Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable*
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>         at org.apache.spark.rdd.RDD.map(RDD.scala:271)
>         at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:102)
>         at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:45)
>         at XoanonKMeansV2.main(XoanonKMeansV2.java:125)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> *Caused by: java.io.NotSerializableException:
> org.apache.hadoop.io.IntWritable*
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>         ... 13 more
>
> Pls find below the code snippet:
>
> joiningKeyPlusPredictedPoint.mapToPair(
>         new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
>             Text text = new Text();
>             IntWritable intwritable = new IntWritable();
>
>             @Override
>             public Tuple2<Text, IntWritable> call(
>                     Tuple2<String, Integer> tuple) throws Exception {
>                 text.set(tuple._1);
>                 intwritable.set(tuple._2);
>                 return new Tuple2<Text, IntWritable>(text, intwritable);
>             }
>         })
>         *.saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
>                 Text.class, IntWritable.class, SequenceFileOutputFormat.class);*
>
> Regards,
> Skanda
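For what it's worth, the object-reuse hazard mentioned at the top of the reply can be demonstrated without Spark at all. MutableText below is a hypothetical stand-in for a mutable Writable such as org.apache.hadoop.io.Text: when one cached instance is returned from every call, every downstream reference ends up reading only the last value written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a mutable Writable like org.apache.hadoop.io.Text.
class MutableText {
    String value;
    void set(String v) { value = v; }
}

public class ReuseHazard {
    // Apply f to each input and keep the results, like a map() that is
    // collected later -- the results may be consumed after all calls finish.
    static List<MutableText> mapAll(Function<String, MutableText> f, String... inputs) {
        List<MutableText> out = new ArrayList<>();
        for (String s : inputs) out.add(f.apply(s));
        return out;
    }

    public static void main(String[] args) {
        // One cached instance reused across calls, as in the original snippet.
        MutableText cached = new MutableText();
        List<MutableText> reused =
            mapAll(s -> { cached.set(s); return cached; }, "a", "b", "c");

        // All three list entries are the same object, so every value reads "c".
        System.out.println(reused.get(0).value + reused.get(1).value + reused.get(2).value); // prints "ccc"
    }
}
```

This is exactly why the results cannot be assumed independent when the same Writable instance is handed out repeatedly.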