[GitHub] spark issue #22392: [SPARK-23200] Reset Kubernetes-specific config on Checkp...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/22392 Jenkins, test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21067: [SPARK-23980][K8S] Resilient Spark driver on Kubernetes
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21067 > ReadWriteOnce storage can only be attached to one node. This is well known. Using the RWO volume for fencing here would work, but this is not representative of all users. It breaks down if you include checkpointing to object storage (S3), HDFS, or ReadWriteMany volumes like NFS. In all of those cases, there will be a problem with correctness. For folks that need it right away, the same restarts feature can be realized safely using an approach like the [spark-operator](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator) without any of this hassle. So why are we trying to fit this into Spark with caveats around how volumes must be used to ensure fencing? This seems more error-prone and harder to explain, and I can't see the gain from it. One way forward is proposing to the k8s community a new option for Jobs that allows us to get fencing from the k8s apiserver through deterministic names. I think that would be a good way forward.
[GitHub] spark issue #21067: [SPARK-23980][K8S] Resilient Spark driver on Kubernetes
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21067 > After a short/configurable delay the driver pod state changed to Unknown and the Job controller initiated a new spark driver. This is dangerous behavior. The old spark driver can still be perfectly functional and running within the cluster even though its state is marked Unknown. It could also still be making progress with its own executors. A network connection with the K8s master is not a prerequisite for pods to continue running. On Thu, Jul 12, 2018, 7:57 AM Lucas Kacher wrote: > @baluchicken <https://github.com/baluchicken>, did that test involve > using checkpointing in a shared location?
[GitHub] spark issue #21583: [SPARK-23984][K8S][Test] Added Integration Tests for PyS...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21583 @ifilonenko, can you elaborate on what the issue is and what version of docker is needed?
[GitHub] spark issue #21067: [SPARK-23980][K8S] Resilient Spark driver on Kubernetes
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21067 I don't think this current approach will suffice. Correctness is important here, especially for folks using spark streaming. I understand that we're proposing the use of backoff limits, but there is **no guarantee** that a job controller **won't** spin up 2 driver pods when we ask for 1. That, by definition, is how the job controller works: by being greedy and working towards desired completions. For example, in the case of a network partition, the job controller logic in the Kubernetes master will not differentiate between: 1. Losing contact with the driver pod temporarily 2. Finding no driver pod and starting a new one This is why, in the past, I've proposed using a StatefulSet. However, getting termination semantics with a StatefulSet will be more work. I don't think we should sacrifice correctness in this layer, as it would surprise the application author, who now has to reason about whether the operation they are performing is idempotent. Can we have a proposal and understand all the subtleties before trying to change this behavior? For example, if we end up with more than one driver for a single job, I'd like to ensure that only one of them is making progress (for ex. by using a lease in ZK).
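[Editor's note] The single-writer property asked for above can be sketched in a few lines. This is a hypothetical illustration, not Spark code: whichever driver wins a compare-and-set on a shared lease record is the only one allowed to make progress. A real implementation would use ZooKeeper or the Kubernetes apiserver as the shared store; an `AtomicReference` stands in for it here.

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustrative lease guard: only the first claimant (or the same driver
// re-claiming) is allowed to proceed; any second driver spun up by the
// job controller loses the race and should stay idle.
final class LeaseGuard {
  private val holder = new AtomicReference[String](null)

  def tryAcquire(driverId: String): Boolean =
    holder.compareAndSet(null, driverId) || holder.get == driverId
}
```

With this shape, even if the job controller greedily creates a second driver pod during a partition, only one driver holds the lease and performs non-idempotent work.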
[GitHub] spark pull request #21652: [SPARK-24551][K8S] Add integration tests for secr...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21652#discussion_r199608693 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala --- @@ -74,10 +76,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite testBackend = IntegrationTestBackendFactory.getTestBackend testBackend.initialize() kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) +createTestSecret() --- End diff -- I'm wondering if we should add these tests to a separate class? This file is growing in size and we can separate them out a bit for better code understanding.
[GitHub] spark pull request #21652: [SPARK-24551][K8S] Add integration tests for secr...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21652#discussion_r199608853 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala --- @@ -150,6 +179,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite }) } + test("Run SparkPi with env and mount secrets.") { +sparkAppConf + .set(s"spark.kubernetes.driver.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH) + .set(s"spark.kubernetes.driver.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username") + .set(s"spark.kubernetes.driver.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password") + .set(s"spark.kubernetes.executor.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH) + .set(s"spark.kubernetes.executor.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username") + .set(s"spark.kubernetes.executor.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password") + +runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { +doBasicDriverPodCheck(driverPod) + checkSecrets(driverPod) --- End diff -- nit: indentation
[GitHub] spark issue #21652: [SPARK-24551][K8S] Add integration tests for secrets
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21652 jenkins, retest this please
[GitHub] spark issue #21697: [SPARK-24711][K8S] Fix tags for integration tests
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21697 This is good! Same comment that @srowen had on moving the version up to the parent POM and referencing it here.
[GitHub] spark issue #21462: [SPARK-24428][K8S] Fix unused code
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21462 Merging to master
[GitHub] spark issue #21462: [SPARK-24428][K8S] Fix unused code
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21462 LGTM, we can file a separate task to track what happened to secrets mounting without blocking on removing dead code. Thanks for the fix @skonto.
[GitHub] spark issue #21511: [SPARK-24491][Kubernetes] Configuration support for requ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21511 +1, agreed with @liyinan926. Let's be more careful about what we add to the configuration options.
[GitHub] spark issue #21583: [SPARK-23984][K8S][Test] Added Integration Tests for PyS...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21583 @ifilonenko, is this ready to go?
[GitHub] spark pull request #21462: [SPARK-24428][K8S] Fix unused code
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21462#discussion_r197909183 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh --- @@ -45,12 +45,10 @@ shift 1 SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*" env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt -readarray -t SPARK_JAVA_OPTS < /tmp/java_opts.txt -if [ -n "$SPARK_MOUNTED_CLASSPATH" ]; then - SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_MOUNTED_CLASSPATH" -fi -if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then - cp -R "$SPARK_MOUNTED_FILES_DIR/." . +readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt --- End diff -- +1, that makes sense. Those env-vars are set by the launcher at submission time.
[GitHub] spark pull request #21462: [SPARK-24428][K8S] Fix unused code
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21462#discussion_r197909535 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -46,8 +46,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { -val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs( - sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX) --- End diff -- Do we not have secrets -> mountpaths support right now? @mccheah @liyinan926
[GitHub] spark pull request #21279: [SPARK-24219][k8s] Improve the docker building sc...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21279#discussion_r197906642 --- Diff: bin/docker-image-tool.sh --- @@ -44,15 +44,37 @@ function image_ref { function build { local BUILD_ARGS local IMG_PATH + local TMPFOLDER if [ ! -f "$SPARK_HOME/RELEASE" ]; then # Set image build arguments accordingly if this is a source repo and not a distribution archive. +local JARS="${SPARK_HOME}/assembly/target/scala-${SPARK_SCALA_VERSION}/jars" +TMPFOLDER=`mktemp -q -d examples.XX` +if [ $? -ne 0 ]; then + ehco "Cannot create temp folder, exiting..." + exit 1 +fi + +mkdir -p "${TMPFOLDER}/jars" +cp "${SPARK_HOME}"/examples/target/scala*/jars/* "${TMPFOLDER}/jars" --- End diff -- Do we need the temp folder? Can we just pass the path `"${SPARK_HOME}"/examples/target/scala*/jars/*` to the dockerfile instead to pick up?
[GitHub] spark issue #21555: [SPARK-24547][K8S] Allow for building spark on k8s docke...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21555 Will follow-up. Thanks all for the comments! Merging to master.
[GitHub] spark issue #21555: [SPARK-24547][K8S] Allow for building spark on k8s docke...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21555 @mccheah, that is a good point but I agree with @ifilonenko that we can do it in a subsequent PR. I'm thinking we merge this as-is and I can try to get a follow-up PR here for the dockerfile refactor.
[GitHub] spark issue #21555: [SPARK-24547][K8S] Allow for building spark on k8s docke...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21555 Lgtm. Will merge when tests pass. Thanks!
[GitHub] spark issue #21551: [K8S] Fix issue in 'docker-image-tool.sh'
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21551 Phew, this was a bad one. Thanks for the fix all.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21366 If last round's comments are addressed, LGTM from me. The important behavior to check is the snapshotting, and creating replacement executors based on the captured snapshot.
[GitHub] spark issue #20697: [SPARK-23010][k8s] Initial checkin of k8s integration te...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20697 Thanks @mccheah and @ssuchter! Added a deprecation notice to the repo in https://github.com/apache-spark-on-k8s/spark-integration
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21366 If we did miss the watch event as you said, when the next poll comes along, it should capture the fact that a pod we thought existed doesn't exist anymore in the API. That would be the level-triggered approach - we'll see the expected state diverge from the current state and take action.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21366 Yes, combining watches and polling is common in controllers. Shared informers are the stable abstraction controllers use, so they can avoid polling in most cases. However, for now, if we just use the watch as a trigger to pull all the data we're interested in and then run our control loop, that's a fine middle ground.
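[Editor's note] The level-triggered middle ground described in these two comments can be sketched as follows. This is an illustrative reduction, not Spark's actual internals: on every trigger (a watch event or the periodic poll), the controller re-lists the full observed state and diffs it against the desired state, so a lost watch event only delays convergence instead of corrupting it.

```scala
// Given the desired executor IDs and the IDs the API server currently
// reports, compute what the control loop should do next: request
// replacements for anything missing, and forget anything stale.
// A missed watch event just means this runs on the next trigger instead.
def reconcile(desired: Set[Long], observed: Set[Long]): (Set[Long], Set[Long]) = {
  val toRequest = desired -- observed // expected but no longer present in the API
  val toForget  = observed -- desired // present in the API but no longer wanted
  (toRequest, toForget)
}
```

The key property is that `reconcile` depends only on the current snapshot of state, never on the sequence of events that produced it.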
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21366 Sorry, not yet. I explained to @mccheah what I meant about preferring the level-triggered approach: always being able to rebuild state even if some events are lost, and resyncing completely on events. If those things look good, @mccheah, please go ahead with the merge.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21366 I was planning to do one last pass over it today/tomorrow. I can merge after that.
[GitHub] spark issue #21317: [SPARK-24232][k8s] Add support for secret env vars
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21317 Merging to master
[GitHub] spark pull request #21317: [SPARK-24232][k8s] Add support for secret env var...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21317#discussion_r192206460 --- Diff: docs/running-on-kubernetes.md --- @@ -602,4 +608,20 @@ specific to Spark on Kubernetes. spark.kubernetes.executor.secrets.spark-secret=/etc/secrets. + --- End diff -- LGTM, thanks @skonto; will merge once tests pass.
[GitHub] spark pull request #21317: [SPARK-24232][k8s] Add support for secret env var...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21317#discussion_r192168016 --- Diff: docs/running-on-kubernetes.md --- @@ -602,4 +608,20 @@ specific to Spark on Kubernetes. spark.kubernetes.executor.secrets.spark-secret=/etc/secrets. + --- End diff -- Please add a link to the docs here. https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables
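[Editor's note] For readers following along, the secret-to-environment-variable pattern being documented in this PR takes the form `spark.kubernetes.{driver,executor}.secretKeyRef.[EnvName]=[SecretName]:[key]` (the secret name and keys below are illustrative, not from the diff):

```
spark.kubernetes.driver.secretKeyRef.DB_USER=my-secret:username
spark.kubernetes.driver.secretKeyRef.DB_PASS=my-secret:password
```

Each setting injects the named key of a pre-existing Kubernetes secret into the pod as the given environment variable, per the Kubernetes docs linked in the comment above.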
[GitHub] spark pull request #21462: [SPARK-24428][K8S] Fix unused code
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21462#discussion_r191936265 --- Diff: docs/running-on-kubernetes.md --- @@ -121,8 +121,8 @@ This URI is the location of the example jar that is already in the Docker image. If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images. -Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the -`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to --- End diff -- We should confirm that the new init-container-less implementation retains the capability of doing that - maybe through an e2e test.
[GitHub] spark pull request #21462: [SPARK-24428][K8S] Fix unused code
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21462#discussion_r191930265 --- Diff: docs/running-on-kubernetes.md --- @@ -121,8 +121,8 @@ This URI is the location of the example jar that is already in the Docker image. If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images. -Those dependencies can be added to the classpath by referencing them with `local://` URIs and/or setting the -`SPARK_EXTRA_CLASSPATH` environment variable in your Dockerfiles. The `local://` scheme is also required when referring to --- End diff -- Did we remove the ability to put jars somewhere in the image and add them to the classpath at runtime? We had this before with https://github.com/apache/spark/pull/20193. cc @mccheah
[GitHub] spark issue #21462: [SPARK-24428][K8S] Fix unused code
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21462 jenkins, ok to test
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r191865387 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -154,6 +154,24 @@ private[spark] object Config extends Logging { .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.") .createWithDefaultString("1s") + val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL = +ConfigBuilder("spark.kubernetes.executor.apiPollingInterval") + .doc("Interval between polls against the Kubernetes API server to inspect the " +"state of executors.") + .timeConf(TimeUnit.MILLISECONDS) + .checkValue(interval => interval > 0, s"API server polling interval must be a" +" positive time value.") + .createWithDefaultString("30s") + + val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL = --- End diff -- I think this option is hard to reason about and relies on understanding an implementation detail (the event queue). Why not just pick a default and leave it at that? In what scenario would a user need to tune this value?
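[Editor's note] By contrast, the polling-interval option earlier in the same diff is the kind of user-facing knob the reviewer is fine with: a plain time value with a sensible default. Per the diff above, it would be set like this (30s being the default shown):

```
spark.kubernetes.executor.apiPollingInterval=30s
```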
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r191868513 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventQueue.scala --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import io.fabric8.kubernetes.api.model.Pod + +private[spark] trait ExecutorPodsEventQueue { --- End diff -- just curious - is there no event queue mechanism within Spark itself for reuse here? Somewhat tangentially, there looks to be [EventLoop](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/EventLoop.scala) in `org.apache.spark.util`.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r191866216 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging + +private[spark] class ExecutorPodsAllocator( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventQueue: ExecutorPodsEventQueue) extends Logging { + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = mutable.Set.empty[Long] + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. But, here we choose instead to maintain all state within this + // class from the persecptive of the k8s API. Therefore whether or not this scheduler loop + // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence.
+ private val runningExecutors = mutable.Set.empty[Long] + + def start(applicationId: String): Unit = { +eventQueue.addSubscriber(podAllocationDelay) { updatedPods => + processUpdatedPodEvents(applicationId, updatedPods) +} + } + + def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total) + + private def processUpdatedPodEvents(applicationId: String, updatedPods: Seq[Pod]): Unit = { +updatedPods.foreach { updatedPod => + val execId = updatedPod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong + val phase = updatedPod.getStatus.getPhase.toLowerCase + phase match { +case "running" => + pendingExecutors -= execId + runningExecutors += execId +case "failed" | "succeeded" | "error" => + pendingExecutors -= execId + runningExecutors -= execId + } +} + +val currentRunningExecutors = runningExecutors.size +val currentTotalExpectedExecutors = totalExpectedExecutors.get +if (pendingExecutors.isEmpty && currentRunningExecutors < currentTotalExpectedExecutors) { + val numExecutorsToAllocate = math.min( +currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize) + logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.") + val newExecutorIds = mutable.Buffer.empty[Long]
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r191869618 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleEventHandler.scala --- @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import com.google.common.cache.{Cache, CacheBuilder} +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsLifecycleEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +podsEventQueue: ExecutorPodsEventQueue, +// Use a best-effort to track which executors have been removed already. It's not generally +// job-breaking if we remove executors more than once but it's ideal if we make an attempt +// to avoid doing so.
Expire cache entries so that this data structure doesn't grow beyond +// bounds. +removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) { + + import ExecutorPodsLifecycleEventHandler._ + + private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL) + + def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +podsEventQueue.addSubscriber(eventProcessingInterval) { updatedPods => + updatedPods.foreach { updatedPod => +processUpdatedPod(schedulerBackend, updatedPod) + } +} + } + + private def processUpdatedPod( + schedulerBackend: KubernetesClusterSchedulerBackend, updatedPod: Pod) = { +val execId = updatedPod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong +if (isDeleted(updatedPod)) { + removeExecutorFromSpark(schedulerBackend, updatedPod, execId) +} else { + updatedPod.getStatus.getPhase.toLowerCase match { +// TODO (SPARK-24135) - handle more classes of errors +case "error" | "failed" | "succeeded" => + // If deletion failed on a previous try, we can try again if resync informs us the pod + // is still around. + // Delete as best attempt - duplicate deletes will throw an exception but the end state + // of getting rid of the pod is what matters. + Utils.tryLogNonFatalError { +kubernetesClient + .pods() + .withName(updatedPod.getMetadata.getName) + .delete() + } + removeExecutorFromSpark(schedulerBackend, updatedPod, execId) +case _ => + } +} + } + + private def removeExecutorFromSpark( --- End diff -- Why not just `removeExecutor`?
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r191871593 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging + +private[spark] class ExecutorPodsAllocator( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventQueue: ExecutorPodsEventQueue) extends Logging { + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = mutable.Set.empty[Long] + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. But, here we choose instead to maintain all state within this + // class from the perspective of the k8s API. Therefore whether or not this scheduler loop + // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence. 
+ private val runningExecutors = mutable.Set.empty[Long] + + def start(applicationId: String): Unit = { +eventQueue.addSubscriber(podAllocationDelay) { updatedPods => + processUpdatedPodEvents(applicationId, updatedPods) +} + } + + def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total) + + private def processUpdatedPodEvents(applicationId: String, updatedPods: Seq[Pod]): Unit = { --- End diff -- We're actually processing pods here right? Not the events themselves from the watch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
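The "sets of ids instead of counters" comment in the diff above is about idempotency: a watch can deliver the same pod update more than once, and set membership absorbs duplicates where a counter would drift. A minimal, hypothetical sketch of that reconciliation idea (simplified phases, not the real pod snapshot handling):

```python
def reconcile(pending, running, pod_snapshots):
    """Idempotently move executor ids between state sets.

    pending, running: mutable sets of executor ids.
    pod_snapshots: iterable of (exec_id, phase) pairs; duplicate entries are
    harmless because set membership, not a counter, is the source of truth.
    """
    for exec_id, phase in pod_snapshots:
        if phase == "Running":
            pending.discard(exec_id)
            running.add(exec_id)
        elif phase in ("Failed", "Succeeded", "Deleted"):
            # Terminal states: forget the executor entirely.
            pending.discard(exec_id)
            running.discard(exec_id)
    return pending, running
```

Replaying the same event twice leaves the sets unchanged, which is exactly the property counters lack.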
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r191865834 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging + +private[spark] class ExecutorPodsAllocator( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventQueue: ExecutorPodsEventQueue) extends Logging { + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. --- End diff -- nit: hanging comment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21317: [SPARK-24232][k8s] Add support for secret env vars
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21317 @skonto, looks like a test runner issue here. @ssuchter, can you PTAL?
[GitHub] spark issue #21279: [SPARK-24219][k8s] Improve the docker building script to...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21279 oops, I didn't notice this. Sorry @jerryshao, will take a look in the next day or two.
[GitHub] spark issue #21238: [SPARK-24137][K8s] Mount local directories as empty dir ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21238 Sounds good. @liyinan926, let's revisit this if we hear from 2.3 users.
[GitHub] spark issue #21238: [SPARK-24137][K8s] Mount local directories as empty dir ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21238 @mccheah, wdyt? I just haven't heard from any users here of 2.3 - if you think it's useful for 2.3.1 and low risk, then please feel free to propose a cherry-pick.
[GitHub] spark issue #21238: [SPARK-24137][K8s] Mount local directories as empty dir ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21238 Maintenance releases most often have fixes for stability. We could maybe backport this since it's not a new feature but an omission from before. If it is going to take significant effort, given all the refactors that have gone in since, I think we should think twice about whether we need to.
[GitHub] spark issue #21238: [SPARK-24137][K8s] Mount local directories as empty dir ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21238 LGTM. Merging to master. Thanks @mccheah
[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21092 jenkins, retest this please
[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21092 jenkins, ok to test
[GitHub] spark pull request #21238: [SPARK-24137][K8s] Mount local directories as emp...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21238#discussion_r187116886 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -455,7 +455,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria private[spark] def validateSettings() { if (contains("spark.local.dir")) { val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " + -"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)." +"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS" + --- End diff -- oops, I deleted a comment here accidentally. @rxin said that we could remove this warning about Spark 1.0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186879387 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend( override def eventReceived(action: Action, pod: Pod): Unit = { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP - + val podPhase = pod.getStatus.getPhase action match { -case Action.MODIFIED if (pod.getStatus.getPhase == "Running" +case Action.MODIFIED if (podPhase == "Running" && pod.getMetadata.getDeletionTimestamp == null) => val clusterNodeName = pod.getSpec.getNodeName logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") executorPodsByIPs.put(podIP, pod) -case Action.DELETED | Action.ERROR => +case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff") + && pod.getMetadata.getDeletionTimestamp == null => val executorId = getExecutorId(pod) - logDebug(s"Executor pod $podName at IP $podIP was at $action.") - if (podIP != null) { -executorPodsByIPs.remove(podIP) + failedInitExecutors.add(executorId) + if (failedInitExecutors.size >= executorMaxInitErrors) { +val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" + + s" executors failed to start. The maximum number of allowed startup failures is" + + s" $executorMaxInitErrors. Please contact your cluster administrator or increase" + + s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}." +logError(errorMessage) + KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread() --- End diff -- The controllers have rate-limiting queues with exponential backoff. 
In the past, we've had issues (https://github.com/kubernetes/kubernetes/issues/30628, https://github.com/kubernetes/kubernetes/issues/27634 and many more) where a misconfigured queue has caused controllers to spew pods and retry. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
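The rate-limiting behavior mentioned above — Kubernetes controllers backing off exponentially instead of retrying in a tight loop — can be illustrated with a small sketch. This is a generic capped-exponential-backoff generator, not the actual workqueue implementation used by the k8s controllers:

```python
def backoff_delays(base=1.0, factor=2.0, cap=60.0, retries=8):
    """Yield capped exponential backoff delays (in seconds) per retry.

    Each retry waits `factor` times longer than the last, up to `cap`,
    so a persistently failing controller cannot spew pods at full speed.
    """
    delay = base
    for _ in range(retries):
        yield min(delay, cap)
        delay *= factor
```

The cap matters: without it, a long-failing controller would eventually wait effectively forever instead of probing periodically for recovery.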
[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186869138 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend( override def eventReceived(action: Action, pod: Pod): Unit = { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP - + val podPhase = pod.getStatus.getPhase action match { -case Action.MODIFIED if (pod.getStatus.getPhase == "Running" +case Action.MODIFIED if (podPhase == "Running" && pod.getMetadata.getDeletionTimestamp == null) => val clusterNodeName = pod.getSpec.getNodeName logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") executorPodsByIPs.put(podIP, pod) -case Action.DELETED | Action.ERROR => +case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff") --- End diff -- The termination reason is also a good source of info. kubectl looks at a set of these and turns it into what you see in the describe output - so, [similar logic](https://github.com/kubernetes/kubernetes/blob/6b94e872c63eeea2ed4fdc510c008e4ff9713953/pkg/printers/internalversion/printers.go#L547-L573) could be exercised. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186861330 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend( override def eventReceived(action: Action, pod: Pod): Unit = { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP - + val podPhase = pod.getStatus.getPhase action match { -case Action.MODIFIED if (pod.getStatus.getPhase == "Running" +case Action.MODIFIED if (podPhase == "Running" && pod.getMetadata.getDeletionTimestamp == null) => val clusterNodeName = pod.getSpec.getNodeName logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") executorPodsByIPs.put(podIP, pod) -case Action.DELETED | Action.ERROR => +case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff") + && pod.getMetadata.getDeletionTimestamp == null => val executorId = getExecutorId(pod) - logDebug(s"Executor pod $podName at IP $podIP was at $action.") - if (podIP != null) { -executorPodsByIPs.remove(podIP) + failedInitExecutors.add(executorId) + if (failedInitExecutors.size >= executorMaxInitErrors) { +val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" + + s" executors failed to start. The maximum number of allowed startup failures is" + + s" $executorMaxInitErrors. Please contact your cluster administrator or increase" + + s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}." +logError(errorMessage) + KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread() +throw new SparkException(errorMessage) } + handleFailedPod(action, pod, podName, podIP) - val executorExitReason = if (action == Action.ERROR) { -logWarning(s"Received error event of executor pod $podName. 
Reason: " + - pod.getStatus.getReason) -executorExitReasonOnError(pod) - } else if (action == Action.DELETED) { -logWarning(s"Received delete event of executor pod $podName. Reason: " + - pod.getStatus.getReason) -executorExitReasonOnDelete(pod) - } else { -throw new IllegalStateException( - s"Unknown action that should only be DELETED or ERROR: $action") - } - podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason) - - if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) { -log.warn(s"Executor with id $executorId was not marked as disconnected, but the " + - s"watch received an event of type $action for this executor. The executor may " + - "have failed to start in the first place and never registered with the driver.") - } - disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod) +case Action.DELETED | Action.ERROR => + handleFailedPod(action, pod, podName, podIP) case _ => logDebug(s"Received event of executor pod $podName: " + action) } } +private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = { + val executorId = getExecutorId(pod) + logDebug(s"Executor pod $podName at IP $podIP was at $action.") + if (podIP != null) { +executorPodsByIPs.remove(podIP) + } + + val executorExitReason = if (action == Action.ERROR) { +logWarning(s"Received error event of executor pod $podName. Reason: " + + pod.getStatus.getReason) +executorExitReasonOnError(pod) + } else if (action == Action.DELETED) { +logWarning(s"Received delete event of executor pod $podName. Reason: " + + pod.getStatus.getReason) +executorExitReasonOnDelete(pod) + } else if (action == Action.MODIFIED) { +executorExitReasonOnInitError(pod) --- End diff -- There isn't a doc, but I'm putting together an initial list. We can grow it as we discover more during opera
[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186857969 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend( override def eventReceived(action: Action, pod: Pod): Unit = { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP - + val podPhase = pod.getStatus.getPhase action match { -case Action.MODIFIED if (pod.getStatus.getPhase == "Running" +case Action.MODIFIED if (podPhase == "Running" && pod.getMetadata.getDeletionTimestamp == null) => val clusterNodeName = pod.getSpec.getNodeName logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") executorPodsByIPs.put(podIP, pod) -case Action.DELETED | Action.ERROR => +case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff") + && pod.getMetadata.getDeletionTimestamp == null => val executorId = getExecutorId(pod) - logDebug(s"Executor pod $podName at IP $podIP was at $action.") - if (podIP != null) { -executorPodsByIPs.remove(podIP) + failedInitExecutors.add(executorId) + if (failedInitExecutors.size >= executorMaxInitErrors) { +val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" + + s" executors failed to start. The maximum number of allowed startup failures is" + + s" $executorMaxInitErrors. Please contact your cluster administrator or increase" + + s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}." +logError(errorMessage) + KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread() --- End diff -- Always re-requesting executors has the potential to overwhelm etcd with failed pods especially for long running jobs. This seems overly conservative but a reasonable place to start - fail entire task upon N failures. 
With dynamic allocation, it should be possible to check that at least `spark.dynamicAllocation.minExecutors` are alive and make a decision accordingly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
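The two policies contrasted above — a flat failure cap versus a dynamic-allocation-aware check on live executors — can be sketched as a single decision function. This is a hypothetical illustration of the suggestion, not code from the PR:

```python
def should_abort(failed_init_count, max_init_failures,
                 dynamic_allocation=False, alive_executors=0, min_executors=1):
    """Decide whether to abort the application after executor init failures.

    Without dynamic allocation: abort once failures reach the configured cap
    (the conservative policy in the diff above).
    With dynamic allocation: additionally tolerate failures while at least
    `min_executors` (cf. spark.dynamicAllocation.minExecutors) remain alive.
    """
    if dynamic_allocation:
        return (alive_executors < min_executors
                and failed_init_count >= max_init_failures)
    return failed_init_count >= max_init_failures
```

The dynamic-allocation branch encodes the trade-off in the comment: keep running as long as the job can still make progress, while the cap still prevents an unbounded stream of failed pods from overwhelming etcd.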
[GitHub] spark issue #21260: [SPARK-23529][K8s] Support mounting hostPath volumes
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21260 jenkins, ok to test
[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186643210 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend( override def eventReceived(action: Action, pod: Pod): Unit = { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP - + val podPhase = pod.getStatus.getPhase action match { -case Action.MODIFIED if (pod.getStatus.getPhase == "Running" +case Action.MODIFIED if (podPhase == "Running" && pod.getMetadata.getDeletionTimestamp == null) => val clusterNodeName = pod.getSpec.getNodeName logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") executorPodsByIPs.put(podIP, pod) -case Action.DELETED | Action.ERROR => +case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff") + && pod.getMetadata.getDeletionTimestamp == null => val executorId = getExecutorId(pod) - logDebug(s"Executor pod $podName at IP $podIP was at $action.") - if (podIP != null) { -executorPodsByIPs.remove(podIP) + failedInitExecutors.add(executorId) + if (failedInitExecutors.size >= executorMaxInitErrors) { +val errorMessage = s"Aborting Spark application because $executorMaxInitErrors" + + s" executors failed to start. The maximum number of allowed startup failures is" + + s" $executorMaxInitErrors. Please contact your cluster administrator or increase" + + s" your setting of ${KUBERNETES_EXECUTOR_MAX_INIT_ERRORS.key}." +logError(errorMessage) + KubernetesClusterSchedulerBackend.this.scheduler.sc.stopInNewThread() +throw new SparkException(errorMessage) } + handleFailedPod(action, pod, podName, podIP) - val executorExitReason = if (action == Action.ERROR) { -logWarning(s"Received error event of executor pod $podName. 
Reason: " + - pod.getStatus.getReason) -executorExitReasonOnError(pod) - } else if (action == Action.DELETED) { -logWarning(s"Received delete event of executor pod $podName. Reason: " + - pod.getStatus.getReason) -executorExitReasonOnDelete(pod) - } else { -throw new IllegalStateException( - s"Unknown action that should only be DELETED or ERROR: $action") - } - podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason) - - if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) { -log.warn(s"Executor with id $executorId was not marked as disconnected, but the " + - s"watch received an event of type $action for this executor. The executor may " + - "have failed to start in the first place and never registered with the driver.") - } - disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod) +case Action.DELETED | Action.ERROR => + handleFailedPod(action, pod, podName, podIP) case _ => logDebug(s"Received event of executor pod $podName: " + action) } } +private def handleFailedPod(action: Action, pod: Pod, podName: String, podIP: String) = { + val executorId = getExecutorId(pod) + logDebug(s"Executor pod $podName at IP $podIP was at $action.") + if (podIP != null) { +executorPodsByIPs.remove(podIP) + } + + val executorExitReason = if (action == Action.ERROR) { +logWarning(s"Received error event of executor pod $podName. Reason: " + + pod.getStatus.getReason) +executorExitReasonOnError(pod) + } else if (action == Action.DELETED) { +logWarning(s"Received delete event of executor pod $podName. Reason: " + + pod.getStatus.getReason) +executorExitReasonOnDelete(pod) + } else if (action == Action.MODIFIED) { +executorExitReasonOnInitError(pod) --- End diff -- I think `Action.MODIFIED` can be fired for a lot of other reasons. I was thinking that we should just use cont
[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186642656 --- Diff: docs/running-on-kubernetes.md --- @@ -561,6 +561,13 @@ specific to Spark on Kubernetes. This is distinct from spark.executor.cores: it is only used and takes precedence over spark.executor.cores for specifying the executor pod cpu request if set. Task parallelism, e.g., number of tasks an executor can run concurrently is not affected by this. + + spark.kubernetes.executor.maxInitFailures + 10 + +Maximum number of times executors are allowed to fail with an Init:Error state before failing the application. Note that Init:Error failures should not be caused by Spark itself because Spark does not attach init-containers to pods. Init-containers can be attached by the cluster itself. Users should check with their cluster administrator if these kinds of failures to start the executor pod occur frequently. --- End diff -- Also, I would change the description here, because using init containers for injecting init-containers through mutable webhooks is not something that's all that common. Also should be linked to https://kubernetes.io/docs/admin/extensible-admission-controllers/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186641837 --- Diff: docs/running-on-kubernetes.md --- @@ -561,6 +561,13 @@ specific to Spark on Kubernetes. This is distinct from spark.executor.cores: it is only used and takes precedence over spark.executor.cores for specifying the executor pod cpu request if set. Task parallelism, e.g., number of tasks an executor can run concurrently is not affected by this. + + spark.kubernetes.executor.maxInitFailures + 10 + +Maximum number of times executors are allowed to fail with an Init:Error state before failing the application. Note that Init:Error failures should not be caused by Spark itself because Spark does not attach init-containers to pods. Init-containers can be attached by the cluster itself. Users should check with their cluster administrator if these kinds of failures to start the executor pod occur frequently. --- End diff -- This is very specific and covering exactly one kind of error. I'd like this property to cover all initialization errors. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21241: [SPARK-24135][K8s] Resilience to init-container e...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21241#discussion_r186642338 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -320,50 +322,83 @@ private[spark] class KubernetesClusterSchedulerBackend( override def eventReceived(action: Action, pod: Pod): Unit = { val podName = pod.getMetadata.getName val podIP = pod.getStatus.getPodIP - + val podPhase = pod.getStatus.getPhase action match { -case Action.MODIFIED if (pod.getStatus.getPhase == "Running" +case Action.MODIFIED if (podPhase == "Running" && pod.getMetadata.getDeletionTimestamp == null) => val clusterNodeName = pod.getSpec.getNodeName logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") executorPodsByIPs.put(podIP, pod) -case Action.DELETED | Action.ERROR => +case Action.MODIFIED if (podPhase == "Init:Error" || podPhase == "Init:CrashLoopBackoff") --- End diff -- Maybe have a separate set for all the error states we want to check. Having one place would make this easier to change in future. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21202: [SPARK-24129] [K8S] Add option to pass --build-arg's to ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21202 jenkins, ok to test
[GitHub] spark issue #21202: [SPARK-24129] [K8S] Add option to pass --build-arg's to ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21202 LGTM pending test run
[GitHub] spark issue #21095: [SPARK-23529][K8s] Support mounting hostPath volumes
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21095 The testing is not a blocker for the review. When I said "tests for non-hostpath type volumes", I meant to say that we want to cover more than just hostpath mounts with the initial PR - because we might end up with something too specific. Sorry if that wasn't clear from my comment. I think doing `EmptyDir` mounts alongside hostpath (with as much code sharing as possible) would be a good idea since it's the most common volume type. With those two, I think we can push this forward and have the e2e tests come along in parallel.
[GitHub] spark issue #21095: [SPARK-23529][K8s] Support mounting hostPath volumes
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21095 @madanadit, sorry about the long turnaround time here. I made a cursory pass over it - and it looks good. Before getting deeper into the code, I think we need more exhaustive unit testing for non-hostpath type volumes, and a couple of [e2e tests](https://github.com/apache-spark-on-k8s/spark-integration). Once those are done, I'm also happy to do a more detailed review here.
[GitHub] spark issue #21095: [SPARK-23529][K8s] Support mounting hostPath volumes
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21095 @madanadit, thanks for following up with the PR and for your [design doc](https://docs.google.com/document/d/15-mk7UnOYNTXoF6EKaVlelWYc9DTrTXrYoodwDuAwY4/edit#heading=h.8jlem461uvwv). I had a few comments on the doc which I've added for discussion.
[GitHub] spark issue #21095: [SPARK-23529][K8s] Support mounting hostPath volumes
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21095 jenkins, ok to test
[GitHub] spark issue #21092: [SPARK-23984][K8S][WIP] Initial Python Bindings for PySp...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21092 Thanks for taking this on @ifilonenko. Left some initial comments on the PR without going too much in depth - since as you noted, it's WIP.
[GitHub] spark pull request #21092: [SPARK-23984][K8S][WIP] Initial Python Bindings f...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r182523365 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala ---
@@ -29,9 +30,11 @@ private[spark] class KubernetesExecutorBuilder(
   def buildFromFeatures(
       kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
     val baseFeatures = Seq(provideBasicStep(kubernetesConf))
-    val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
-      baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
-    } else baseFeatures
+    val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+      Some(provideSecretsStep(kubernetesConf))
+    } else None
+    val allFeatures: Seq[KubernetesFeatureConfigStep] =
--- End diff --
It does not need any changes/arg passing during executor pod construction?
[GitHub] spark pull request #21092: [SPARK-23984][K8S][WIP] Initial Python Bindings f...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r182522479 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala --- @@ -71,7 +77,7 @@ private[spark] class BasicDriverFeatureStep( ("cpu", new QuantityBuilder(false).withAmount(limitCores).build()) } -val driverContainer = new ContainerBuilder(pod.container) +val withoutArgsDriverContainer: ContainerBuilder = new ContainerBuilder(pod.container) --- End diff -- The previous name seemed clearer to me.
[GitHub] spark pull request #21092: [SPARK-23984][K8S][WIP] Initial Python Bindings f...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r182517091 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh ---
@@ -62,6 +69,14 @@ case "$SPARK_K8S_CMD" in
       "$@"
     )
     ;;
+  driver-py)
+    CMD=(
+      "$SPARK_HOME/bin/spark-submit"
+      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+      --deploy-mode client
+      "$@" $PYSPARK_PRIMARY $PYSPARK_SECONDARY
--- End diff --
Can we have more descriptive names for `PYSPARK_PRIMARY` and `PYSPARK_SECONDARY`? Maybe `PYSPARK_MAINAPP` and `PYSPARK_ARGS`?
[GitHub] spark pull request #21092: [SPARK-23984][K8S][WIP] Initial Python Bindings f...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r182518301 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile --- @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +ARG base_img +FROM $base_img +WORKDIR / +COPY python /opt/spark/python +RUN apk add --no-cache python && \ +python -m ensurepip && \ +rm -r /usr/lib/python*/ensurepip && \ +pip install --upgrade pip setuptools && \ +rm -r /root/.cache +ENV PYTHON_VERSION 2.7.13 --- End diff -- If we set this, are we implicitly imposing a contract on the base image to have this particular version of python installed?
[GitHub] spark pull request #21092: [SPARK-23984][K8S][WIP] Initial Python Bindings f...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r182520090 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala --- @@ -89,6 +97,29 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SECRETS_STEP_TYPE) } + test("Apply Python step if main resource is python.") { +val conf = KubernetesConf( --- End diff -- Unrelated to this PR, but @mccheah, should we have something like the fluent/builder pattern here for `KubernetesConf`, since it's grown to quite a few params? I'm happy to take a stab at it if we agree that's a good direction.
[GitHub] spark pull request #21092: [SPARK-23984][K8S][WIP] Initial Python Bindings f...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21092#discussion_r182524173 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala --- @@ -88,15 +94,22 @@ private[spark] class BasicDriverFeatureStep( .addToRequests("memory", driverMemoryQuantity) .addToLimits("memory", driverMemoryQuantity) .endResources() - .addToArgs("driver") + .addToArgs(driverDockerContainer) .addToArgs("--properties-file", SPARK_CONF_PATH) .addToArgs("--class", conf.roleSpecificConf.mainClass) - // The user application jar is merged into the spark.jars list and managed through that - // property, so there is no need to reference it explicitly here. - .addToArgs(SparkLauncher.NO_RESOURCE) - .addToArgs(conf.roleSpecificConf.appArgs: _*) - .build() +val driverContainer = + if (driverDockerContainer == "driver-py") { --- End diff -- Wondering if we can discover whether it's a Python application in a better way here. Probably using the built-up Spark conf?
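For illustration, the kind of main-resource check being hinted at could look like this (a hedged Python sketch; the real builder is Scala and would consult the `KubernetesConf`, and the function name and strings here are hypothetical, not Spark internals):

```python
def resolve_driver_entrypoint(main_app_resource: str) -> str:
    """Pick the driver container entrypoint from the primary resource.

    Hypothetical sketch: keys off the file suffix of the main application
    resource instead of comparing against a hard-coded container name.
    """
    if main_app_resource.endswith(".py"):
        return "driver-py"
    return "driver"
```

The point of the sketch is that the decision is derived from submission state the conf already carries, rather than from the string name of a Docker entrypoint.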
[GitHub] spark issue #21092: [SPARK-23984][K8S][WIP] Initial Python Bindings for PySp...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21092 cc @holdenk
[GitHub] spark issue #21032: [SPARK-23529][K8s] Support mounting hostPath volumes for...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21032 @madanadit @liyinan926, can we do a short doc with all the types of volumes and config options and run that through the rest of the community?
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20910 Merging to master!
[GitHub] spark issue #21032: [SPARK-23529][K8s] Support mounting hostPath volumes for...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21032 Yes, that is what I was indicating as well. We should have a design for all different volume types. Extending this PR as the design evolves is a good way to go about that. On Thu, Apr 12, 2018, 11:37 AM Yinan Li wrote: > I think we should have a more general solution so it works for other types of volumes. Particularly, we should have a design discussion on the format of values of such a property so it works for any type of volumes. I suggest we hold off on this PR until we set up a general solution. @foxish <https://github.com/foxish> @felixcheung <https://github.com/felixcheung>.
[GitHub] spark pull request #21032: [SPARK-23529][K8s] Support mounting hostPath volu...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21032#discussion_r181176057 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -117,6 +117,13 @@ private[spark] object Config extends Logging { .stringConf .createWithDefault("spark") + val KUBERNETES_EXECUTOR_VOLUMES = --- End diff -- Also, do we need a symmetric option for the drivers also?
[GitHub] spark pull request #21032: [SPARK-23529][K8s] Support mounting hostPath volu...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/21032#discussion_r181172885 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -117,6 +117,13 @@ private[spark] object Config extends Logging { .stringConf .createWithDefault("spark") + val KUBERNETES_EXECUTOR_VOLUMES = --- End diff -- If we are adding this option, it should be a more general API - allowing PVs, hostpath volumes and emptyDir volumes to be mounted. In the near future, hostpath volumes will be superseded by local PVs - https://github.com/kubernetes/kubernetes/issues/7562
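For illustration, a more general per-volume config scheme of the kind being asked for could take a shape like the following (the property names and structure here are purely hypothetical sketches, not a settled API):

```properties
# Hypothetical: [volumeType].[volumeName] keys generalize over hostPath,
# emptyDir, and PVC-backed volumes instead of hard-coding one type.
spark.kubernetes.executor.volumes.hostPath.logs.mount.path=/var/log/spark
spark.kubernetes.executor.volumes.hostPath.logs.options.path=/var/log
spark.kubernetes.executor.volumes.emptyDir.scratch.mount.path=/tmp/scratch
spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=spark-data
```

A scheme like this also answers the symmetry question in the adjacent comment: the same keys work under a `spark.kubernetes.driver.volumes.` prefix.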
[GitHub] spark issue #21032: [SPARK-23529][K8s] Support mounting hostPath volumes for...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21032 Jenkins, ok to test
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20910 @mccheah I think it's fine as is now. We can take care of moving abstractions between the submission client and the driver in a future PR if necessary. Just the Scala style issue needs taking care of, and then this LGTM.
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r180535990 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesSpec.scala --- @@ -14,17 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.k8s.submit.steps +package org.apache.spark.deploy.k8s -import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import io.fabric8.kubernetes.api.model.HasMetadata -/** - * Represents a step in configuring the Spark driver pod. - */ -private[spark] trait DriverConfigurationStep { +private[spark] case class KubernetesSpec( --- End diff -- This class is named as though it applies to driver and executor construction. Maybe `KubernetesDriverSpec`? It's also a bit unclear to me what purpose this abstraction serves as opposed to the way `KubernetesExecutorBuilder` goes about building the pod.
[GitHub] spark issue #20989: [SPARK-23529][K8s] Support mounting hostPath volumes for...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20989 ok to test
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 LGTM. Merging to master.
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 @mccheah - heads up, this will likely lead to a rebase on https://github.com/apache/spark/pull/20910.
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 @andrusha, no worries. I think this is going to be rather important for folks with private registries. Thanks for following up on it!
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 Jenkins, test this please
[GitHub] spark pull request #20811: [SPARK-23668][K8S] Add config option for passing ...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/20811#discussion_r178982578 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -54,6 +54,12 @@ private[spark] object Config extends Logging { .checkValues(Set("Always", "Never", "IfNotPresent")) .createWithDefault("IfNotPresent") + val IMAGE_PULL_SECRET = +ConfigBuilder("spark.kubernetes.container.image.pullSecret") --- End diff -- Also should be named `image.pullSecrets` to stay consistent with the field we're setting on the pods.
[GitHub] spark pull request #20811: [SPARK-23668][K8S] Add config option for passing ...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/20811#discussion_r178982439 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -54,6 +54,12 @@ private[spark] object Config extends Logging { .checkValues(Set("Always", "Never", "IfNotPresent")) .createWithDefault("IfNotPresent") + val IMAGE_PULL_SECRET = +ConfigBuilder("spark.kubernetes.container.image.pullSecret") --- End diff -- Based on a conversation with the node team, this is meant to be an array of possibly many secret names; since one could have multiple secrets specified for different registries and spread across multiple k8s secrets. So, I'd suggest we implement the full API - this should be capable of accepting a comma separated list of one or more secret names.
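For illustration, turning such a comma-separated property value into individual secret names is a small parsing step (a hedged Python sketch; the function name is mine, and Spark's actual parsing of this option may differ):

```python
def parse_image_pull_secrets(value):
    """Split a comma-separated list of Kubernetes secret names.

    Hypothetical helper: trims whitespace around each name and drops
    empty entries so trailing commas are tolerated.
    """
    return [name.strip() for name in value.split(",") if name.strip()]
```

Each resulting name would then map to one `imagePullSecrets` entry on the driver and executor pod specs.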
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 LGTM after one comment. Thanks for fixing the style issues @andrusha.
[GitHub] spark pull request #20811: [SPARK-23668][K8S] Add config option for passing ...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/20811#discussion_r178924651 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -54,6 +54,12 @@ private[spark] object Config extends Logging { .checkValues(Set("Always", "Never", "IfNotPresent")) .createWithDefault("IfNotPresent") + val IMAGE_PULL_SECRET = +ConfigBuilder("spark.kubernetes.imagePullSecret") --- End diff -- Given there's an option `spark.kubernetes.container.image.pullPolicy`, we should make this consistent as `spark.kubernetes.container.image.pullSecret`
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 Jenkins, test this please
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 @andrusha, can you address the style check failure that @felixcheung pointed out? This should be good to merge after that.
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20553 Merging once tests pass. Thanks!
[GitHub] spark pull request #20943: [ SPARK-23825] [K8s] Requesting memory + memory o...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/20943#discussion_r178615678 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala --- @@ -91,7 +91,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite { val resourceRequirements = preparedDriverSpec.driverContainer.getResources val requests = resourceRequirements.getRequests.asScala assert(requests("cpu").getAmount === "2") -assert(requests("memory").getAmount === "256Mi") +assert(requests("memory").getAmount === "456Mi") --- End diff -- nvm - @liyinan926 just pointed out that it's being set explicitly above, which I totally missed.
[GitHub] spark pull request #20553: [SPARK-23285][K8S] Add a config property for spec...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/20553#discussion_r178614696 --- Diff: docs/running-on-kubernetes.md --- @@ -549,14 +549,22 @@ specific to Spark on Kubernetes. spark.kubernetes.driver.limit.cores (none) -Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod. +Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod. + + spark.kubernetes.executor.request.cores + (none) + +Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). +This is distinct from spark.executor.cores and is only used for specifying the executor pod cpu request if set. Specifically, if this is set, its value is used to specify the executor --- End diff -- Might be good to also add typical ways (1.5m, 2.0, 5, etc) in which one can specify this and link to https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units.
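For illustration, typical Kubernetes CPU quantity spellings for this property might look like the following (a hedged sketch of example values, following the Kubernetes CPU-units convention linked above, which allows millicpu and decimal forms but nothing finer than `1m`):

```properties
# one tenth of a CPU, millicpu form
spark.kubernetes.executor.request.cores=100m
# half a CPU, decimal form
spark.kubernetes.executor.request.cores=0.5
# two full CPUs
spark.kubernetes.executor.request.cores=2
```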
[GitHub] spark pull request #20553: [SPARK-23285][K8S] Add a config property for spec...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/20553#discussion_r178614227 --- Diff: docs/running-on-kubernetes.md --- @@ -549,14 +549,22 @@ specific to Spark on Kubernetes. spark.kubernetes.driver.limit.cores (none) -Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod. +Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod. + + spark.kubernetes.executor.request.cores + (none) + +Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). +This is distinct from spark.executor.cores and is only used for specifying the executor pod cpu request if set. Specifically, if this is set, its value is used to specify the executor --- End diff -- The two lines appear a bit redundant - can do with just one probably.
[GitHub] spark pull request #20943: [ SPARK-23825] [K8s] Requesting memory + memory o...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/20943#discussion_r178609700 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala --- @@ -91,7 +91,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite { val resourceRequirements = preparedDriverSpec.driverContainer.getResources val requests = resourceRequirements.getRequests.asScala assert(requests("cpu").getAmount === "2") -assert(requests("memory").getAmount === "256Mi") +assert(requests("memory").getAmount === "456Mi") --- End diff -- I'm wondering - shouldn't this be 256M + 384M? The documentation says - `executorMemory * 0.10, with minimum of 384`.
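As a quick arithmetic check of the quoted default, `max(executorMemory * 0.10, 384)`, here is a hedged Python sketch (the function name is mine, not Spark's):

```python
def default_memory_overhead_mib(executor_memory_mib,
                                overhead_factor=0.10,
                                minimum_mib=384):
    """Default memory overhead per the quoted docs: the larger of
    factor * memory and the 384 MiB floor. Hypothetical helper."""
    return max(int(executor_memory_mib * overhead_factor), minimum_mib)
```

With 256 MiB of memory this default yields 384 MiB of overhead (a 640Mi request), so a 456Mi request implies an explicitly configured overhead of 200 MiB rather than the default, consistent with the follow-up in this thread.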
[GitHub] spark issue #20669: [SPARK-22839][K8S] Remove the use of init-container for ...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20669 There's a section explaining it at the bottom of https://spark.apache.org/committers.html
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 cc/ @mccheah @liyinan926 @vanzin
[GitHub] spark issue #20811: [SPARK-23668][K8S] Add config option for passing through...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20811 ok to test
[GitHub] spark issue #20791: [SPARK-23618][K8s][BUILD] Initialize BUILD_ARGS in docke...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20791 Merged into master. Thanks!
[GitHub] spark issue #20791: [SPARK-23618][K8s][BUILD] Initialize BUILD_ARGS in docke...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/20791 LGTM! Thanks @jooseong.