[ https://issues.apache.org/jira/browse/SPARK-12518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071419#comment-15071419 ]
Zhanpeng Wu commented on SPARK-12518:
-------------------------------------

I have solved this issue. The problem is that the BAMRecordCodec object in the serialization class is declared *static*. This can cause errors during deserialization when an executor runs multiple tasks at once, because all the tasks in the same executor JVM share that single codec instance.
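A minimal sketch of the change the comment describes, applied to the SAMRecordWritable class quoted in the issue description below: the shared static codec is dropped and a fresh BAMRecordCodec is built on each readExternal call, so concurrent tasks never touch the same codec state. The LazyBAMRecordFactory, DataInputWrapper and DataOutputWrapper helpers are assumed to be the same ones used in the original class; treat this as an illustration, not a tested patch.

{code:title=SAMRecordWritable.java (sketch of the fix)|borderStyle=solid}
public class SAMRecordWritable extends JavaSerializer {
    private static final long serialVersionUID = 8212888201641460871L;

    private transient SAMRecord record;

    public SAMRecord get() { return record; }
    public void set(SAMRecord r) { record = r; }

    public void writeExternal(java.io.ObjectOutput out) {
        // One codec per call; nothing is shared between threads.
        final BAMRecordCodec codec = new BAMRecordCodec(record.getHeader());
        codec.setOutputStream(new DataOutputWrapper(out));
        codec.encode(record);
    }

    public void readExternal(java.io.ObjectInput in) {
        // Previously a single static codec was reused here; building a new codec
        // per call avoids shared mutable state across concurrently running tasks.
        final BAMRecordCodec codec = new BAMRecordCodec(null, new LazyBAMRecordFactory());
        codec.setInputStream(new DataInputWrapper(in));
        record = codec.decode();
    }
}
{code}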
Problem in Spark deserialization with htsjdk BAMRecordCodec
-----------------------------------------------------------

                Key: SPARK-12518
                URL: https://issues.apache.org/jira/browse/SPARK-12518
            Project: Spark
         Issue Type: Question
         Components: Java API
   Affects Versions: 1.5.2
        Environment: Linux Red Hat 4.8.2-16, Java 8, htsjdk-1.130
           Reporter: Zhanpeng Wu

When I used [htsjdk|https://github.com/samtools/htsjdk] in my Spark application, I found a problem in record deserialization. The *SAMRecord* object could not be deserialized and threw this exception:

{quote}
WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: htsjdk.samtools.util.RuntimeIOException
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:340)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{quote}

It seems that the application encountered a premature EOF while deserializing. Here is my test code:

{code:title=Test.java|borderStyle=solid}
public class Test {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Spark htsjdk - Test ver");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        jsc.newAPIHadoopFile(args[0], BAMInputFormat.class, LongWritable.class,
                SAMRecordWritable.class, jsc.hadoopConfiguration())
            .map(new Function<Tuple2<LongWritable, SAMRecordWritable>, SAMRecordWritable>() {
                private static final long serialVersionUID = 1791992620460009575L;

                @Override
                public SAMRecordWritable call(Tuple2<LongWritable, SAMRecordWritable> tuple2)
                        throws Exception {
                    return tuple2._2;
                }
            })
            .repartition(18)
            .saveAsTextFile(args[1]);

        jsc.close();
        jsc.stop();
    }
}
{code}

My custom JavaSerializer class:

{code:title=SAMRecordWritable.java|borderStyle=solid}
public class SAMRecordWritable extends JavaSerializer {
    private static final long serialVersionUID = 8212888201641460871L;
    private static final BAMRecordCodec lazyCodec =
            new BAMRecordCodec(null, new LazyBAMRecordFactory());

    private transient SAMRecord record;

    public SAMRecord get() { return record; }
    public void set(SAMRecord r) { record = r; }

    /* JavaSerializer */
    public void writeExternal(java.io.ObjectOutput out) {
        final BAMRecordCodec codec = new BAMRecordCodec(record.getHeader());
        codec.setOutputStream(new DataOutputWrapper(out));
        codec.encode(record);
    }

    public void readExternal(java.io.ObjectInput in) {
        lazyCodec.setInputStream(new DataInputWrapper(in));
        record = lazyCodec.decode();
    }
}
{code}

But when I serialize the record to a local file, not in Spark, it works. This confuses me a lot. Can anybody help?
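For reference, the "serialize the record to a local file" check could look roughly like the sketch below: a single-threaded round trip through the same writeExternal/readExternal methods, so the static codec is only ever used by one thread at a time, which is consistent with the diagnosis in the comment above. The LocalRoundTrip class name and the file paths are placeholders, not part of the original report.

{code:title=LocalRoundTrip.java (sketch)|borderStyle=solid}
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SamReader;
import htsjdk.samtools.SamReaderFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class LocalRoundTrip {
    public static void main(String[] args) throws Exception {
        // Read a single record from a local BAM file (placeholder path).
        SAMRecord first;
        try (SamReader reader = SamReaderFactory.makeDefault().open(new File("/tmp/example.bam"))) {
            first = reader.iterator().next();
        }

        SAMRecordWritable out = new SAMRecordWritable();
        out.set(first);

        // Encode the record to a local file through the same writeExternal path.
        File tmp = new File("/tmp/record.ser");
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(tmp))) {
            out.writeExternal(oos);
        }

        // Decode it back; only one thread ever touches the codec here.
        SAMRecordWritable in = new SAMRecordWritable();
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(tmp))) {
            in.readExternal(ois);
        }
        System.out.println(in.get().getReadName());
    }
}
{code}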