[ https://issues.apache.org/jira/browse/SPARK-12518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071419#comment-15071419 ]

Zhanpeng Wu commented on SPARK-12518:
-------------------------------------

I have solved this issue. The problem is that the BAMRecordCodec object is declared 
*static* in the serialization class, which can cause errors during deserialization 
when an executor runs multiple tasks, because all tasks in the same executor share 
that single codec instance.
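
For reference, the change is essentially the sketch below. This is not the exact patch: the hadoop-bam import paths and the {{java.io.Externalizable}} base are assumptions on my part (the original class extends JavaSerializer); the point is only that the codec is created per call instead of being held in a static field, so concurrent tasks in the same executor no longer share its stream state.

{code:title=SAMRecordWritable.java (sketch of the fix)|borderStyle=solid}
import java.io.Externalizable;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import htsjdk.samtools.BAMRecordCodec;
import htsjdk.samtools.SAMRecord;

import org.seqdoop.hadoop_bam.LazyBAMRecordFactory;
import org.seqdoop.hadoop_bam.util.DataInputWrapper;
import org.seqdoop.hadoop_bam.util.DataOutputWrapper;

public class SAMRecordWritable implements Externalizable {
    private static final long serialVersionUID = 8212888201641460871L;

    private transient SAMRecord record;

    public SAMRecord get()            { return record; }
    public void      set(SAMRecord r) { record = r; }

    @Override
    public void writeExternal(ObjectOutput out) {
        // Fresh codec per serialization call; nothing is shared between tasks.
        final BAMRecordCodec codec = new BAMRecordCodec(record.getHeader());
        codec.setOutputStream(new DataOutputWrapper(out));
        codec.encode(record);
    }

    @Override
    public void readExternal(ObjectInput in) {
        // Previously a static field: concurrent tasks in the same executor
        // shared its input-stream state and corrupted each other's reads.
        final BAMRecordCodec codec =
                new BAMRecordCodec(null, new LazyBAMRecordFactory());
        codec.setInputStream(new DataInputWrapper(in));
        record = codec.decode();
    }
}
{code}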

> Problem in Spark deserialization with htsjdk BAMRecordCodec
> -----------------------------------------------------------
>
>                 Key: SPARK-12518
>                 URL: https://issues.apache.org/jira/browse/SPARK-12518
>             Project: Spark
>          Issue Type: Question
>          Components: Java API
>    Affects Versions: 1.5.2
>         Environment: Linux Red Hat 4.8.2-16, Java 8, htsjdk-1.130
>            Reporter: Zhanpeng Wu
>
> When I used [htsjdk|https://github.com/samtools/htsjdk] in my Spark 
> application, I ran into a problem with record deserialization. The 
> *SAMRecord* object could not be deserialized and threw the following exception: 
> {quote}
> WARN ThrowableSerializationWrapper: Task exception could not be deserialized
> java.lang.ClassNotFoundException: htsjdk.samtools.util.RuntimeIOException
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:340)
>         at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
>         at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>         at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {quote}
> It seems that the application encountered a premature EOF when deserializing.
> Here is my test code: 
> {code:title=Test.java|borderStyle=solid}
> public class Test {
>     public static void main(String[] args) {
>         SparkConf sparkConf = new SparkConf().setAppName("Spark htsjdk - Test ver");
>         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
>         jsc.newAPIHadoopFile(args[0], BAMInputFormat.class, LongWritable.class, SAMRecordWritable.class,
>                 jsc.hadoopConfiguration())
>                 .map(new Function<Tuple2<LongWritable, SAMRecordWritable>, SAMRecordWritable>() {
>                     private static final long serialVersionUID = 1791992620460009575L;
>
>                     @Override
>                     public SAMRecordWritable call(Tuple2<LongWritable, SAMRecordWritable> tuple2)
>                             throws Exception {
>                         return tuple2._2;
>                     }
>                 }).repartition(18).saveAsTextFile(args[1]);
>         jsc.close();
>         jsc.stop();
>     }
> }
> {code}
> My custom JavaSerializer class: 
> {code:title=SAMRecordWritable.java|borderStyle=solid}
> public class SAMRecordWritable extends JavaSerializer {
>     private static final long serialVersionUID = 8212888201641460871L;
>     private static final BAMRecordCodec lazyCodec =
>             new BAMRecordCodec(null, new LazyBAMRecordFactory());
>
>     private transient SAMRecord record;
>
>     public SAMRecord get()            { return record; }
>     public void      set(SAMRecord r) { record = r; }
>
>     /* JavaSerializer */
>     public void writeExternal(java.io.ObjectOutput out) {
>         final BAMRecordCodec codec = new BAMRecordCodec(record.getHeader());
>         codec.setOutputStream(new DataOutputWrapper(out));
>         codec.encode(record);
>     }
>
>     public void readExternal(java.io.ObjectInput in) {
>         lazyCodec.setInputStream(new DataInputWrapper(in));
>         record = lazyCodec.decode();
>     }
> }
> {code}
> But when I serialize the record to a local file outside of Spark, it works 
> (a rough sketch of that local test is below). This confuses me a lot. Can anybody help?
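> The local test I mean looks roughly like the following (the {{roundTrip}} helper, the temp-file path, and calling writeExternal/readExternal directly are just for illustration). Because it is single-threaded, the shared static codec is never used by two readers at once, so it succeeds.
> {code:title=LocalRoundTrip.java (illustrative)|borderStyle=solid}
> import java.io.FileInputStream;
> import java.io.FileOutputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
>
> import htsjdk.samtools.SAMRecord;
>
> public class LocalRoundTrip {
>     // Serialize one record to a local file and read it back, outside Spark.
>     public static SAMRecord roundTrip(SAMRecord record) throws Exception {
>         SAMRecordWritable written = new SAMRecordWritable();
>         written.set(record);
>
>         try (ObjectOutputStream out =
>                 new ObjectOutputStream(new FileOutputStream("/tmp/record.ser"))) {
>             written.writeExternal(out);
>         }
>
>         // Single-threaded read: the static codec has no concurrent users here.
>         SAMRecordWritable read = new SAMRecordWritable();
>         try (ObjectInputStream in =
>                 new ObjectInputStream(new FileInputStream("/tmp/record.ser"))) {
>             read.readExternal(in);
>         }
>         return read.get();
>     }
> }
> {code}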


