The ScalaTest code enclosed at the end of this email demonstrates what appears
to be a bug in the KryoSerializer. The code was executed from IntelliJ IDEA
(Community Edition) under Mac OS X 10.11.2.
The KryoSerializer is enabled by updating the original SparkContext (supplied
by the ScalaTest fixture) via:
1. reading the SparkConf from the SparkContext,
2. updating the SparkConf to enable the KryoSerializer,
3. stopping the original SparkContext, and
4. creating a new SparkContext from the updated SparkConf.
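In condensed form, the four steps look like this (extracted from the enclosed
test, where `sc` is the SparkContext supplied by the ScalaTest fixture, so this
fragment is not runnable on its own):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf: SparkConf = sc.getConf                 // 1. read the SparkConf
sparkConf.set("spark.serializer",
  "org.apache.spark.serializer.KryoSerializer")       // 2. enable the KryoSerializer
sc.stop()                                             // 3. stop the original SparkContext
val sparkContext = new SparkContext(sparkConf)        // 4. create a new SparkContext
```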
After enabling the KryoSerializer, execution of the following line (line 56):

val rddPartitionsSizes: Array[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect

threw the following three instances of IllegalArgumentException:

java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofInt
java.lang.IllegalArgumentException: Class is not registered: int[]
java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]
which prompted registration of the following three classes with the
KryoSerializer via the SparkConf.registerKryoClasses() method:
classOf[scala.collection.mutable.WrappedArray.ofInt],
classOf[Array[Int]],
classOf[Array[Tuple3[_, _, _]]]
Following registration of these three classes with the KryoSerializer, the
above-indicated 'val rddPartitionsSizes...' line (line 56) executed without
throwing an IllegalArgumentException.
However, execution of the following line (line 59):

val sortedRddPartitionsSizes: Array[Int] = sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
threw the following SparkException:
Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1847)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
    at kryo.KryoSerializerTest$$anonfun$1.apply$mcV$sp(KryoSerializerTest.scala:59)
    at kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
    at kryo.KryoSerializerTest$$anonfun$1.apply(KryoSerializerTest.scala:39)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
    at kryo.KryoSerializerTest.org$scalatest$BeforeAndAfterAll$$super$run(KryoSerializerTest.scala:37)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
    at kryo.KryoSerializerTest.run(KryoSerializerTest.scala:37)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
    at org.scalatest.tools.Runner$.run(Runner.scala:883)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
    at org.apache.spark.RangePartitioner.writeObject(Partitioner.scala:209)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 65 more
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.reflect.ClassTag$$anon$1
Note: To register this class use: kryo.register(scala.reflect.ClassTag$$anon$1.class);
    at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:220)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1$$anonfun$apply$mcV$sp$1.apply(Partitioner.scala:219)
    at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:128)
    at org.apache.spark.RangePartitioner$$anonfun$writeObject$1.apply$mcV$sp(Partitioner.scala:219)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
    ... 124 more
Moreover, registering the following class did not eliminate the SparkException:

classOf[scala.reflect.ClassTag[_]]

My guess is that the IllegalArgumentException associated with
scala.reflect.ClassTag$$anon$1 has nothing to do with the SparkException, but
I'm not certain.
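For what it's worth, a plain-Scala check (no Spark involved) may explain why
registering classOf[scala.reflect.ClassTag[_]] had no effect: Kryo appears to
register exact runtime classes, and the runtime class of a ClassTag instance is
some concrete implementation class, never the ClassTag trait itself:

```scala
import scala.reflect.ClassTag

object ClassTagClassCheck extends App {
  // classOf[ClassTag[_]] denotes the ClassTag trait itself...
  val traitClass: Class[_] = classOf[ClassTag[_]]
  // ...whereas an actual ClassTag instance has a concrete implementation class
  // (its exact name, e.g. scala.reflect.ClassTag$$anon$1, varies by Scala version).
  val instanceClass: Class[_] = implicitly[ClassTag[String]].getClass
  // The two differ, so registering the trait does not cover the instance's class.
  println(traitClass == instanceClass) // false
}
```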
Note that none of the exceptions that are discussed above occur if the
KryoSerializer is not enabled. Also, none of the exceptions occur if
spark.kryo.registrationRequired is not set to true on line 44. So, even the
SparkException, which does not complain of an unregistered class, appears to be
related to the requirement for class registration that is specified on line 44.
Note also that the SparkException occurs only for the sorted RDD. In addition,
no complaint of an unregistered class accompanies this SparkException, other
than the complaint about the scala.reflect.ClassTag$$anon$1 class that I think
isn't relevant (although I'm not certain).
So, I have two questions:
First, why does line 59 give rise to the SparkException for the sorted RDD, and
in particular, in the context of class registration that eliminated complaints
about unregistered classes for the unsorted RDD? How might this SparkException
be eliminated?
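One detail that may be relevant to this question: the 'Caused by' chain does
name an unregistered class (scala.reflect.ClassTag$$anon$1, raised from
RangePartitioner.writeObject, and a RangePartitioner is introduced only by the
sortBy). So perhaps registering that exact class by name, as the "Note: To
register this class use" hint in the trace suggests, is the missing piece.
An untested sketch (the anonymous class name may vary by Scala version, hence
the lookup by name rather than classOf):

```scala
// Hypothetical workaround: register the exact runtime class named in the hint.
sparkConf.registerKryoClasses(Array(
  Class.forName("scala.reflect.ClassTag$$anon$1")
))
```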
Second, why does registering:

classOf[Array[Tuple3[_, _, _]]]

eliminate the 'java.lang.IllegalArgumentException: Class is not registered:
scala.Tuple3[]' when drilling down in the debugger suggests that a more
thorough class registration would be:

classOf[Array[Tuple3[Int, Int, Array[Long]]]]

That is, why does the wildcard specification '_' suffice? And would the more
thorough specification be preferred, that is, would it result in a smaller
Kryo-serialized result?
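For reference, a quick check outside Spark shows that the two classOf
expressions evaluate to the same runtime Class object (type parameters are
erased by the JVM), which may bear on both parts of this question:

```scala
object ErasureCheck extends App {
  // Type parameters are erased at runtime, so both expressions denote the
  // same java.lang.Class instance for Array[Tuple3].
  val wildcard: Class[_] = classOf[Array[Tuple3[_, _, _]]]
  val specific: Class[_] = classOf[Array[Tuple3[Int, Int, Array[Long]]]]
  println(wildcard == specific) // true
  println(wildcard.getName)     // [Lscala.Tuple3;
}
```

If the two forms are the same Class at runtime, the "more thorough"
specification would register exactly the same class and so presumably could
not change the serialized size.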
Thanks in advance for any insight that you can provide into this problem.
package kryo

import context.SharedSparkContext
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

class KryoSerializerTest extends FunSuite with SharedSparkContext with Serializable {

  test("kryo serializer") {
    // Update the SparkContext to specify the KryoSerializer.
    val sparkConf: SparkConf = sc.getConf
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryo.registrationRequired", "true")
    sparkConf.registerKryoClasses(
      Array(
        classOf[scala.collection.mutable.WrappedArray.ofInt],
        classOf[Array[Int]],
        classOf[Array[Tuple3[_, _, _]]]
      )
    )
    sc.stop()
    val sparkContext = new SparkContext(sparkConf)

    val rdd = sparkContext.makeRDD(
      Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), 4)
    val rddPartitionsSizes: Array[Int] =
      rdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
    rddPartitionsSizes.foreach(ps => println(ps))

    val sortedRdd = rdd.sortBy(e => e, true)
    val sortedRddPartitionsSizes: Array[Int] =
      sortedRdd.mapPartitions(iter => Array(iter.size).iterator, true).collect
    sortedRddPartitionsSizes.foreach(ps => println(ps))
  }
}