[ https://issues.apache.org/jira/browse/SPARK-29251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788101#comment-17788101 ]
Shivam Sharma commented on SPARK-29251:
---------------------------------------

I am also getting the same issue. Did you find a fix?

> intermittent serialization failures in spark
> --------------------------------------------
>
>                 Key: SPARK-29251
>                 URL: https://issues.apache.org/jira/browse/SPARK-29251
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>         Environment: We are running Spark 2.4.3 on an AWS EMR cluster. Our cluster consists of one driver instance, 3 core instances, and 3 to 12 task instances; the task group autoscales as needed.
>            Reporter: Jerry Vinokurov
>            Priority: Major
>              Labels: bulk-closed
>
> We are running an EMR cluster on AWS that processes somewhere around 100 batch jobs a day. These jobs run various SQL commands to transform data; the data volume ranges from a dozen to a few hundred MB for most jobs, up to a few GB on the high end, and roughly 1 TB for one particularly large job. We use the Kryo serializer and preregister our classes like so:
>
> {code:scala}
> object KryoRegistrar {
>   val classesToRegister: Array[Class[_]] = Array(
>     classOf[MyModel],
>     [etc]
>   )
> }
>
> // elsewhere
> val sparkConf = new SparkConf()
>   .registerKryoClasses(KryoRegistrar.classesToRegister)
> {code}
>
> Intermittently throughout the cluster's operation, we have observed jobs terminating with the following stack trace:
>
> {noformat}
> org.apache.spark.SparkException: Failed to register classes with Kryo
>   at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>   at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>   at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>   at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>   at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>   at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>   at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>   at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>   at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>   at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>   at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>   at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>   at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>   at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>   at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>   at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>   at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>   at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>   at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>   at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>   at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>   at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>   at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>   at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>   at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>   at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>   at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>   [our code that writes data to CSV]
> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>   ... 132 more
> {noformat}
>
> The real name of the model has been changed, as this is proprietary code, but otherwise I have reproduced the stack trace in full. This error is not reliably reproducible and seems to strike mostly at random. It does not seem to be related to the size of the job, and if we rerun the job with identical parameters it usually succeeds, although it does superficially seem to coincide with periods of high cluster load. Most of our jobs run with the following parameters:
>
> {noformat}
> spark.driver.memory = 2G
> spark.executor.memory = 4G
> spark.driver.cores = 1
> spark.executor.cores = 2
> spark.executor.instances = 2
> {noformat}
>
> We also use dynamic allocation, which allows each job up to 14 cores, together with fair scheduling.
>
> This error has been plaguing us for months, and I don't have a good handle on what I can do about it. It looks like a bug in Spark internals, possibly related to [SPARK-21928|https://issues.apache.org/jira/browse/SPARK-21928], but we are on Spark 2.4.3, where according to that ticket the problem has already been fixed. It's particularly frustrating because the error is not directly reproducible.
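
For anyone else debugging this, here is a minimal, self-contained sketch of the registration wiring the quoted description implies, keeping `MyModel` as the placeholder name used in the ticket. The `spark.kryo.registrationRequired` flag is my addition, not part of the report: it makes any registration gap fail immediately and deterministically rather than intermittently, which can help narrow the problem down.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder standing in for the proprietary model classes in the report.
case class MyModel(id: Long, payload: String)

object KryoRegistrar {
  // Classes to preregister with Kryo; extend as needed.
  val classesToRegister: Array[Class[_]] = Array(
    classOf[MyModel]
  )
}

val sparkConf = new SparkConf()
  .setAppName("kryo-registration-sketch")
  // registerKryoClasses also sets spark.serializer to KryoSerializer,
  // but being explicit makes the intent obvious.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Not in the original report: fail fast on any unregistered class.
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(KryoRegistrar.classesToRegister)

val sc = new SparkContext(sparkConf)
{code}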
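
The root cause in the trace is a ClassNotFoundException thrown while KryoSerializer.newKryo resolves the registered class names, which points at the classloader visible to the serializer thread rather than at the registration list itself. One workaround worth trying, sketched below under the assumption that the context classloader is intermittently missing the application jar, is to move registration into a custom org.apache.spark.serializer.KryoRegistrator that resolves each class against the context classloader and falls back to its own loader. SafeKryoRegistrator is a hypothetical name, not an existing Spark class, and this may not help if the registrator class itself cannot be loaded.

{code:scala}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator: resolves model classes by name, preferring the
// thread context classloader and falling back to this class's own loader.
class SafeKryoRegistrator extends KryoRegistrator {

  private val classNames = Seq(
    "com.mycompany.models.MyModel" // placeholder name from the ticket
  )

  override def registerClasses(kryo: Kryo): Unit = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    classNames.foreach { name =>
      // initialize = false: only the Class object is needed for registration.
      kryo.register(Class.forName(name, false, loader))
    }
  }
}

// Wired up via configuration instead of registerKryoClasses:
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[SafeKryoRegistrator].getName)
{code}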
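
For completeness, the job parameters quoted in the report expressed as a single SparkConf. The dynamic-allocation cap is my reconstruction, not the reporter's: 14 cores per job at 2 cores per executor works out to at most 7 executors.

{code:scala}
import org.apache.spark.SparkConf

// Reconstruction of the reported job-level settings; the two
// dynamicAllocation values are assumptions, not from the ticket.
val jobConf = new SparkConf()
  .set("spark.driver.memory", "2g")
  .set("spark.executor.memory", "4g")
  .set("spark.driver.cores", "1")
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "2") // initial count under dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "7") // 14 cores / 2 cores per executor
  .set("spark.scheduler.mode", "FAIR")
{code}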