I remember it not working for us when we set it from inside the job;
we needed to actually pass it in.
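
A rough sketch of what "passing it" could look like, assuming "it" means Kryo settings like the ones quoted further down and that they are handed to spark-submit as `--conf key=value` flags rather than set in code. The names KryoSubmitArgs and toSubmitArgs are made up for illustration; the snippet only builds the argument list and does not touch Spark.

```scala
object KryoSubmitArgs {
  // Settings copied from the list quoted further down in this thread.
  val kryoSettings: Seq[(String, String)] = Seq(
    "spark.kryo.referenceTracking" -> "false",
    "spark.kryo.registrationRequired" -> "false",
    "spark.kryo.registrator" -> "com.datadog.spark.MyKryoRegistrator",
    "spark.kryo.unsafe" -> "false",
    "spark.kryoserializer.buffer.max" -> "256m"
  )

  // Render each setting as a "--conf key=value" pair for spark-submit.
  def toSubmitArgs(settings: Seq[(String, String)]): Seq[String] =
    settings.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(kryoSettings).mkString(" "))
}
```

The printed flags would go on the spark-submit command line ahead of the application jar.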

On Wed, Sep 18, 2019 at 10:38 AM Jerry Vinokurov <grapesmo...@gmail.com>
wrote:

> Hi Vadim,
>
> Thanks for your suggestion. We do preregister the classes, like so:
>
>> object KryoRegistrar {
>>
>>   val classesToRegister: Array[Class[_]] = Array(
>>     classOf[MyModel],
>>     [etc]
>>   )
>> }
>>
>
> And then we do:
>
>> val sparkConf = new SparkConf()
>>   .registerKryoClasses(KryoRegistrar.classesToRegister)
>>
>
> I notice that this is a bit different from your code and I'm wondering
> whether there's any functional difference or if these are two ways to get
> to the same end. Our code is directly adapted from the Spark documentation
> on how to use the Kryo serializer but maybe there's some subtlety here that
> I'm missing.
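
For what it's worth, here is a minimal sketch of why resolving a class by name can matter. My understanding (worth verifying against your Spark version) is that registerKryoClasses records the class names in spark.kryo.classesToRegister and Spark resolves them by name when it builds a Kryo instance, which would match the Class.forName frames in the trace below. The snippet is plain JVM code with no Spark involved; DemoModel, LoaderDemo and canLoad are made-up names, and the "blind" loader is only a stand-in for a loader that cannot see the application jar.

```scala
import java.net.{URL, URLClassLoader}
import scala.util.Try

// Hypothetical stand-in for a user-defined model class.
case class DemoModel(name: String, count: Int)

object LoaderDemo {
  // Resolve a class by name through an explicit loader,
  // the way a by-name registration has to.
  def canLoad(className: String, loader: ClassLoader): Boolean =
    Try(Class.forName(className, true, loader)).isSuccess

  def main(args: Array[String]): Unit = {
    val name = classOf[DemoModel].getName
    // The loader that actually defined DemoModel can always find it.
    val owningLoader = classOf[DemoModel].getClassLoader
    // A loader with no URLs and no parent sees only the JDK's own classes.
    val blindLoader = new URLClassLoader(Array.empty[URL], null)

    println(s"owning loader finds it: ${canLoad(name, owningLoader)}")
    println(s"blind loader finds it:  ${canLoad(name, blindLoader)}")
  }
}
```

The second lookup fails internally with a ClassNotFoundException, the same exception type as in the trace. Whether a loader mismatch is what actually happens on the cluster is a guess on my part.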
>
> With regard to the settings, it looks like we currently have the default
> settings, which is to say that referenceTracking is true,
> registrationRequired is false, unsafe is false, and buffer.max is 64m (none
> of our objects are anywhere near that size but... who knows). I will try it
> with your suggestions and see if it solves the problem.
>
> thanks,
> Jerry
>
> On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov <va...@datadoghq.com> wrote:
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>>     kryo.register(Class.forName("[[B")) // byte[][]
>>     kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
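
In case the "[[B" string looks cryptic: it is the JVM's internal name for a two-dimensional byte array, as the comment in the registrator says. A small check in plain Scala, with no Spark or Kryo needed (ArrayClassNames is just an illustrative name):

```scala
object ArrayClassNames {
  def main(args: Array[String]): Unit = {
    // Look the class up by its JVM array name, as the registrator above does.
    val byName: Class[_] = Class.forName("[[B")
    // The same class written as a Scala type.
    val byType: Class[_] = classOf[Array[Array[Byte]]]

    println(byType.getName)   // the JVM name of Array[Array[Byte]]
    println(byName == byType) // both refer to the same Class object
  }
}
```

So writing classOf[Array[Array[Byte]]] in the registrator should be an equivalent, arguably more readable, way to spell the same registration.
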
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov <grapesmo...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>>
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>>    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>>    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>>    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>>>    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>>    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>>    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>>    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>>>    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>>    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>>    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>>    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>>    at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>>    at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>>    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>>    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>>    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>>    at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>>    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>>    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>>    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>>>    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>>>    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>>>    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>>>    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>>    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>>    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>>>    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>>    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>>    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>>    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>>    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>>    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>>>>    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>>>>    at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>>>>    at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>>>>    at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>>>>    at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>>    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>>    at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>>    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>>    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>>    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>>>>    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>>>>    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>>>>    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>>>>    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>>    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>>    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>>    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>>>>    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>>>>    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>>    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>>    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>>>>    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>>>>    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>>>>    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>>>>    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>>>>    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>>>>    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>>    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>>>>         [our code that writes data to CSV] 
>>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>>    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>    at java.lang.Class.forName0(Native Method)
>>>>    at java.lang.Class.forName(Class.java:348)
>>>>    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>>    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>>    ... 132 more
>>>>
>>>>
>>> On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov <grapesmo...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am experiencing a strange intermittent failure of my Spark job that
>>>> results from serialization issues in Kryo. Here is the stack trace:
>>>>
>>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>   at java.lang.Class.forName0(Native Method)
>>>>>   at java.lang.Class.forName(Class.java:348)
>>>>>   at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>>   at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>>>   at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>>>   ... 204 more
>>>>>
>>>>> (I've edited the company and model name since this is proprietary code)
>>>>
>>>> This error does not surface every time the job is run; I would say it
>>>> probably shows up once in every 10 runs or so, and there isn't anything
>>>> about the input data that triggers this, as I've been able to
>>>> (nondeterministically) reproduce the error by simply rerunning the job with
>>>> the same inputs over and over again. The model itself is just a plain Scala
>>>> case class whose fields are strings and integers, so there's no custom
>>>> serialization logic or anything like that. As I understand it, this seems
>>>> related to an issue previously documented here
>>>> <https://issues.apache.org/jira/browse/SPARK-21928>, but allegedly this
>>>> was fixed long ago. I'm running this job on an AWS EMR cluster and have
>>>> confirmed that the version of Spark running there is 2.4.0, with the patch
>>>> that is linked in the above issue being part of the code.
>>>>
>>>> A suggested solution has been to set the extraClasspath config settings
>>>> on the driver and executor, but that has not fixed the problem. I'm out of
>>>> ideas for how to tackle this and would love to hear if anyone has any
>>>> suggestions or strategies for fixing this.
>>>>
>>>> thanks,
>>>> Jerry
>>>>
>>>> --
>>>> http://www.google.com/profiles/grapesmoker
>>>>
>>>
>>>
>>> --
>>> http://www.google.com/profiles/grapesmoker
>>>
>>
>>
>> --
>> Sent from my iPhone
>>
>
>
> --
> http://www.google.com/profiles/grapesmoker
>


-- 
Sent from my iPhone
