Yeah, it worked like a charm!! Thank you!

On Thu, Jan 22, 2015 at 2:28 PM, Sean Owen <so...@cloudera.com> wrote:

> First, as an aside, I am pretty sure you cannot reuse one Text and
> IntWritable object here. Spark does not necessarily finish with one
> call's value before the next call(). Although that should not be
> directly related to the serialization problem, I suspect it is here.
> Your function is not serializable since it contains references to these
> cached writables. I think removing them fixes both problems.
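>
> Concretely, here is a minimal sketch of what I mean (untested, and it
> assumes the rest of your pipeline stays the same): construct the
> writables inside call() instead of caching them as fields, so the
> anonymous function captures no non-serializable state.
>
> joiningKeyPlusPredictedPoint.mapToPair(
>         new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
>             @Override
>             public Tuple2<Text, IntWritable> call(
>                     Tuple2<String, Integer> tuple) throws Exception {
>                 // Fresh writables per record: nothing is cached in the
>                 // function, and no object is reused across calls.
>                 return new Tuple2<Text, IntWritable>(
>                         new Text(tuple._1), new IntWritable(tuple._2));
>             }
>         })
>         .saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
>                 Text.class, IntWritable.class,
>                 SequenceFileOutputFormat.class);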
> On Jan 22, 2015 9:42 AM, "Skanda" <skanda.ganapa...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm using the saveAsNewAPIHadoopFile API to write SequenceFiles, but
>> I'm getting the following runtime exception:
>>
>> *Exception in thread "main" org.apache.spark.SparkException: Task not serializable*
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>>     at org.apache.spark.rdd.RDD.map(RDD.scala:271)
>>     at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:102)
>>     at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:45)
>>     at XoanonKMeansV2.main(XoanonKMeansV2.java:125)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> *Caused by: java.io.NotSerializableException: org.apache.hadoop.io.IntWritable*
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>     ... 13 more
>>
>> Please find the code snippet below:
>>
>> joiningKeyPlusPredictedPoint.mapToPair(
>>         new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
>>             Text text = new Text();
>>             IntWritable intwritable = new IntWritable();
>>
>>             @Override
>>             public Tuple2<Text, IntWritable> call(
>>                     Tuple2<String, Integer> tuple) throws Exception {
>>                 text.set(tuple._1);
>>                 intwritable.set(tuple._2);
>>                 return new Tuple2<Text, IntWritable>(text, intwritable);
>>             }
>>         })
>>         *.saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
>>                 Text.class, IntWritable.class,
>>                 SequenceFileOutputFormat.class);*
>>
>> Regards,
>> Skanda
>>
>
