[ https://issues.apache.org/jira/browse/SPARK-48043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Romain Ardiet updated SPARK-48043:
----------------------------------
    Description: 
I'm running a Spark job on AWS EMR. I wanted to test the new push-based shuffle introduced in Spark 3.2, but it fails to register Kryo classes when I enable it.

The issue happens while executors are initializing, during the KryoSerializerInstance.getAutoReset() check:
{code:java}
24/04/24 15:36:22 ERROR YarnCoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Failed to register classes with Kryo
org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:186) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:174) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:105) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48) ~[kryo-shaded-4.0.2.jar:?]
    at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:112) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:352) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializerInstance.getAutoReset(KryoSerializer.scala:452) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects$lzycompute(KryoSerializer.scala:259) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects(KryoSerializer.scala:255) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.serializerIsSupported$lzycompute$1(Utils.scala:2721) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.serializerIsSupported$1(Utils.scala:2716) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.isPushBasedShuffleEnabled(Utils.scala:2730) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:554) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.executor.Executor.<init>(Executor.scala:143) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:190) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_402]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_402]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]
Caused by: java.lang.ClassNotFoundException: com.analytics.AnalyticsEventWrapper
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_402]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_402]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_402]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_402]
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_402]
    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_402]
    at org.apache.spark.util.Utils$.classForName(Utils.scala:228) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:177) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:176) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    ... 24 more
24/04/24 15:36:22 INFO YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
24/04/24 15:36:22 ERROR Utils: Uncaught exception in thread shutdown-hook-0
{code}
My job code is the following:
{code:java}
package com.analytics.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.serializer.KryoSerializer; 
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.analytics.archive.AnalyticsEventWrapperRowMapper;
import com.analytics.AnalyticsEventWrapper;

public class ProcessAnalyticsEventJob {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf()
        .setMaster("yarn")
        .set("spark.serializer", KryoSerializer.class.getCanonicalName())
        .registerKryoClasses(new Class<?>[] {AnalyticsEventWrapper.class});

    SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
        
    MapFunction<Row, AnalyticsEventWrapper> mapAsAnalyticsEventWrapper = AnalyticsEventWrapperRowMapper::map;
    Dataset<AnalyticsEventWrapper> inputDataset = spark.read()
        .parquet("s3://bucket/path/to/events")
        .map(mapAsAnalyticsEventWrapper, Encoders.kryo(AnalyticsEventWrapper.class));
        
    // rest of the job (shuffle aggregation and output write)
  }
} {code}
AnalyticsEventWrapperRowMapper.java
{code:java}
package com.analytics.archive;

import org.apache.spark.sql.Row;
import com.analytics.AnalyticsEventWrapper;

public class AnalyticsEventWrapperRowMapper {
    public static AnalyticsEventWrapper map(Row r) {
        AnalyticsEventWrapper analyticsEventWrapper = new AnalyticsEventWrapper();
        analyticsEventWrapper.setId(r.getAs("id"));
        analyticsEventWrapper.setTimestamp(r.getAs("timestamp"));
        analyticsEventWrapper.setType(r.getAs("type"));
        analyticsEventWrapper.setTopic(r.getAs("topic"));
        return analyticsEventWrapper;
    }
} {code}
AnalyticsEventWrapper.java
{code:java}
package com.analytics;

public class AnalyticsEventWrapper {
    private String id;
    private Long timestamp;
    private String type;
    private String topic;
    
    public String getId() {
        return id;
    }
    
    public void setId(String id) {
        this.id = id;
    }
    
    public Long getTimestamp() {
        return timestamp;
    }
    
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
    
    public String getType() {
        return type;
    }
    
    public void setType(String type) {
        this.type = type;
    }
    
    public String getTopic() {
        return topic;
    }
    
    public void setTopic(String topic) {
        this.topic = topic;
    }
} {code}
The job is launched on EMR with the spark-submit commands below; all application code is packaged into application.jar with the Maven Shade plugin. The spark-submit options are placed before the application jar, since anything after the jar is passed as application arguments:
{code:bash}
# works
spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
  --deploy-mode cluster \
  --verbose \
  --conf "spark.shuffle.push.enabled=false" \
  /tmp/application.jar

# kryo issue
spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
  --deploy-mode cluster \
  --verbose \
  --conf "spark.shuffle.push.enabled=true" \
  /tmp/application.jar
{code}
yarn-site.xml of the Node Managers:
{code:xml}
<property>
  <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
  <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
</property>
{code}
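For completeness, the client-side settings this setup assumes (push-based shuffle on YARN also requires the external shuffle service; this is my reading of the Spark documentation, not something verified on this cluster):
{code:java}
// Assumed client-side configuration for push-based shuffle on YARN;
// spark.shuffle.service.enabled=true is required per the Spark docs.
SparkConf pushShuffleConf = new SparkConf()
    .set("spark.shuffle.push.enabled", "true")
    .set("spark.shuffle.service.enabled", "true");
{code}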
h2. Update 

When the {*}registerKryoClasses(...){*} call is removed, the error goes away and push-based shuffle can be enabled with spark.shuffle.push.enabled=true.
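For reference, a minimal sketch of a configuration that avoids the failure: keep Kryo as the serializer but skip upfront registration (unregistered classes are then written with their fully-qualified names, since spark.kryo.registrationRequired defaults to false):
{code:java}
// Minimal sketch, assuming the default spark.kryo.registrationRequired=false:
// Kryo stays enabled, but no classes are registered upfront, so the executor
// never has to resolve user classes while BlockManager is initializing.
SparkConf sparkConf = new SparkConf()
    .setMaster("yarn")
    // no .registerKryoClasses(...) call
    .set("spark.serializer", KryoSerializer.class.getCanonicalName());
{code}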
h2. Investigation

Executor.scala calls env.blockManager.initialize(), which checks Utils.isPushBasedShuffleEnabled(). That check instantiates Kryo, and with it loads every class passed to registerKryoClasses(), as part of the getAutoReset() call.

At that point the user class loader has not yet been registered; it is only installed later in the Executor bootstrap process, hence the ClassNotFoundException for com.analytics.AnalyticsEventWrapper.
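A possible workaround, untested and following only from the analysis above: make the shaded application jar visible to the executor's system class loader, which is the loader in effect when BlockManager.initialize() runs, e.g. via spark.executor.extraClassPath (the node-local jar path below is an assumption):
{code:java}
// Hypothetical workaround sketch: put the shaded jar on the executor's system
// classpath so com.analytics.AnalyticsEventWrapper is loadable before the user
// class loader is installed. "/tmp/application.jar" is an assumed node-local path.
SparkConf sparkConf = new SparkConf()
    .setMaster("yarn")
    .set("spark.serializer", KryoSerializer.class.getCanonicalName())
    .set("spark.executor.extraClassPath", "/tmp/application.jar")
    .registerKryoClasses(new Class<?>[] {AnalyticsEventWrapper.class});
{code}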

> Kryo registerKryoClasses() issue with push-based shuffle
> --------------------------------------------------------
>
>                 Key: SPARK-48043
>                 URL: https://issues.apache.org/jira/browse/SPARK-48043
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.4.1
>         Environment: AWS EMR 6.14 (Spark 3.4.1)
>            Reporter: Romain Ardiet
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
