[ https://issues.apache.org/jira/browse/SPARK-48043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Romain Ardiet updated SPARK-48043:
----------------------------------
Description:
I'm running a Spark job on AWS EMR. I wanted to test the push-based shuffle
introduced in Spark 3.2, but the job fails with a Kryo exception as soon as I
enable it.
The failure happens when the Executor starts, during the
KryoSerializerInstance.getAutoReset() check:
{code:java}
24/04/24 15:36:22 ERROR YarnCoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Failed to register classes with Kryo
org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:186) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:174) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer$$anon$1.create(KryoSerializer.scala:105) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.borrow(KryoPoolQueueImpl.java:48) ~[kryo-shaded-4.0.2.jar:?]
    at org.apache.spark.serializer.KryoSerializer$PoolWrapper.borrow(KryoSerializer.scala:112) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:352) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializerInstance.getAutoReset(KryoSerializer.scala:452) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects$lzycompute(KryoSerializer.scala:259) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.supportsRelocationOfSerializedObjects(KryoSerializer.scala:255) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.serializerIsSupported$lzycompute$1(Utils.scala:2721) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.serializerIsSupported$1(Utils.scala:2716) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.util.Utils$.isPushBasedShuffleEnabled(Utils.scala:2730) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:554) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.executor.Executor.<init>(Executor.scala:143) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:190) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_402]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_402]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]
Caused by: java.lang.ClassNotFoundException: com.analytics.AnalyticsEventWrapper
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_402]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_402]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_402]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_402]
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_402]
    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_402]
    at org.apache.spark.util.Utils$.classForName(Utils.scala:228) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$6(KryoSerializer.scala:177) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.serializer.KryoSerializer.$anonfun$newKryo$5(KryoSerializer.scala:176) ~[spark-core_2.12-3.4.1-amzn-1.jar:3.4.1-amzn-1]
    ... 24 more
24/04/24 15:36:22 INFO YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
24/04/24 15:36:22 ERROR Utils: Uncaught exception in thread shutdown-hook-0
{code}
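The trace shows the chain: with push-based shuffle enabled, BlockManager.initialize() calls Utils.isPushBasedShuffleEnabled(), which eagerly builds a Kryo instance to probe getAutoReset(), and that in turn resolves every registered class via Utils.classForName. The failure suggests the application jar is not yet visible to the executor's classloader at that point. A minimal sketch of the failing step (illustrative only, not Spark's exact code):
{code:java}
// Illustrative sketch of the lookup that fails in the trace above: each class
// passed to registerKryoClasses() is resolved with Class.forName on the current
// context classloader. At Executor construction time application.jar is
// apparently not on that classloader yet, so the lookup throws.
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class<?> cls = Class.forName("com.analytics.AnalyticsEventWrapper", true, loader);
// -> java.lang.ClassNotFoundException: com.analytics.AnalyticsEventWrapper
{code}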
My job code is the following:
{code:java}
package com.analytics.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.analytics.archive.AnalyticsEventWrapperRowMapper;
import com.analytics.AnalyticsEventWrapper;

public class ProcessAnalyticsEventJob {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf()
            .setMaster("yarn")
            .set("spark.serializer", KryoSerializer.class.getCanonicalName())
            // registerKryoClasses takes a Class<?>[] when called from Java
            .registerKryoClasses(new Class<?>[]{AnalyticsEventWrapper.class});
        SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

        MapFunction<Row, AnalyticsEventWrapper> mapAsAnalyticsEventWrapper =
            AnalyticsEventWrapperRowMapper::map;
        Dataset<AnalyticsEventWrapper> inputDataset = spark.read()
            .parquet("s3://bucket/path/to/events")
            .map(mapAsAnalyticsEventWrapper, Encoders.kryo(AnalyticsEventWrapper.class));

        // rest of the job (shuffle aggregation and output write)
    }
} {code}
AnalyticsEventWrapperRowMapper.java
{code:java}
package com.analytics.archive;

import org.apache.spark.sql.Row;

import com.analytics.AnalyticsEventWrapper;

public class AnalyticsEventWrapperRowMapper {
    public static AnalyticsEventWrapper map(Row r) {
        AnalyticsEventWrapper analyticsEventWrapper = new AnalyticsEventWrapper();
        analyticsEventWrapper.setId(r.getAs("id"));
        analyticsEventWrapper.setTimestamp(r.getAs("timestamp"));
        analyticsEventWrapper.setType(r.getAs("type"));
        analyticsEventWrapper.setTopic(r.getAs("topic"));
        return analyticsEventWrapper;
    }
} {code}
AnalyticsEventWrapper.java
{code:java}
package com.analytics;

public class AnalyticsEventWrapper {
    private String id;
    private Long timestamp;
    private String type;
    private String topic;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
} {code}
The job is launched on EMR with spark-submit (all application code is packaged
into application.jar with the Maven Shade plugin):
{code:java}
// works (spark-submit options must precede the application jar)
spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
  --deploy-mode cluster \
  --verbose \
  /tmp/application.jar

// kryo issue
spark-submit --class com.analytics.spark.ProcessAnalyticsEventJob \
  --deploy-mode cluster \
  --verbose \
  --conf "spark.shuffle.push.enabled=true" \
  /tmp/application.jar

// yarn-site.xml of the Node Managers
<property>
  <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
  <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
</property> {code}
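For context, the Spark documentation requires the external shuffle service for push-based shuffle; a sketch of the client-side settings this test assumes (hedged: EMR may set some of these by default on this cluster):
{code:java}
// Assumed client-side configuration for the push-based shuffle test:
spark.shuffle.push.enabled=true
spark.shuffle.service.enabled=true
{code}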
h2. Update
When I remove the {*}registerKryoClasses{*} call from the SparkConf, the error
goes away.
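For reference, a sketch of the SparkConf that works around the issue (same job as above, Kryo kept as the serializer, only the eager registration dropped):
{code:java}
// Workaround variant of the job's SparkConf: keep Kryo but drop
// registerKryoClasses(), so no class has to be resolved at executor startup.
// Unregistered classes are then serialized with their fully-qualified name.
SparkConf sparkConf = new SparkConf()
    .setMaster("yarn")
    .set("spark.serializer", KryoSerializer.class.getCanonicalName());
{code}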
> Kryo registerKryoClasses() issue with push-based shuffle
> --------------------------------------------------------
>
> Key: SPARK-48043
> URL: https://issues.apache.org/jira/browse/SPARK-48043
> Project: Spark
> Issue Type: Bug
> Components: Shuffle
> Affects Versions: 3.4.1
> Environment: AWS EMR 6.14 (Spark 3.4.1)
> Reporter: Romain Ardiet
> Priority: Major