I remember it not working for us when we were setting it from inside the code; we needed to actually pass it in as a config.
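(For concreteness, a minimal, hypothetical sketch of the two ways the registrator can be wired up; the in-code `.set` is the "from the inside" approach, and the registrator class name reuses Vadim's example from below:)

```
import org.apache.spark.SparkConf

// "From the inside": set the registrator on the SparkConf in application
// code. This is the approach that reportedly did not take effect.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.datadog.spark.MyKryoRegistrator")

// "Actually pass it": leave the code alone and supply the same settings
// when submitting the job instead, e.g.:
//   spark-submit \
//     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
//     --conf spark.kryo.registrator=com.datadog.spark.MyKryoRegistrator \
//     ...
```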
On Wed, Sep 18, 2019 at 10:38 AM Jerry Vinokurov <grapesmo...@gmail.com> wrote:

> Hi Vadim,
>
> Thanks for your suggestion. We do preregister the classes, like so:
>
>> object KryoRegistrar {
>>   val classesToRegister: Array[Class[_]] = Array(
>>     classOf[MyModel],
>>     [etc]
>>   )
>> }
>
> And then we do:
>
>> val sparkConf = new SparkConf()
>>   .registerKryoClasses(KryoRegistrar.classesToRegister)
>
> I notice that this is a bit different from your code, and I'm wondering
> whether there's any functional difference or if these are two ways to get
> to the same end. Our code is directly adapted from the Spark documentation
> on how to use the Kryo serializer, but maybe there's some subtlety here
> that I'm missing.
>
> With regard to the settings, it looks like we currently have the default
> settings, which is to say that referenceTracking is true,
> registrationRequired is false, unsafe is false, and buffer.max is 64m
> (none of our objects are anywhere near that size but... who knows). I will
> try it with your suggestions and see if it solves the problem.
>
> thanks,
> Jerry
>
> On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov <va...@datadoghq.com> wrote:
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>>     kryo.register(Class.forName("[[B")) // byte[][]
>>     kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
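(On Jerry's question about whether the two approaches are functionally different: as far as I can tell from the Spark 2.4 source, both end in the same Kryo registrations but get there differently, which matters for classloading. A minimal sketch, with `MyRegistrator` as a hypothetical name:)

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Route 2: a KryoRegistrator is itself instantiated by name, and is handed
// a live Kryo instance on which it registers classes directly.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[java.lang.Class[_]])
  }
}

object KryoWiring {
  // Route 1: registerKryoClasses records the class NAMES under the config
  // key "spark.kryo.classesToRegister" (and switches the serializer to
  // Kryo). Each executor later resolves those names with Class.forName
  // when it builds a Kryo instance; that is the call throwing in the
  // traces below.
  val viaNames: SparkConf = new SparkConf()
    .registerKryoClasses(Array(classOf[java.lang.Class[_]]))

  // Route 2 wiring: pass the registrator by name instead.
  val viaRegistrator: SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "MyRegistrator")
}
```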
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov <grapesmo...@gmail.com> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago, but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error.
>>> If anyone can shed any light on what exactly is happening here and what
>>> we can do to avoid it, that would be much appreciated.
>>>
>>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>> at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>> at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>> at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>>> at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>> at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>> at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>> at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>>> at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>> at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>> at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>> at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>> at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>> at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>> at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>> at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>> at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>> at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>>> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>>> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>>>> at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>>>> at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>>>> at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>>>> at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>>>> at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>> at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>>>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>>>> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>>>> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>>>> at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>>>> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>>>> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>>>> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>>>> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>>>> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>>>> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>> at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>>>> [our code that writes data to CSV]
>>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(Class.java:348)
>>>> at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>> at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>> at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>> ... 132 more
>>>
>>> On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov <grapesmo...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am experiencing a strange intermittent failure of my Spark job that
>>>> results from serialization issues in Kryo. Here is the stack trace:
>>>>
>>>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>> at java.lang.Class.forName0(Native Method)
>>>>> at java.lang.Class.forName(Class.java:348)
>>>>> at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>> at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>>> at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>>> ... 204 more
>>>>
>>>> (I've edited the company and model names, since this is proprietary code.)
>>>>
>>>> This error does not surface every time the job is run; I would say it
>>>> probably shows up once in every 10 runs or so, and there isn't anything
>>>> about the input data that triggers it, as I've been able to
>>>> (nondeterministically) reproduce the error simply by rerunning the job
>>>> with the same inputs over and over again. The model itself is just a
>>>> plain Scala case class whose fields are strings and integers, so there's
>>>> no custom serialization logic or anything like that. As I understand it,
>>>> this seems related to an issue previously documented here
>>>> <https://issues.apache.org/jira/browse/SPARK-21928>, but allegedly that
>>>> was fixed long ago. I'm running this job on an AWS EMR cluster and have
>>>> confirmed that the version of Spark running there is 2.4.0 and that it
>>>> includes the patch linked in the above issue.
>>>>
>>>> A suggested solution has been to set the extraClasspath config settings
>>>> on the driver and executor, but that has not fixed the problem. I'm out
>>>> of ideas for how to tackle this and would love to hear if anyone has any
>>>> suggestions or strategies for fixing it.
>>>>
>>>> thanks,
>>>> Jerry
>>>>
>>>> --
>>>> http://www.google.com/profiles/grapesmoker

--
Sent from my iPhone
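(For readers finding this thread later: a minimal sketch of the extraClasspath workaround Jerry mentions having tried; the jar path is a hypothetical placeholder, and per the thread it did not resolve the issue for them.)

```
import org.apache.spark.SparkConf

// Put the application jar on the executor classpath explicitly, so that
// Class.forName can resolve the model classes on the executors.
// "/path/to/app.jar" is a hypothetical placeholder.
val conf = new SparkConf()
  .set("spark.executor.extraClassPath", "/path/to/app.jar")

// The driver-side equivalent, spark.driver.extraClassPath, must be in
// place before the driver JVM starts, so it is normally supplied via
// spark-submit or spark-defaults.conf rather than in application code:
//   spark-submit --conf spark.driver.extraClassPath=/path/to/app.jar ...
```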