[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1038973719

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -136,6 +151,10 @@ class ExecutorPodsAllocator(
   def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

+  private[k8s] def stopApplication(exitCode: Int): Unit = {
+    sys.exit(exitCode)
+  }

Review Comment:
I see the problem, and actually I don't know how to handle it properly ... I use `sys.exit` here because I just want to kill the process or driver pod, so that `ShutdownHookManager` will be invoked.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1152967630

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:

@@ -750,6 +750,26 @@ private[spark] object Config extends Logging {
     .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value")
     .createWithDefaultString("20s")

+  val KUBERNETES_MAX_EXECUTOR_FAILURES =
+    ConfigBuilder("spark.kubernetes.executor.maxNumFailures")
+      .doc("Spark exit if the number of failed executors exceeds this threshold. This " +
+        s"configuration only take effect when ${KUBERNETES_ALLOCATION_PODS_ALLOCATOR.key} " +
+        s"is set to 'direct'.")
+      .version("3.5.0")
+      .intConf
+      .checkValue(value => value > 0, "Max number of failures must be a positive value")
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+    ConfigBuilder("spark.kubernetes.executor.failuresValidityInterval")

Review Comment:
Currently, only YARN supports this feature. Would it be better to define a `spark.executor.failuresValidityInterval` and clarify the limitation in the conf description?
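The `failuresValidityInterval` config discussed above bounds failure counting to a sliding time window, as YARN does. A minimal sketch of that idea (class and method names are assumptions for illustration, not Spark's actual `FailureTracker`): failures older than the validity interval are dropped before counting.

```scala
import scala.collection.mutable

// Sliding-window failure tracker: only failures within the last
// `validityIntervalMs` milliseconds count toward the threshold.
// The injectable clock makes the window testable without sleeping.
class FailureTracker(validityIntervalMs: Long,
                     clock: () => Long = () => System.currentTimeMillis()) {
  private val failureTimestamps = mutable.Queue.empty[Long]

  def registerFailure(): Unit = failureTimestamps.enqueue(clock())

  def numFailedExecutors: Int = {
    val cutoff = clock() - validityIntervalMs
    // Discard failures that have aged out of the window.
    while (failureTimestamps.nonEmpty && failureTimestamps.head < cutoff) {
      failureTimestamps.dequeue()
    }
    failureTimestamps.size
  }
}
```

With a 1-second window, a failure registered at t=0 no longer counts once the clock passes t=1000.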
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1152964045

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -148,6 +163,10 @@ class ExecutorPodsAllocator(
   def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

+  private[k8s] def stopApplication(exitCode: Int): Unit = {
+    sys.exit(exitCode)

Review Comment:
We can override the function for testing.
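The override-for-testing pattern suggested here can be sketched as follows (the class and method names are simplified stand-ins, not the actual Spark classes): the production method calls `sys.exit`, while a test double records the exit code instead of killing the test JVM.

```scala
// Simplified stand-in for the allocator under discussion.
class Allocator {
  // Production behavior: terminate the JVM (and hence the driver pod).
  protected def stopApplication(exitCode: Int): Unit = sys.exit(exitCode)

  // Hypothetical trigger: exceed the failure threshold and stop.
  def checkFailures(numFailures: Int, maxFailures: Int): Unit =
    if (numFailures > maxFailures) stopApplication(11)
}

// Test double: capture the exit code instead of exiting.
class TestAllocator extends Allocator {
  var exitCode: Option[Int] = None
  override protected def stopApplication(code: Int): Unit = exitCode = Some(code)
}
```

A test can then assert on `exitCode` after driving the allocator past the threshold, with no risk of the suite's JVM exiting.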
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1141438598

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -136,6 +151,10 @@ class ExecutorPodsAllocator(
   def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

+  private[k8s] def stopApplication(exitCode: Int): Unit = {
+    sys.exit(exitCode)
+  }

Review Comment:
@holdenk sorry for the long-delayed reply. I think `sys.exit` should be fine here, because:

1) According to the docs of `Runtime#addShutdownHook`, `sys.exit` will trigger `ShutdownHookManager`:

> Registers a new virtual-machine shutdown hook. The Java virtual machine shuts down in response to two kinds of events: The program exits normally, when the last non-daemon thread exits or when the exit (equivalently, System.exit) method is invoked, or The virtual machine is terminated in response to a user interrupt, such as typing ^C, or a system-wide event, such as user logoff or system shutdown.

2) I read the code of the YARN `ApplicationMaster`; its extra logic consists of:
- cleaning the staging dir
- resubmitting if `exitCode != 0` and the maximum number of attempts is unreached, or calling `YarnRMClient.unregister` to unregister the application.

IMO we don't need to do 2) in K8s; simply calling `sys.exit(exitCode)` to terminate the Pod and propagate the exit code to the Pod state should be fine.
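The first point can be checked directly: any hook registered via `Runtime#addShutdownHook` (which is how Spark's `ShutdownHookManager` registers its cleanup) runs when `sys.exit` is called. A minimal sketch with a hypothetical helper, not Spark code:

```scala
object ShutdownHookSketch {
  // Register a cleanup action as a JVM shutdown hook. It will run when
  // sys.exit(n) is invoked, and the exit code n then becomes the
  // container exit code visible in the pod's status.
  def registerCleanup(cleanup: () => Unit): Thread = {
    val hook = new Thread(() => cleanup())
    Runtime.getRuntime.addShutdownHook(hook)
    hook
  }
}
```

`Runtime#removeShutdownHook` returns `true` only for a previously registered, not-yet-started hook, which gives a cheap way to verify the registration without actually exiting the JVM.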
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1141431687

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -253,6 +272,21 @@ class ExecutorPodsAllocator(
       case _ => false
     }

+    val currentFailedExecutorIds = podsForRpId.filter {
+      case (_, PodFailed(pod)) =>
+        pod.getSpec

Review Comment:
Removed.
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1141438872

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:

@@ -746,6 +746,25 @@ private[spark] object Config extends Logging {
     .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value")
     .createWithDefaultString("20s")

+  val KUBERNETES_MAX_EXECUTOR_FAILURES =
+    ConfigBuilder("spark.kubernetes.executor.maxNumFailures")
+      .doc("Spark exit if the number of failed executors exceeds this threshold. This " +
+        s"configuration only take effect when ${KUBERNETES_ALLOCATION_PODS_ALLOCATOR.key} " +
+        s"is set to 'direct'.")
+      .version("3.4.0")
+      .intConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =

Review Comment:
There are similarly long variable names in the existing configurations.
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1141433663

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:

@@ -746,6 +746,25 @@ private[spark] object Config extends Logging {
     .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value")
     .createWithDefaultString("20s")

+  val KUBERNETES_MAX_EXECUTOR_FAILURES =
+    ConfigBuilder("spark.kubernetes.executor.maxNumFailures")
+      .doc("Spark exit if the number of failed executors exceeds this threshold. This " +
+        s"configuration only take effect when ${KUBERNETES_ALLOCATION_PODS_ALLOCATOR.key} " +
+        s"is set to 'direct'.")
+      .version("3.4.0")
+      .intConf

Review Comment:
added `checkValue`
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1141431687

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -253,6 +272,21 @@ class ExecutorPodsAllocator(
       case _ => false
     }

+    val currentFailedExecutorIds = podsForRpId.filter {
+      case (_, PodFailed(pod)) =>
+        pod.getSpec

Review Comment:
It simply returns a property value; there is no side effect.
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1043114690

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:

@@ -723,6 +723,25 @@ private[spark] object Config extends Logging {
     .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
     .createWithDefault(Int.MaxValue)

+  val KUBERNETES_MAX_EXECUTOR_FAILURES =
+    ConfigBuilder("spark.kubernetes.executor.maxNumFailures")

Review Comment:
I thought about that, but it does not match the current style of the K8s configurations.
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1042382362

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -136,6 +151,10 @@ class ExecutorPodsAllocator(
   def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

+  private[k8s] def stopApplication(exitCode: Int): Unit = {
+    sys.exit(exitCode)
+  }

Review Comment:
Emm, I see a similar usage:
https://github.com/apache/spark/blob/1d2159f888139e65c80db2003d521b0f684df83a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala#L145-L147
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1028084187

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -494,10 +525,46 @@ class ExecutorPodsAllocator(

 private[spark] object ExecutorPodsAllocator {

+  private[spark] val EXIT_MAX_EXECUTOR_FAILURES = 11

Review Comment:
borrowed from the YARN module's `ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES`
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1028083213

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -136,6 +151,10 @@ class ExecutorPodsAllocator(
   def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

+  private[k8s] def stopApplication(exitCode: Int): Unit = {
+    SparkContext.getActive.foreach(_.stop(exitCode))

Review Comment:
It's not elegant, but `AbstractPodsAllocator` is a developer API; introducing a parameter `sc` would change the constructor signature. Suggestions are welcome here.
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1027559033

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:

@@ -119,6 +126,12 @@ class ExecutorPodsAllocator(
     }
   }
   snapshotsStore.addSubscriber(podAllocationDelay) {
+    maxNumExecutorFailuresOpt.foreach { maxNumExecutorFailures =>
+      if (failureTracker.numFailedExecutors > maxNumExecutorFailures) {
+        logError(s"Max number of executor failures ($maxNumExecutorFailures) reached")
+        SparkContext.getActive.foreach(_.stop(EXIT_MAX_EXECUTOR_FAILURES))

Review Comment:
It's ugly, but `AbstractPodsAllocator` is a developer API; passing `sc` would change the constructor signature. Suggestions are welcome.
[GitHub] [spark] pan3793 commented on a diff in pull request #38732: [SPARK-41210][K8S] Window based executor failure tracking mechanism
pan3793 commented on code in PR #38732:
URL: https://github.com/apache/spark/pull/38732#discussion_r1027519725

## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala:

@@ -723,6 +723,25 @@ private[spark] object Config extends Logging {
     .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer")
     .createWithDefault(Int.MaxValue)

+  val KUBERNETES_MAX_EXECUTOR_FAILURES =
+    ConfigBuilder("spark.kubernetes.executor.maxNumFailures")

Review Comment:
The equivalent YARN configuration is `spark.yarn.max.executor.failures`; naming suggestions are welcome.