[ https://issues.apache.org/jira/browse/SPARK-48043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Romain Ardiet updated SPARK-48043:
----------------------------------
Description:

I'm running a Spark job on AWS EMR. I wanted to test the push-based shuffle introduced in Spark 3.2, but the job fails with a Kryo exception as soon as I enable it.

The issue seems to happen when the Executor starts, during the KryoSerializerInstance.getAutoReset() check:

{code:java}
24/04/24 15:36:22 ERROR YarnCoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Failed to register classes with Kryo
org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:186) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:174) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:105) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48) ~[kryo-shaded-4.0.2.jar:?]
    at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:112) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:352) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializerInstance.getAutoReset(KryoSerializer.scala:452) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects$lzycompute(KryoSerializer.scala:259) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects(KryoSerializer.scala:255) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.serializerIsSupported$lzycompute$1(Utils.scala:2721) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.serializerIsSupported$1(Utils.scala:2716) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.isPushBasedShuffleEnabled(Utils.scala:2730) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:554) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.executor.Executor.<init>(Executor.scala:143) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:190) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_402]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_402]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]
Caused by: java.lang.ClassNotFoundException: com.analytics.AnalyticsEventWrapper
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_402]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_402]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_402]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_402]
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_402]
    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_402]
    at org.apache.spark.util.Utils$.classForName(Utils.scala:228) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:177) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:176) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    ... 24 more
24/04/24 15:36:22 INFO YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
24/04/24 15:36:22 ERROR Utils: Uncaught exception in thread shutdown-hook-0
{code}
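For context, here is a minimal Java sketch of the registration step that fails. This is my approximation, not Spark's actual code: per the trace, the names registered via registerKryoClasses are resolved with Utils.classForName (i.e. Class.forName) inside KryoSerializer.newKryo, so if the executor's classloader does not yet contain the application jar when the push-based-shuffle check forces an early Kryo instantiation, the lookup throws exactly the ClassNotFoundException above. Class and method names in the sketch are illustrative.

{code:java}
// Minimal sketch (approximation, NOT Spark's real implementation) of the
// registration loop behind "Failed to register classes with Kryo".
import com.esotericsoftware.kryo.Kryo;

public class KryoRegistrationSketch {

    // classNames corresponds to spark.kryo.classesToRegister,
    // e.g. {"com.analytics.AnalyticsEventWrapper"}
    static Kryo newKryo(String[] classNames, ClassLoader executorClassLoader) {
        Kryo kryo = new Kryo();
        try {
            for (String name : classNames) {
                // Throws ClassNotFoundException when the application jar is
                // not yet visible to the executor's classloader, as happens
                // here during BlockManager.initialize at executor startup.
                kryo.register(Class.forName(name, true, executorClassLoader));
            }
        } catch (ClassNotFoundException e) {
            // Spark wraps this as SparkException("Failed to register classes with Kryo")
            throw new RuntimeException("Failed to register classes with Kryo", e);
        }
        return kryo;
    }
}
{code}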
My job code is the following:

{code:java}
package com.analytics.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.analytics.archive.AnalyticsEventWrapperRowMapper;
import com.analytics.AnalyticsEventWrapper;

public class ProcessAnalyticsEventJob {

    public static void main(String[] args) throws Exception {
        // registerKryoClasses takes an array of classes; it also switches
        // spark.serializer to KryoSerializer
        SparkConf sparkConf = new SparkConf()
            .setMaster("yarn")
            .registerKryoClasses(new Class<?>[]{AnalyticsEventWrapper.class});

        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

        MapFunction<Row, AnalyticsEventWrapper> mapAsAnalyticsEventWrapper =
            AnalyticsEventWrapperRowMapper::map;

        Dataset<AnalyticsEventWrapper> inputDataset = spark.read()
            .parquet("s3://bucket/path/to/events")
            .map(mapAsAnalyticsEventWrapper, Encoders.kryo(AnalyticsEventWrapper.class));

        // rest of the job (groupBy aggregation and write output)
    }
}
{code}

The job is launched on EMR with the following spark-submit commands; all application code is packaged into application.jar with the Maven Shade plugin (note that spark-submit options must precede the application jar):

{code:bash}
# works
spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
  --deploy-mode cluster \
  --verbose \
  /tmp/application.jar

# fails with the Kryo exception
spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
  --deploy-mode cluster \
  --verbose \
  --conf "spark.shuffle.push.enabled=true" \
  --conf "spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver" \
  /tmp/application.jar
{code}
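A possible workaround, offered as an untested assumption rather than a confirmed fix: Encoders.kryo(...) does not require the class to be pre-registered, so dropping registerKryoClasses while keeping the Kryo serializer should leave spark.kryo.classesToRegister empty and avoid the executor-side class lookup at startup, at the cost of Kryo writing fully-qualified class names into the serialized data.

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class ProcessAnalyticsEventJobNoRegistration {

    public static void main(String[] args) {
        // Hedged workaround sketch (untested assumption): keep the Kryo
        // serializer but skip explicit registration, so executor startup has
        // no class name to resolve via Class.forName.
        SparkConf sparkConf = new SparkConf()
            .setMaster("yarn")
            // registerKryoClasses would set both spark.serializer and
            // spark.kryo.classesToRegister; setting only the serializer leaves
            // the registration list empty, so KryoSerializer.newKryo has
            // nothing to look up when the executor initializes.
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

        // ... same job body as above; Encoders.kryo(AnalyticsEventWrapper.class)
        // still works without pre-registration.
    }
}
{code}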
> Kryo serialization issue with push-based shuffle
> ------------------------------------------------
>
>                 Key: SPARK-48043
>                 URL: https://issues.apache.org/jira/browse/SPARK-48043
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.4.1
>        Environment: AWS EMR 6.14 (Spark 3.4.1)
>            Reporter: Romain Ardiet
>            Priority: Major