[
https://issues.apache.org/jira/browse/SPARK-48043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Romain Ardiet updated SPARK-48043:
----------------------------------
Summary: Kryo registerKryoClasses() issue with push-based shuffle (was:
Kryo serialization issue with push-based shuffle)
> Kryo registerKryoClasses() issue with push-based shuffle
> --------------------------------------------------------
>
> Key: SPARK-48043
> URL: https://issues.apache.org/jira/browse/SPARK-48043
> Project: Spark
> Issue Type: Bug
> Components: Shuffle
> Affects Versions: 3.4.1
> Environment: AWS EMR 6.14 (Spark 3.4.1)
> Reporter: Romain Ardiet
> Priority: Major
>
> I'm running a Spark job on AWS EMR. I wanted to test the push-based
> shuffle introduced in Spark 3.2, but the job fails with a Kryo exception as
> soon as I enable it.
> The failure happens when the executor starts, during the
> KryoSerializerInstance.getAutoReset() check:
> {code:java}
> 24/04/24 15:36:22 ERROR YarnCoarseGrainedExecutorBackend: Executor
> self-exiting due to : Unable to create executor due to Failed to register
> classes with Kryo
> org.apache.spark.SparkException: Failed to register classes with Kryo
> at
> org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:186)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> ~[scala-library-2.12.15.jar:?]
> at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:174)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:105)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48)
> ~[kryo-shaded-4.0.2.jar:?]
> at
> org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:112)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:352)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.serializer.KryoSerializerInstance.getAutoReset(KryoSerializer.scala:452)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects$lzycompute(KryoSerializer.scala:259)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects(KryoSerializer.scala:255)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.util.Utils$.serializerIsSupported$lzycompute$1(Utils.scala:2721)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at org.apache.spark.util.Utils$.serializerIsSupported$1(Utils.scala:2716)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.util.Utils$.isPushBasedShuffleEnabled(Utils.scala:2730)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:554)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at org.apache.spark.executor.Executor.<init>(Executor.scala:143)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:190)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_402]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_402]
> at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]
> Caused by: java.lang.ClassNotFoundException:
> com.analytics.AnalyticsEventWrapper
> at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
> ~[?:1.8.0_402]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_402]
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
> ~[?:1.8.0_402]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_402]
> at java.lang.Class.forName0(Native Method) ~[?:1.8.0_402]
> at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_402]
> at org.apache.spark.util.Utils$.classForName(Utils.scala:228)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:177)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> ~[scala-library-2.12.15.jar:?]
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> ~[scala-library-2.12.15.jar:?]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> ~[scala-library-2.12.15.jar:?]
> at
> org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:176)
> ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
> ... 24 more
> 24/04/24 15:36:22 INFO YarnCoarseGrainedExecutorBackend: Driver commanded a
> shutdown
> 24/04/24 15:36:22 ERROR Utils: Uncaught exception in thread shutdown-hook-0
> {code}
> My job code is the following:
> {code:java}
> package com.analytics.spark;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.MapFunction;
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Encoders;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import com.analytics.archive.AnalyticsEventWrapperRowMapper;
> import com.analytics.AnalyticsEventWrapper;
>
> public class ProcessAnalyticsEventJob {
>     public static void main(String[] args) throws Exception {
>         // registerKryoClasses takes a Class<?>[] (not varargs) when called from Java
>         SparkConf sparkConf = new SparkConf()
>             .setMaster("yarn")
>             .registerKryoClasses(new Class<?>[] {AnalyticsEventWrapper.class});
>
>         SparkSession spark =
>             SparkSession.builder().config(sparkConf).getOrCreate();
>
>         // Map each parquet Row to the Kryo-encoded wrapper type
>         MapFunction<Row, AnalyticsEventWrapper> mapAsAnalyticsEventWrapper =
>             AnalyticsEventWrapperRowMapper::map;
>         Dataset<AnalyticsEventWrapper> inputDataset = spark.read()
>             .parquet("s3://bucket/path/to/events")
>             .map(mapAsAnalyticsEventWrapper,
>                 Encoders.kryo(AnalyticsEventWrapper.class));
>
>         // rest of the job (shuffle aggregation and output write)
>     }
> } {code}
> AnalyticsEventWrapperRowMapper.java
> {code:java}
> package com.analytics.archive;
>
> import org.apache.spark.sql.Row;
> import com.analytics.AnalyticsEventWrapper;
>
> public class AnalyticsEventWrapperRowMapper {
>     public static AnalyticsEventWrapper map(Row r) {
>         AnalyticsEventWrapper analyticsEventWrapper = new AnalyticsEventWrapper();
>         analyticsEventWrapper.setId(r.getAs("id"));
>         analyticsEventWrapper.setTimestamp(r.getAs("timestamp"));
>         analyticsEventWrapper.setType(r.getAs("type"));
>         analyticsEventWrapper.setTopic(r.getAs("topic"));
>         return analyticsEventWrapper;
>     }
> } {code}
> AnalyticsEventWrapper.java
> {code:java}
> package com.analytics;
>
> public class AnalyticsEventWrapper {
>     private String id;
>     private Long timestamp;
>     private String type;
>     private String topic;
>
>     public String getId() {
>         return id;
>     }
>
>     public void setId(String id) {
>         this.id = id;
>     }
>
>     public Long getTimestamp() {
>         return timestamp;
>     }
>
>     public void setTimestamp(Long timestamp) {
>         this.timestamp = timestamp;
>     }
>
>     public String getType() {
>         return type;
>     }
>
>     public void setType(String type) {
>         this.type = type;
>     }
>
>     public String getTopic() {
>         return topic;
>     }
>
>     public void setTopic(String topic) {
>         this.topic = topic;
>     }
> } {code}
> The job is launched on EMR with spark-submit (all application code is
> packaged in application.jar with the Maven Shade plugin):
> {code:java}
> // works (push-based shuffle not enabled)
> spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
> /tmp/application.jar \
> --deploy-mode cluster \
> --verbose
> // fails with the Kryo exception (push-based shuffle enabled)
> spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
> /tmp/application.jar \
> --deploy-mode cluster \
> --verbose \
> --conf "spark.shuffle.push.enabled=true"
> // yarn-site.xml of Node Managers
> <property>
> <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
> <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
> </property> {code}
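
A possible way to narrow this down, offered as a sketch and not as a confirmed diagnosis: the "Caused by" section of the trace shows the lookup of com.analytics.AnalyticsEventWrapper going through sun.misc.Launcher$AppClassLoader and ending in ClassNotFoundException. The stdlib-only helper below checks which of a comma-separated list of class names (the form registerKryoClasses() stores under spark.kryo.classesToRegister) a given class loader cannot resolve. The class KryoClassVisibilityCheck and its method unresolvable are hypothetical names introduced for illustration; they are not part of Spark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper, not part of Spark.
class KryoClassVisibilityCheck {

    /** Returns the names from a comma-separated list that the given loader cannot resolve. */
    static List<String> unresolvable(String commaSeparatedNames, ClassLoader loader) {
        List<String> missing = new ArrayList<>();
        for (String raw : commaSeparatedNames.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue; // ignore empty entries such as a trailing comma
            }
            try {
                // initialize=false: only test visibility, do not run static initializers
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Default to the class named in the report; pass another list as the first argument.
        String names = args.length > 0 ? args[0] : "com.analytics.AnalyticsEventWrapper";
        System.out.println("system loader cannot resolve:  "
                + unresolvable(names, ClassLoader.getSystemClassLoader()));
        System.out.println("context loader cannot resolve: "
                + unresolvable(names, Thread.currentThread().getContextClassLoader()));
    }
}
```

Running the same check against the system loader and the thread context loader, on the driver and early on an executor, would show whether the shaded application.jar is visible to each loader at that point. Whether that difference explains the failure with spark.shuffle.push.enabled=true is an assumption the report does not confirm.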