[ 
https://issues.apache.org/jira/browse/SPARK-33349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628700#comment-17628700
 ] 

Yilun Fan commented on SPARK-33349:
-----------------------------------

I also met this problem in Spark 3.2.1, kubernetes-client 5.4.1.

 
{code:java}
ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is 
expected if the application is shutting down.) 
io.fabric8.kubernetes.client.WatcherException: too old resource version: 
63993943 (64057995) 
at 
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$TypedWatcherWebSocketListener.onMessage(WatchConnectionManager.java:103){code}
I think we have to add some retry in ExecutorPodsWatchSnapshotSource. 
Especially when we close spark.kubernetes.executor.enableApiPolling,  only this 
watcher can receive executor pod status.

Just like what spark has done in the submit client.  
[https://github.com/apache/spark/pull/29533/files] 

 

> ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed
> ------------------------------------------------------------------
>
>                 Key: SPARK-33349
>                 URL: https://issues.apache.org/jira/browse/SPARK-33349
>             Project: Spark
>          Issue Type: Bug
>          Components: Kubernetes
>    Affects Versions: 3.0.1, 3.0.2, 3.1.0
>            Reporter: Nicola Bova
>            Priority: Critical
>
> I launch my spark application with the 
> [spark-on-kubernetes-operator|https://github.com/GoogleCloudPlatform/spark-on-k8s-operator]
>  with the following yaml file:
> {code:yaml}
> apiVersion: sparkoperator.k8s.io/v1beta2
> kind: SparkApplication
> metadata:
>    name: spark-kafka-streamer-test
>    namespace: kafka2hdfs
> spec: 
>    type: Scala
>    mode: cluster
>    image: <my-repo>/spark:3.0.2-SNAPSHOT-2.12-0.1.0
>    imagePullPolicy: Always
>    timeToLiveSeconds: 259200
>    mainClass: path.to.my.class.KafkaStreamer
>    mainApplicationFile: spark-kafka-streamer_2.12-spark300-assembly.jar
>    sparkVersion: 3.0.1
>    restartPolicy:
>      type: Always
>    sparkConf:
>      "spark.kafka.consumer.cache.capacity": "8192"
>      "spark.kubernetes.memoryOverheadFactor": "0.3"
>    deps:
>    jars:
>      - my
>      - jar
>      - list
>    hadoopConfigMap: hdfs-config
>    driver:
>      cores: 4
>      memory: 12g
>      labels:
>        version: 3.0.1
>      serviceAccount: default
>      javaOptions: 
> "-Dlog4j.configuration=file:///opt/spark/log4j/log4j.properties"
>   executor:
>      instances: 4
>     cores: 4
>     memory: 16g
>     labels:
>       version: 3.0.1
>     javaOptions: 
> "-Dlog4j.configuration=file:///opt/spark/log4j/log4j.properties"
> {code}
>  I have tried with both Spark `3.0.1` and `3.0.2-SNAPSHOT` with the ["Restart 
> the watcher when we receive a version changed from 
> k8s"|https://github.com/apache/spark/pull/29533] patch.
> This is the driver log:
> {code}
> 20/11/04 12:16:02 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> ... // my app log, it's a structured streaming app reading from kafka and 
> writing to hdfs
> 20/11/04 13:12:12 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has 
> been closed (this is expected if the application is shutting down.)
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource 
> version: 1574101276 (1574213896)
>  at 
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
>  at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
>  at 
> okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
>  at 
> okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
>  at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
>  at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
>  at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
>  at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
>  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>  at java.base/java.lang.Thread.run(Unknown Source)
> {code}
> The error above appears after roughly 50 minutes.
> After the exception above, no more logs are produced and the app hangs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to