Hi Vadim,

Thanks for your suggestion. We do preregister the classes, like so:

object KryoRegistrar {
  val classesToRegister: Array[Class[_]] = Array(
    classOf[MyModel],
    [etc]
  )
}

And then we do:

val sparkConf = new SparkConf()
  .registerKryoClasses(KryoRegistrar.classesToRegister)

I notice that this is a bit different from your code and I'm wondering
whether there's any functional difference or if these are two ways to get
to the same end. Our code is directly adapted from the Spark documentation
on how to use the Kryo serializer but maybe there's some subtlety here that
I'm missing.
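
To make the comparison concrete, here is a minimal sketch of the two routes
as I understand them. MyModel below is a hypothetical stand-in for our real
class, and MyModelRegistrator and RegistrationRoutes are names I made up for
illustration. My reading of the Spark 2.4 source is that registerKryoClasses
only records the class names under spark.kryo.classesToRegister (and sets
spark.serializer), and that the names are later resolved with Class.forName
when a Kryo instance is created, which would match the frames in our stack
trace. I may well be missing something.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical stand-in for the real (proprietary) model class.
final case class MyModel(name: String, count: Int)

// Route 1: hand the classes to SparkConf directly (what we do today).
object KryoRegistrar {
  val classesToRegister: Array[Class[_]] = Array(classOf[MyModel])
}

// Route 2: a registrator class that Spark instantiates by name.
// The class name is an assumption made for this sketch.
class MyModelRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyModel])
  }
}

object RegistrationRoutes {
  // SparkConf(false) skips loading spark.* system properties, so the
  // resulting config contains only what is set here.
  def viaClassList(): SparkConf =
    new SparkConf(false).registerKryoClasses(KryoRegistrar.classesToRegister)

  def viaRegistrator(): SparkConf =
    new SparkConf(false)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[MyModelRegistrator].getName)

  def main(args: Array[String]): Unit = {
    // Print the config entries each route leaves behind.
    println(viaClassList().get("spark.kryo.classesToRegister"))
    println(viaClassList().get("spark.serializer"))
    println(viaRegistrator().get("spark.kryo.registrator"))
  }
}
```

If that reading is right, both routes end up registering the same classes,
and in both cases Spark looks a class up by name at the time it builds the
Kryo instance.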

With regard to the settings, it looks like we currently have the default
settings, which is to say that referenceTracking is true,
registrationRequired is false, unsafe is false, and buffer.max is 64m (none
of our objects are anywhere near that size but... who knows). I will try it
with your suggestions and see if it solves the problem.
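
For reference, this is roughly how I plan to apply them: a sketch that
copies the values from your message into a SparkConf. SuggestedKryoSettings
and applyTo are hypothetical names, and the registrator entry is your class
name, which we would have to swap for a registrator of our own.

```scala
import org.apache.spark.SparkConf

object SuggestedKryoSettings {
  // Values copied from Vadim's message. The registrator class name is his
  // and would need to point at our own registrator instead.
  val settings: Map[String, String] = Map(
    "spark.kryo.referenceTracking" -> "false",
    "spark.kryo.registrationRequired" -> "false",
    "spark.kryo.registrator" -> "com.datadog.spark.MyKryoRegistrator",
    "spark.kryo.unsafe" -> "false",
    "spark.kryoserializer.buffer.max" -> "256m"
  )

  // Returns the same SparkConf with every setting above applied.
  def applyTo(conf: SparkConf): SparkConf = conf.setAll(settings)

  def main(args: Array[String]): Unit = {
    val conf = applyTo(new SparkConf(false))
    settings.keys.toSeq.sorted.foreach(k => println(s"$k=${conf.get(k)}"))
  }
}
```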

thanks,
Jerry

On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov <va...@datadoghq.com> wrote:

> Pre-register your classes:
>
> ```
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo): Unit = {
>     kryo.register(Class.forName("[[B")) // byte[][]
>     kryo.register(classOf[java.lang.Class[_]])
>   }
> }
> ```
>
> then run with
>
> 'spark.kryo.referenceTracking': 'false',
> 'spark.kryo.registrationRequired': 'false',
> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
> 'spark.kryo.unsafe': 'false',
> 'spark.kryoserializer.buffer.max': '256m',
>
> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov <grapesmo...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> Posted this some time ago but the problem continues to bedevil us. I'm
>> including a (slightly edited) stack trace that results from this error. If
>> anyone can shed any light on what exactly is happening here and what we can
>> do to avoid it, that would be much appreciated.
>>
>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>     at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>     at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>     at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>>     at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>     at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>     at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>     at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>     at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>     at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>     at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>     at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>     at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>     at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>     at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>>     at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>>     at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>     at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>>     at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>     at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>     at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>>>     at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>>>     at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>>>     at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>>>     at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>>>     at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>     at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>     at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>     at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>>>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>>>     at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>>>     at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>>>     at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>>>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>>>     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>     at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>>>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>>>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>>>     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>>>     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>>>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>     at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>>>         [our code that writes data to CSV]
>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>     at java.lang.Class.forName0(Native Method)
>>>     at java.lang.Class.forName(Class.java:348)
>>>     at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>     at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>     at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>     ... 132 more
>>>
>>>
>> On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov <grapesmo...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am experiencing a strange intermittent failure of my Spark job that
>>> results from serialization issues in Kryo. Here is the stack trace:
>>>
>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>>    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>    at java.lang.Class.forName0(Native Method)
>>>>    at java.lang.Class.forName(Class.java:348)
>>>>    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>>    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>>    ... 204 more
>>>>
>>>> (I've edited the company and model name since this is proprietary code)
>>>
>>> This error does not surface every time the job is run; I would say it
>>> probably shows up once in every 10 runs or so, and there isn't anything
>>> about the input data that triggers this, as I've been able to
>>> (nondeterministically) reproduce the error by simply rerunning the job with
>>> the same inputs over and over again. The model itself is just a plain Scala
>>> case class whose fields are strings and integers, so there's no custom
>>> serialization logic or anything like that. As I understand it, this seems
>>> related to an issue previously documented here
>>> <https://issues.apache.org/jira/browse/SPARK-21928>, but allegedly this
>>> was fixed long ago. I'm running this job on an AWS EMR cluster and have
>>> confirmed that the version of Spark running there is 2.4.0, with the patch
>>> that is linked in the above issue being part of the code.
>>>
>>> A suggested solution has been to set the extraClassPath config settings
>>> on the driver and executor, but that has not fixed the problem. I'm out of
>>> ideas for how to tackle this and would love to hear if anyone has any
>>> suggestions or strategies for fixing this.
>>>
>>> thanks,
>>> Jerry
>>>
>>> --
>>> http://www.google.com/profiles/grapesmoker
>>>
>>
>>
>> --
>> http://www.google.com/profiles/grapesmoker
>>
>
>
> --
> Sent from my iPhone
>


-- 
http://www.google.com/profiles/grapesmoker
