I have set the Kryo serializer as the default serializer in SparkConf, and the Spark UI confirms it, but in the Spark logs I'm getting this exception:
java.io.OptionalDataException
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1370)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:74)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:993)
    at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:58)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:50)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:49)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:48)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Here I have two questions:

1. Why is the Java serializer still being used?
2. How can I solve the java.io.OptionalDataException?

I know the causes of java.io.OptionalDataException, but I haven't been able to solve it.
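For reference, the serializer is set in SparkConf roughly like this (a minimal sketch; the app name and the explicit registerKryoClasses call are illustrative, not necessarily my exact code):

import org.apache.spark.{SparkConf, SparkContext}

// Kryo set as the default serializer; the custom class is also registered with Kryo.
val conf = new SparkConf()
  .setAppName("GeometryApp") // placeholder app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[GeometryWritable]))

val sc = new SparkContext(conf)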
Here is my class:

import java.io.{DataInput, DataOutput, IOException, ObjectInput, ObjectOutput, Externalizable}
import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esri.core.geometry.{Geometry, GeometryEngine}
import org.apache.hadoop.io.Writable

class GeometryWritable(var geometry: Geometry) extends Writable with Externalizable with KryoSerializable {

  def this() = this(null)

  override def write(dataOutput: DataOutput) {
    val bytes = GeometryEngine.geometryToEsriShape(geometry)
    dataOutput.writeInt(bytes.length)
    dataOutput.write(bytes)
  }

  override def readFields(dataInput: DataInput) {
    val bytesArray: Array[Byte] = new Array[Byte](dataInput.readInt())
    dataInput.readFully(bytesArray)
    geometry = GeometryEngine.geometryFromEsriShape(bytesArray, Geometry.Type.Unknown)
  }

  /**
   * Overrides writeExternal (from the Externalizable interface).
   *
   * @param dataOutput the ObjectOutput stream the geometry bytes are written to
   */
  @throws(classOf[IOException])
  override def writeExternal(dataOutput: ObjectOutput) {
    val bytes = GeometryEngine.geometryToEsriShape(geometry)
    dataOutput.writeInt(bytes.length)
    dataOutput.write(bytes)
  }

  /**
   * Overrides readExternal (from the Externalizable interface).
   *
   * @param dataInput the ObjectInput stream the geometry bytes are read from
   */
  @throws(classOf[IOException])
  @throws(classOf[ClassNotFoundException])
  override def readExternal(dataInput: ObjectInput) {
    val bytesArray: Array[Byte] = new Array[Byte](dataInput.readInt())
    dataInput.readFully(bytesArray)
    geometry = GeometryEngine.geometryFromEsriShape(bytesArray, Geometry.Type.Unknown)
  }

  // KryoSerializable write
  def write(kryo: Kryo, output: Output) {
    val bytes = GeometryEngine.geometryToEsriShape(geometry)
    output.writeInt(bytes.length)
    output.write(bytes)
  }

  // KryoSerializable read
  def read(kryo: Kryo, input: Input) {
    val bytesArray: Array[Byte] = new Array[Byte](input.readInt())
    input.read(bytesArray, 0, bytesArray.length)
    geometry = GeometryEngine.geometryFromEsriShape(bytesArray, Geometry.Type.Unknown)
  }
}
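As a side note, the Kryo path for this class can be exercised in isolation with a standalone round trip through Spark's KryoSerializer (a minimal sketch; buildSomeGeometry() is a hypothetical helper standing in for however the Geometry is actually constructed):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Serialize and deserialize one GeometryWritable directly with Kryo,
// independent of the task-result path shown in the stack trace above.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[GeometryWritable]))

val kryo = new KryoSerializer(conf).newInstance()
val original = new GeometryWritable(buildSomeGeometry()) // hypothetical geometry factory
val bytes = kryo.serialize(original)
val restored = kryo.deserialize[GeometryWritable](bytes)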