Abhilash created SPARK-45860:
--------------------------------

             Summary: ClassCastException with SerializedLambda in Spark Cluster Mode
                 Key: SPARK-45860
                 URL: https://issues.apache.org/jira/browse/SPARK-45860
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, Spark Submit
    Affects Versions: 3.4.1, 3.2.1
         Environment: *Environment*
Java Version: 11
Spring Boot Version: 2.7.10
Spark Version: 3.2.1
            Reporter: Abhilash


h3. Issue Description

Running a Spark application in cluster mode fails with a 
`java.lang.ClassCastException` involving 
`java.lang.invoke.SerializedLambda`. The issue appears to be specific to 
Spark cluster mode; it does not occur when the application runs locally 
without Spring Boot.
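
For context on where `SerializedLambda` comes from, here is a minimal plain-Java sketch (no Spark, no Spring Boot) of how the JDK serializes a lambda. The names `LambdaRoundTrip`, `SerFn`, `upperCase`, `serializedForm` and `roundTrip` are made up for illustration and are not part of the reported application. A serializable lambda is written to the stream as a `SerializedLambda` placeholder, and it is turned back into a function only when deserialization can resolve that placeholder through the class that defined the lambda. The sketch shows the successful case; it does not reproduce the failure above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

public class LambdaRoundTrip {

    /** Hypothetical functional interface: a Function that is also Serializable. */
    public interface SerFn<A, B> extends Function<A, B>, Serializable {}

    /** A serializable lambda defined (captured) by this class. */
    public static SerFn<String, String> upperCase() {
        return s -> s.toUpperCase();
    }

    /** Returns the placeholder object that Java serialization writes instead of the lambda. */
    public static SerializedLambda serializedForm(Serializable lambda)
            throws ReflectiveOperationException {
        // The compiler-generated lambda class has a private writeReplace() method.
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return (SerializedLambda) writeReplace.invoke(lambda);
    }

    /** Serializes a value to bytes and reads it back with plain Java serialization. */
    public static Object roundTrip(Object value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerFn<String, String> fn = upperCase();

        // Prints "java.lang.invoke.SerializedLambda": this is what travels over the wire.
        System.out.println(serializedForm(fn).getClass().getName());

        // On read, the placeholder is resolved back into a working function
        // because the defining class is visible to the deserializer.
        @SuppressWarnings("unchecked")
        Function<String, String> copy = (Function<String, String>) roundTrip(fn);
        System.out.println(copy.apply("spark")); // prints "SPARK"
    }
}
```

The exception in this report says that a raw `SerializedLambda` was assigned to a `scala.Function3` field, which suggests that this resolution step did not produce a function on the executor. Why that happens in this setup is not established here.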

 
h3. Steps to Reproduce
 # Create a dummy dataset
{code:java}
Dataset<String> dummyData = spark.createDataset(
        Arrays.asList("Abhi", "Andrii", "Rick", "Duc"), Encoders.STRING()); {code}

 # Call flatMap function to transform the data
{code:java}
Dataset<TestData> transformedData = dummyData.flatMap(
        new TestDataFlatMap(), Encoders.bean(TestData.class)); {code}

 # Call any action on the transformed dataset
{code:java}
transformedData.show(); {code}

 # Run the application with spark-submit in cluster mode with Spring Boot. 
The job fails with the ClassCastException described above.

 
h3. *Complete Code:*

 
{code:java}
@SpringBootApplication(exclude = {org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration.class})
public class SampleSparkJob {
    public static void main(String[] args) {
        SpringApplication.run(DataIngestionServiceApplication.class, args);

        SparkSession spark = SparkSession.builder()
                .appName("SampleSparkJob")
                .master("local[*]")
                .getOrCreate();
        Dataset<String> dummyData = spark.createDataset(
                Arrays.asList("Abhi", "Andrii", "Rick", "Duc"), Encoders.STRING());
        Dataset<TestData> transformedData = dummyData.flatMap(
                new TestDataFlatMap(), Encoders.bean(TestData.class));
        transformedData.show();
        transformedData.write().mode("append").parquet("outputpath");
        spark.stop();
    }
}{code}
{code:java}
class TestDataFlatMap implements FlatMapFunction<String, TestData>, Serializable {
    @Override
    public Iterator<TestData> call(String name) {
        return Arrays.asList(new TestData(name)).iterator();
    }
}{code}
{code:java}
@Data
@AllArgsConstructor
public class TestData implements Serializable {
    private String name;
} {code}
 
h3. Stack trace:
{code:java}
WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.248.66.38 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
    at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
{code}
 

h3. Additional Information:

The issue seems to be related to Spring Boot auto-configuration or the 
dependencies included with Spring Boot.
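
One way to narrow this down, offered as a diagnostic sketch and not a confirmed fix, is to log which class loader and which jar each relevant class comes from, on both the driver and the executors. The helper name `ClassOriginProbe` is hypothetical. The assumption being tested is that Spring Boot's packaging or class loading makes application classes visible to the driver in a different way than to the executors; that assumption is not verified in this report.

```java
import java.security.CodeSource;

public class ClassOriginProbe {

    /** Describes which class loader loaded a class and where its bytes came from. */
    public static String describe(Class<?> type) {
        // Classes loaded by the bootstrap loader report a null class loader.
        ClassLoader loader = type.getClassLoader();
        String loaderName = (loader == null) ? "<bootstrap>" : loader.getClass().getName();

        // The code source can be missing, for example for JDK core classes.
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "<unknown>"
                : source.getLocation().toString();

        return type.getName() + " loaded by " + loaderName + " from " + location;
    }

    public static void main(String[] args) {
        // A JDK class: bootstrap loader, no code source.
        System.out.println(describe(String.class));
        // An application class: the output depends on how the application was launched.
        System.out.println(describe(ClassOriginProbe.class));
    }
}
```

In the reported application, calling `describe(TestDataFlatMap.class)` in `main` and again inside `TestDataFlatMap.call` would show whether the driver and the executors load the class from the same place.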


