Abhilash created SPARK-45860:
--------------------------------

             Summary: ClassCastException with SerializedLambda in Spark Cluster Mode
                 Key: SPARK-45860
                 URL: https://issues.apache.org/jira/browse/SPARK-45860
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, Spark Submit
    Affects Versions: 3.4.1, 3.2.1
         Environment: *Environment*
Java Version: 11
Spring Boot Version: 2.7.10
Spark Version: 3.2.1
            Reporter: Abhilash

h3. Issue Description

A Spark application running in cluster mode fails with a `java.lang.ClassCastException` involving `java.lang.invoke.SerializedLambda`. The issue seems to be specific to Spark cluster mode; it does not occur when the application runs locally without Spring Boot.

h3. Steps to Reproduce
# Create a dummy dataset
{code:java}
Dataset<String> dummyData = spark.createDataset(Arrays.asList("Abhi", "Andrii", "Rick", "Duc"), Encoders.STRING());
{code}
# Call the flatMap function to transform the data
{code:java}
Dataset<TestData> transformedData = dummyData.flatMap(new TestDataFlatMap(), Encoders.bean(TestData.class));
{code}
# Call any action on the transformed dataset
{code:java}
transformedData.show();
{code}
# Run this Spark application with the spark-submit command in cluster mode with Spring Boot. The run fails with the ClassCastException mentioned above.

h3. *Complete Code:*
{code:java}
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
public class SampleSparkJob {
    public static void main(String[] args) {
        // Start the Spring context before creating the Spark session
        SpringApplication.run(SampleSparkJob.class, args);

        SparkSession spark = SparkSession.builder()
                .appName("SampleSparkJob")
                .master("local[*]")
                .getOrCreate();

        Dataset<String> dummyData = spark.createDataset(Arrays.asList("Abhi", "Andrii", "Rick", "Duc"), Encoders.STRING());
        Dataset<TestData> transformedData = dummyData.flatMap(new TestDataFlatMap(), Encoders.bean(TestData.class));

        // Any action triggers task serialization to the executors
        transformedData.show();
        transformedData.write().mode("append").parquet("outputpath");
        spark.stop();
    }
}
{code}
{code:java}
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;

class TestDataFlatMap implements FlatMapFunction<String, TestData>, Serializable {
    @Override
    public Iterator<TestData> call(String name) {
        return Arrays.asList(new TestData(name)).iterator();
    }
}
{code}
{code:java}
import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class TestData implements Serializable {
    private String name;
}
{code}

h3. Stack trace:
{code:java}
WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.248.66.38 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
    at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{code}

*Environment*
Java Version: 11
Spring Boot Version: 2.7.10
Spark Version: 3.2.1

h3. Additional Information:

The issue seems to be related to Spring Boot auto-configuration or to the dependencies included with Spring Boot.
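
For readers unfamiliar with the class named in the exception, here is a minimal plain-Java sketch (no Spark or Spring involved) of how a serializable lambda travels through Java serialization. The JDK writes such a lambda as a `java.lang.invoke.SerializedLambda` stand-in and, on a successful read, resolves it back into a function object through the capturing class. The class name `SerializedLambdaDemo`, the interface `SerFunction`, and the helpers `inspect` and `roundTrip` are hypothetical names chosen for illustration. The sketch only shows the normal, working round trip in a single JVM; it does not reproduce the failure and makes no claim about its root cause in this report.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

public class SerializedLambdaDemo {

    /** Hypothetical functional interface whose lambdas are serializable. */
    public interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    /**
     * Returns the stand-in object that Java serialization writes for a
     * serializable lambda, by calling its synthetic writeReplace() method.
     */
    public static SerializedLambda inspect(Serializable lambda) throws Exception {
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return (SerializedLambda) writeReplace.invoke(lambda);
    }

    /** Serializes an object to a byte array and reads it back in the same JVM. */
    public static Object roundTrip(Serializable obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerFunction<String, Integer> length = s -> s.length();

        // What goes onto the wire is a SerializedLambda describing the lambda.
        SerializedLambda form = inspect(length);
        System.out.println("capturing class: " + form.getCapturingClass());
        System.out.println("interface method: " + form.getFunctionalInterfaceMethodName());

        // On a successful read, the stand-in is resolved back into a function.
        @SuppressWarnings("unchecked")
        SerFunction<String, Integer> copy = (SerFunction<String, Integer>) roundTrip(length);
        System.out.println("copy.apply(\"Abhi\") = " + copy.apply("Abhi"));
    }
}
```

In the stack trace above, the object that reached the `MapPartitionsRDD.f` field on the executor was still the `SerializedLambda` stand-in rather than a resolved `scala.Function3`, which is what the JDK reports as a ClassCastException.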