DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.
Hi,

I'm using Spark 1.3.1 built against Hadoop 1.0.4 and Java 1.7, and I'm trying to save my DataFrame to Parquet. The issue I'm stuck on looks like serialization is trying to do a pretty weird thing: it writes into an empty array. The last line of Spark code in the stack trace that leads to the exception is in SerializationDebugger.visitSerializable(o: Object, stack: List[String]): List[String]:

    desc.getObjFieldValues(finalObj, objFieldValues)

It fails because finalObj is an org.apache.spark.sql.execution.Project and objFieldValues is an empty array! As a result there are two fields to read from the Project instance (this happens in java.io.ObjectStreamClass), but the array to read them into is empty. A little bit of the code with debug info:

    private def visitSerializable(o: Object, stack: List[String]): List[String] = {
      val (finalObj, desc) = findObjectAndDescriptor(o)
      // finalObj: Project, desc: ObjectStreamClass for org.apache.spark.sql.execution.Project
      val slotDescs = desc.getSlotDescs
      // java.io.ObjectStreamClass[2] [0: SparkPlan, 1: Project]
      var i = 0 // i: 0
      while (i < slotDescs.length) {
        val slotDesc = slotDescs(i)
        // slotDesc: ObjectStreamClass for org.apache.spark.sql.execution.SparkPlan
        if (slotDesc.hasWriteObjectMethod) {
          // TODO: Handle classes that specify writeObject method.
        } else {
          val fields: Array[ObjectStreamField] = slotDesc.getFields
          // fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled]
          val objFieldValues: Array[Object] = new Array[Object](slotDesc.getNumObjFields)
          // objFieldValues: java.lang.Object[0]
          val numPrims = fields.length - objFieldValues.length // numPrims: 1
          desc.getObjFieldValues(finalObj, objFieldValues) // leads to the exception

So it looks like it sizes the objFieldValues array for the SparkPlan slot descriptor, but then uses it to receive field values from the Project object.
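The per-slot bookkeeping behind this can be reproduced with plain java.io.ObjectStreamClass, independent of Spark. Base and Child below are hypothetical stand-ins for SparkPlan (one primitive field) and Project (an object field in its own slot); each slot descriptor only knows about its own class's fields, so an array sized for one slot cannot receive another slot's values:

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SlotDescDemo {
    // Hypothetical stand-ins, NOT the real Spark classes:
    // Base mirrors SparkPlan (a single primitive field, like codegenEnabled),
    // Child mirrors Project (adds an object-typed field in its own slot).
    static class Base implements Serializable {
        boolean codegenEnabled;
    }
    static class Child extends Base {
        String projectList = "a, b";
    }

    public static void main(String[] args) {
        // Java serialization describes a hierarchy as one descriptor per "slot".
        ObjectStreamClass baseDesc  = ObjectStreamClass.lookup(Base.class);
        ObjectStreamClass childDesc = ObjectStreamClass.lookup(Child.class);

        // Each slot descriptor lists only the fields its own class declares.
        // Base's slot: 1 field, primitive -> 0 object fields, so an array
        // sized for Base's object fields is empty...
        System.out.println("Base fields: " + baseDesc.getFields().length
                + ", primitive: " + baseDesc.getFields()[0].isPrimitive());
        // ...while Child's slot contributes the object field that the
        // debugger then tries to read into that empty array.
        System.out.println("Child fields: " + childDesc.getFields().length
                + ", primitive: " + childDesc.getFields()[0].isPrimitive());
    }
}
```

This is just an illustration of the slot-descriptor model; the actual crash happens because the debugger calls getObjFieldValues on the bottom-most descriptor with an array sized for the superclass slot.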
Here is the schema of my data frame:

    root
     |-- Id: long (nullable = true)
     |-- explodes: struct (nullable = true)
     |    |-- Identifiers: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- Type: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |-- Identifiers: struct (nullable = true)
     |    |-- Type: array (nullable = true)
     |    |    |-- element: string (containsNull = true)
     |-- Type2: string (nullable = true)
     |-- Type: string (nullable = true)

Actual stack trace:

    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
        at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
        at com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Re: DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.
You've probably hit this bug: https://issues.apache.org/jira/browse/SPARK-7180

It's fixed in Spark 1.4.1+. Try setting spark.serializer.extraDebugInfo to false and see if it goes away.

On Fri, Aug 21, 2015 at 3:37 AM, Eugene Morozov evgeny.a.moro...@gmail.com wrote:
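For reference, the suggested workaround as a configuration fragment (the property name spark.serializer.extraDebugInfo is Spark's own; the file path and --conf form are the usual ways to set it):

```
# spark-defaults.conf (or pass as: spark-submit --conf spark.serializer.extraDebugInfo=false)
# Disables the SerializationDebugger, so the original NotSerializableException
# surfaces instead of the debugger's own ArrayIndexOutOfBoundsException.
spark.serializer.extraDebugInfo    false
```

Note this only silences the faulty debugger; the underlying "Task not serializable" still has to be fixed separately.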