[ https://issues.apache.org/jira/browse/SPARK-32414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
cyrille cazenave updated SPARK-32414:
-------------------------------------
    Attachment: fulllogs.txt

pyspark crashes in cluster mode with kafka structured streaming
---------------------------------------------------------------

                 Key: SPARK-32414
                 URL: https://issues.apache.org/jira/browse/SPARK-32414
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.0.0
         Environment: * spark version 3.0.0 from mac brew
                      * kubernetes Kind 18+
                      * kafka cluster: strimzi/kafka:0.18.0-kafka-2.5.0
                      * kafka package: org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
            Reporter: cyrille cazenave
            Priority: Major
         Attachments: fulllogs.txt

Hello,

I have been trying to run a pyspark script on Spark on Kubernetes, and the application crashes with this error:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

I followed these steps:

* For Spark on Kubernetes: https://spark.apache.org/docs/latest/running-on-kubernetes.html (including building the image with docker-image-tool.sh on mac with the -p flag).
* I also tried the image published by the GoogleCloudPlatform/spark-on-k8s-operator developers (gcr.io/spark-operator/spark-py:v3.0.0) and hit the same issue.
* For Kafka streaming: https://spark.apache.org/docs/3.0.0/structured-streaming-kafka-integration.html#deploying
* When running the script manually in a Jupyter notebook (jupyter/pyspark-notebook:latest, Spark 3.0.0) in local mode (with PYSPARK_SUBMIT_ARGS="--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"), it ran without issue.

The command run from the laptop is:

spark-submit \
  --master k8s://https://127.0.0.1:53979 \
  --name spark-pi \
  --deploy-mode cluster \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
  --conf spark.kubernetes.container.image=fifoosab/pytest:3.0.0.dev0 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.executor.request.cores=1 \
  --conf spark.kubernetes.driver.request.cores=1 \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  local:///usr/bin/spark.py

More logs on the error:

20/07/23 14:26:08 INFO TaskSetManager: Lost task 1.3 in stage 1.0 (TID 11) on 10.244.3.7, executor 1: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 11]
20/07/23 14:26:08 ERROR TaskSetManager: Task 1 in stage 1.0 failed 4 times; aborting job
20/07/23 14:26:08 INFO TaskSchedulerImpl: Cancelling stage 1
20/07/23 14:26:08 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage cancelled
20/07/23 14:26:08 INFO TaskSchedulerImpl: Stage 1 was cancelled
20/07/23 14:26:08 INFO TaskSetManager: Lost task 3.3 in stage 1.0 (TID 13) on 10.244.3.7, executor 1: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 12]
20/07/23 14:26:08 INFO DAGScheduler: ResultStage 1 (start at NativeMethodAccessorImpl.java:0) failed in 20.352 s due to Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 11, 10.244.3.7, executor 1): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2350)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2235)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2235)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2235)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2235)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
20/07/23 14:26:08 INFO DAGScheduler: Job 0 failed: start at NativeMethodAccessorImpl.java:0, took 24.554962 s
20/07/23 14:26:08 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3f66880a is aborting.
20/07/23 14:26:08 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3f66880a aborted.
20/07/23 14:26:08 INFO TaskSetManager: Lost task 0.2 in stage 1.0 (TID 12) on 10.244.1.7, executor 2: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD) [duplicate 13]
20/07/23 14:26:08 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
20/07/23 14:26:08 ERROR MicroBatchExecution: Query [id = 6eaa5395-81c6-4892-b5c9-d706189a1121, runId = 4a172f34-cac1-407d-aff7-a58b4b2c1106] terminated with error
org.apache.spark.SparkException: Writing job aborted.
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2938)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2938)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:576)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
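For context, the spark.py script itself is not included in this issue (only fulllogs.txt is attached), so the following is a hypothetical minimal sketch of the kind of PySpark structured-streaming job described above. All names (app name, topic, bootstrap servers, checkpoint path) are illustrative assumptions, not taken from the report. This error pattern is commonly associated with the spark-sql-kafka jars being visible to the driver but not to the executors, so that the executor cannot deserialize the lambda stored in MapPartitionsRDD.f, but that diagnosis is not confirmed here.

```python
# Hypothetical reconstruction of a minimal spark.py; the actual script is not
# shown in this issue. Topic, server, and path names are illustrative only.

def kafka_source_options(bootstrap_servers: str, topic: str) -> dict:
    """Options for Spark's Kafka source (spark-sql-kafka-0-10), as plain data."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
    }


def main() -> None:
    # pyspark is imported lazily so the helpers above can be inspected and
    # tested without a Spark installation. Running this for real requires
    # pyspark 3.0.0 plus org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
    # (supplied via --packages, as in the spark-submit command above).
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kafka-structured-streaming").getOrCreate()

    stream = (
        spark.readStream.format("kafka")
        .options(**kafka_source_options("my-cluster-kafka-bootstrap:9092", "test-topic"))
        .load()
    )

    # Any per-row transformation like this produces a MapPartitionsRDD whose
    # serialized function must be deserialized on the executors -- the step
    # that fails with the ClassCastException in the logs above.
    query = (
        stream.selectExpr("CAST(value AS STRING) AS value")
        .writeStream.format("console")
        .option("checkpointLocation", "/tmp/checkpoint")
        .start()
    )
    query.awaitTermination()


# main() would run when the file is submitted, e.g.:
#   spark-submit ... local:///usr/bin/spark.py
```

In local mode (the working Jupyter setup above) the lambda is deserialized in the same JVM that serialized it, which would explain why the same script only fails once real executors are involved.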