[ 
https://issues.apache.org/jira/browse/SPARK-29251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788101#comment-17788101
 ] 

Shivam Sharma commented on SPARK-29251:
---------------------------------------

I am also getting the same issue, did you find the fix?

> intermittent serialization failures in spark
> --------------------------------------------
>
>                 Key: SPARK-29251
>                 URL: https://issues.apache.org/jira/browse/SPARK-29251
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>         Environment: We are running Spark 2.4.3 on an AWS EMR cluster. Our 
> cluster consists of one driver instance, 3 core instances, and 3 to 12 task 
> instances; the task group autoscales as needed.
>            Reporter: Jerry Vinokurov
>            Priority: Major
>              Labels: bulk-closed
>
> We are running an EMR cluster on AWS that processes somewhere around 100 
> batch jobs a day. These jobs are running various SQL commands to transform 
> data and the data volume ranges between a dozen or a few hundred MB to a few 
> GB on the high end for some jobs, and even around ~1 TB for one particularly 
> large one. We use the Kryo serializer and preregister our classes like so:
>  
> {code:java}
> object KryoRegistrar {
>   val classesToRegister: Array[Class[_]] = Array(
>     classOf[MyModel],
>    [etc]
>   )
> }
> // elsewhere
> val sparkConf = new SparkConf()
>       .registerKryoClasses(KryoRegistrar.classesToRegister)
> {code}
>  
>  
> Intermittently throughout the cluster's operation we have observed jobs 
> terminating with the following stack trace:
>  
>  
> {noformat}
> org.apache.spark.SparkException: Failed to register classes with Kryo
>       at 
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>       at 
> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>       at 
> org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>       at 
> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>       at 
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>       at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>       at 
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>       at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>       at 
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>       at 
> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>       at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>       at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>       at 
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>       at 
> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>       at 
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>       at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>       at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>       at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>       at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>       at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>       at 
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>       at 
> org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>       at 
> org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>       at 
> org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>       at 
> org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>       at 
> org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>       at 
> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>       at 
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>       at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>       at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>       at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>       at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>       at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>       at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>       at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>       at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>       at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>         [our code that writes data to CSV]    
> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>       at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>       at 
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>       ... 132 more{noformat}
> The real name of the model has been changed as this is proprietary code, but 
> otherwise I have reproduced the stack trace in full. This error is not 
> reproducible and seems to strike mostly randomly. It does not seem to be 
> related to the size of the job, and if we rerun the job with identical 
> parameters, it usually just succeeds, although superficially it does seem to 
> coincide with times when cluster load is high. Most of our jobs are run with 
> the following params:
>  
>  
> {noformat}
> spark.driver.memory = 2G
> spark.executor.memory = 4G
> spark.driver.cores = 1
> spark.executor.cores = 2
> spark.executor.instances = 2{noformat}
> We also use dynamic allocation to allow for up to 14 cores per job and fair 
> scheduling.
>  
> This error has been plaguing us for months, and I don't have a good handle on 
> what I can do about it. It seems like a bug in Spark internals and possibly 
> related to [SPARK-21928|https://issues.apache.org/jira/browse/SPARK-21928], 
> but we use Spark 2.4.3 and according to that ticket the problem has been 
> fixed. It's particularly frustrating because the error is not directly 
> reproducible.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to