[ https://issues.apache.org/jira/browse/SPARK-48043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Romain Ardiet updated SPARK-48043:
----------------------------------
    Description: 
I'm running a Spark job on AWS EMR. I wanted to test the push-based shuffle
introduced in Spark 3.2, but the job fails with a Kryo exception as soon as I
enable it.

The issue occurs when the executor starts, during the
KryoSerializerInstance.getAutoReset() check:
{code:java}
24/04/24 15:36:22 ERROR YarnCoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Failed to register classes with Kryo
org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:186) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:174) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:105) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48) ~[kryo-shaded-4.0.2.jar:?]
    at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:112) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:352) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializerInstance.getAutoReset(KryoSerializer.scala:452) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects$lzycompute(KryoSerializer.scala:259) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects(KryoSerializer.scala:255) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.serializerIsSupported$lzycompute$1(Utils.scala:2721) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.serializerIsSupported$1(Utils.scala:2716) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.isPushBasedShuffleEnabled(Utils.scala:2730) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:554) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.executor.Executor.<init>(Executor.scala:143) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:190) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_402]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_402]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]
Caused by: java.lang.ClassNotFoundException: com.analytics.AnalyticsEventWrapper
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_402]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_402]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_402]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_402]
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_402]
    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_402]
    at org.apache.spark.util.Utils$.classForName(Utils.scala:228) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:177) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:176) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    ... 24 more
24/04/24 15:36:22 INFO YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
24/04/24 15:36:22 ERROR Utils: Uncaught exception in thread shutdown-hook-0
{code}
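Reading the trace bottom-up, this looks like a classloader-timing problem (my interpretation, not confirmed against the Spark sources): with push-based shuffle enabled, BlockManager.initialize() calls Utils.isPushBasedShuffleEnabled(), which checks KryoSerializer.supportsRelocationOfSerializedObjects; that check builds a Kryo instance and eagerly resolves every class listed in spark.kryo.classesToRegister via Class.forName(), apparently before the executor has added application.jar to its classloader. A minimal sketch of the suspected ordering, with hypothetical names:
{code:java}
// Hypothetical illustration of the suspected ordering problem (not Spark
// source). At executor startup the system classloader does not yet contain
// application.jar, so eagerly registering a user class with Kryo fails.
public class ClassLoaderTimingSketch {
    public static void main(String[] args) {
        ClassLoader startupLoader = ClassLoader.getSystemClassLoader();
        try {
            // This is effectively what eager Kryo registration does:
            Class.forName("com.analytics.AnalyticsEventWrapper", false, startupLoader);
        } catch (ClassNotFoundException e) {
            // Same failure as in the executor log above: the class only
            // becomes visible once the application jar is added to the
            // executor's classloader, which happens later.
            System.err.println("Registered class not visible at startup: " + e);
        }
    }
}
{code}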
My job code is the following:
{code:java}
package com.analytics.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.analytics.archive.AnalyticsEventWrapperRowMapper;
import com.analytics.AnalyticsEventWrapper;

public class ProcessAnalyticsEventJob {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf()
        .setMaster("yarn")
        .set("spark.serializer", KryoSerializer.class.getCanonicalName())
        // registerKryoClasses takes an array of classes
        .registerKryoClasses(new Class<?>[] { AnalyticsEventWrapper.class });

    SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

    MapFunction<Row, AnalyticsEventWrapper> mapAsAnalyticsEventWrapper =
        AnalyticsEventWrapperRowMapper::map;
    Dataset<AnalyticsEventWrapper> inputDataset = spark.read()
        .parquet("s3://bucket/path/to/events")
        .map(mapAsAnalyticsEventWrapper, Encoders.kryo(AnalyticsEventWrapper.class));

    // rest of the job (shuffle aggregation and output write)
  }
} {code}
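For context, registerKryoClasses() is only a convenience: it merges the class names into spark.kryo.classesToRegister and sets spark.serializer, so the executor has to resolve the class name at startup no matter how the registration was expressed. An equivalent configuration, shown for illustration only:
{code:java}
import org.apache.spark.SparkConf;

public class EquivalentConf {
    // Equivalent to .registerKryoClasses(new Class<?>[]{AnalyticsEventWrapper.class}):
    // the helper merges the class name into spark.kryo.classesToRegister and
    // sets spark.serializer to KryoSerializer.
    static SparkConf build() {
        return new SparkConf()
            .setMaster("yarn")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.classesToRegister", "com.analytics.AnalyticsEventWrapper");
    }
}
{code}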
AnalyticsEventWrapperRowMapper.java
{code:java}
package com.analytics.archive;

import org.apache.spark.sql.Row;
import com.analytics.AnalyticsEventWrapper;

public class AnalyticsEventWrapperRowMapper {
    public static AnalyticsEventWrapper map(Row r) {
        AnalyticsEventWrapper analyticsEventWrapper = new AnalyticsEventWrapper();
        analyticsEventWrapper.setId(r.getAs("id"));
        analyticsEventWrapper.setTimestamp(r.getAs("timestamp"));
        analyticsEventWrapper.setType(r.getAs("type"));
        analyticsEventWrapper.setTopic(r.getAs("topic"));
        return analyticsEventWrapper;
    }
} {code}
AnalyticsEventWrapper.java
{code:java}
package com.analytics;

public class AnalyticsEventWrapper {
    private String id;
    private Long timestamp;
    private String type;
    private String topic;
    
    public String getId() {
        return id;
    }
    
    public void setId(String id) {
        this.id = id;
    }
    
    public Long getTimestamp() {
        return timestamp;
    }
    
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
    
    public String getType() {
        return type;
    }
    
    public void setType(String type) {
        this.type = type;
    }
    
    public String getTopic() {
        return topic;
    }
    
    public void setTopic(String topic) {
        this.topic = topic;
    }
} {code}
The job is launched on EMR with spark-submit (all application code is packaged
into application.jar with the Maven Shade plugin; note that spark-submit
options must precede the application jar):
{code:java}
// works
spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
  --deploy-mode cluster \
  --verbose \
  /tmp/application.jar

// fails with the Kryo issue
spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
  --deploy-mode cluster \
  --verbose \
  --conf "spark.shuffle.push.enabled=true" \
  /tmp/application.jar

// yarn-site.xml on the NodeManagers
<property>
  <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
  <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
</property> {code}
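One untested mitigation, assuming the classloader-timing reading above is correct, would be to make the jar visible on the executor JVM's launch classpath so the eager registration can resolve the class; note that spark.executor.extraClassPath requires the path to exist on every node (e.g. staged there by an EMR bootstrap action):
{code:java}
import org.apache.spark.SparkConf;

public class ExecutorClasspathMitigation {
    // Untested mitigation sketch: put application.jar on the executor JVM's
    // launch classpath so eager Kryo registration can resolve the class before
    // the normal jar distribution happens. The /tmp/application.jar path is an
    // assumption and must exist on every NodeManager host.
    static SparkConf build() {
        return new SparkConf()
            .set("spark.shuffle.push.enabled", "true")
            .set("spark.executor.extraClassPath", "/tmp/application.jar");
    }
}
{code}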
h2. Update 

When removing {*}.registerKryoClasses(AnalyticsEventWrapper.class){*} from the
SparkConf, the error goes away.
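A plausible reason this helps, assuming the timing analysis above: with the default spark.kryo.registrationRequired=false, Kryo registration is only a size/speed optimization, and Encoders.kryo can serialize unregistered classes by writing their fully-qualified names, so dropping the registration keeps the job working while removing the startup-time Class.forName() on an application class. A sketch of the resulting configuration:
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;

public class NoEagerRegistrationConf {
    // Workaround sketch, assuming spark.kryo.registrationRequired stays at its
    // default of false: keep Kryo as the serializer but register nothing, so
    // executor startup never has to resolve application classes.
    static SparkConf build() {
        return new SparkConf()
            .setMaster("yarn")
            .set("spark.serializer", KryoSerializer.class.getCanonicalName());
        // Encoders.kryo(AnalyticsEventWrapper.class) still works; unregistered
        // classes just write their fully qualified class name with each record.
    }
}
{code}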

> Kryo registerKryoClasses() issue with push-based shuffle
> --------------------------------------------------------
>
>                 Key: SPARK-48043
>                 URL: https://issues.apache.org/jira/browse/SPARK-48043
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.4.1
>         Environment: AWS EMR 6.14 (Spark 3.4.1)
>            Reporter: Romain Ardiet
>            Priority: Major
>


