First, as an aside: I am pretty sure you cannot reuse a single Text and
IntWritable object here, because Spark does not necessarily finish with one
call()'s result before the next call(). That reuse should not be directly
related to the serialization problem, but I suspect it is the cause anyway:
your function is not serializable because it holds references to these cached
Writables as fields, and Hadoop Writables do not implement
java.io.Serializable (hence the NotSerializableException on IntWritable).
I think removing them fixes both problems.
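
Something like this ought to work; it is an untested sketch based on your
snippet, just with the Writable fields removed and fresh instances created
per call():

joiningKeyPlusPredictedPoint.mapToPair(
        new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
            @Override
            public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> tuple)
                    throws Exception {
                // Allocate fresh Writables per record so the anonymous
                // function captures no non-serializable state as fields.
                return new Tuple2<Text, IntWritable>(
                        new Text(tuple._1), new IntWritable(tuple._2));
            }
        })
        .saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
                Text.class, IntWritable.class, SequenceFileOutputFormat.class);

The per-record allocation is cheap compared to the cost of writing a
SequenceFile, and it keeps the function trivially serializable.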
On Jan 22, 2015 9:42 AM, "Skanda" <skanda.ganapa...@gmail.com> wrote:

> Hi All,
>
> I'm using the saveAsNewAPIHadoopFile API to write SequenceFiles, but I'm
> getting the following runtime exception:
>
> *Exception in thread "main" org.apache.spark.SparkException: Task not serializable*
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>     at org.apache.spark.rdd.RDD.map(RDD.scala:271)
>     at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:102)
>     at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:45)
>     at XoanonKMeansV2.main(XoanonKMeansV2.java:125)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> *Caused by: java.io.NotSerializableException: org.apache.hadoop.io.IntWritable*
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>     ... 13 more
>
> Please find the code snippet below:
>
> joiningKeyPlusPredictedPoint.mapToPair(
>         new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
>             Text text = new Text();
>             IntWritable intwritable = new IntWritable();
>
>             @Override
>             public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> tuple)
>                     throws Exception {
>                 text.set(tuple._1);
>                 intwritable.set(tuple._2);
>                 return new Tuple2<Text, IntWritable>(text, intwritable);
>             }
>         })
>         *.saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
>                 Text.class, IntWritable.class, SequenceFileOutputFormat.class);*
>
> Regards,
> Skanda
>