[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r196257515

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -154,6 +154,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")
 
+  val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
+      .doc("Interval between polls against the Kubernetes API server to inspect the " +
+        "state of executors.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(interval => interval > 0, s"API server polling interval must be a" +
+        " positive time value.")
+      .createWithDefaultString("30s")
+
+  val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
--- End diff --

This should have been marked with `internal()`; that was an oversight. I don't think documenting it is entirely necessary.
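For reference, `internal()` is a one-call change on the builder chain. Below is a sketch of how the event-processing-interval entry might look with it applied; the key name, doc string, and default are assumptions, since the quoted hunk truncates at the declaration:

```scala
val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
  ConfigBuilder("spark.kubernetes.executor.eventProcessingInterval")  // assumed key name
    .internal()  // excludes the entry from the generated configuration docs
    .doc("Interval between successive inspection of executor events sent from the" +
      " Kubernetes API.")
    .timeConf(TimeUnit.MILLISECONDS)
    .checkValue(interval => interval > 0, "Event processing interval must be a" +
      " positive time value.")
    .createWithDefaultString("1s")
```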
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r196249925

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -154,6 +154,24 @@ private[spark] object Config extends Logging {
[...]
+  val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
--- End diff --

@mccheah, because this is not internal, shouldn't we include this in `docs/`? I don't see why we aren't documenting this.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21366
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195569529

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Yeah, I see. I guess this is done by `ExecutorPodsAllocator` as a subscriber.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195567414

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

For example, if we changed the `initialDelay` here to stall before the first snapshot sync, then with the above scheme we'd still try to request executors immediately, because the subscriber thread kicks off an allocation round immediately.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195567079

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

But polling isn't where we start to create executors - that's done on the subscriber rounds. Polling here populates the snapshots store, but processing the snapshots happens on the subscriber thread(s). Furthermore, with the scheme proposed above, we never even have to poll for snapshots once before we begin requesting executors, because the pods allocator subscriber will trigger immediately with an empty snapshot.
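To make the triggering concrete, here is a rough fragment-level sketch of the allocator's subscriber callback under the scheme being described. `PodRunning` and `PodPending` mirror the snapshot state model introduced in this PR, while `requestNewExecutors` is a hypothetical helper, not the PR's exact code:

```scala
private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
  // On the very first subscriber round, the only snapshot is empty: nothing is
  // running or pending, so the full initial batch is requested immediately.
  val currentRunning = snapshots.lastOption
    .map(_.executorPods.values.count(_.isInstanceOf[PodRunning]))
    .getOrElse(0)
  val currentPending = snapshots.lastOption
    .map(_.executorPods.values.count(_.isInstanceOf[PodPending]))
    .getOrElse(0)
  val outstanding = totalExpectedExecutors.get() - currentRunning - currentPending -
    newlyCreatedExecutors.size
  if (outstanding > 0) {
    // Hypothetical helper that creates up to podAllocationSize pods per round.
    requestNewExecutors(math.min(outstanding, podAllocationSize), applicationId)
  }
}
```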
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195566124

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala ---
@@ -0,0 +1,88 @@
[...]
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent._
+
+import io.fabric8.kubernetes.api.model.Pod
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val subscribers = mutable.Buffer.empty[SnapshotsSubscriber]
+  private val pollingTasks = mutable.Buffer.empty[Future[_]]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+      processBatchIntervalMillis: Long)
+      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
+    val newSubscriber = SnapshotsSubscriber(
+      new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+    subscribers += newSubscriber
+    pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
+      toRunnable(() => callSubscriber(newSubscriber)),
--- End diff --

Just tried that and it doesn't work - I think that requires the scala-java8-compat module, which I don't think is worth pulling in for just this case.
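For context, the adapter under discussion is small. Under Scala 2.11, which Spark still supported at the time, a function literal is not SAM-converted to `java.lang.Runnable`, so either a helper like the one sketched below or scala-java8-compat is needed. The body is an assumption consistent with the call site in the hunk:

```scala
private def toRunnable(task: () => Unit): Runnable = new Runnable {
  // Adapt a Scala function literal to java.lang.Runnable without depending
  // on scala-java8-compat's FunctionConverters.
  override def run(): Unit = task()
}
```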
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195565315

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

You could add a comment saying this is where we create executors, and how. I mean, on Mesos you start executors when you get offers from agents, and that is straightforward and makes sense. Here you want to start them ASAP, I guess, so that you can then send tasks to them, right?
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195561927

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

And though we don't allow for this right now, the above would allow subscribers to be added midway through and receive the most recent snapshot immediately. But again, we don't do this right now - we set up all subscribers on startup, before we start pushing snapshots.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195549746

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Also, you could do it in `doRequestTotalExecutors`, right? When do you have the information about how many you need?
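For reference, under this design `doRequestTotalExecutors` only records a target; pod creation stays in the allocator's subscriber loop. A sketch of the presumed wiring in the scheduler backend (the method signature matches `CoarseGrainedSchedulerBackend` and needs `scala.concurrent.Future`; the body is an assumption consistent with the discussion):

```scala
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
  // Record the new target only; the allocator's snapshot subscriber reconciles
  // this target against the latest cluster state and creates pods as needed.
  podAllocator.setTotalExpectedExecutors(requestedTotal)
  Future.successful(true)
}
```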
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195542619

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

I see - I think what we actually want is for `ExecutorPodsSnapshotsStoreImpl` to initialize the subscriber with its current snapshot. That creates the semantics where a new subscriber will first receive the most up-to-date state immediately.
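A minimal sketch of those semantics, assuming `SnapshotsSubscriber` exposes its queue as `snapshotsBuffer` (a hypothetical field name) and that the schedule uses a zero initial delay (the quoted hunk truncates before those arguments):

```scala
override def addSubscriber(
    processBatchIntervalMillis: Long)
    (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = SNAPSHOT_LOCK.synchronized {
  val newSubscriber = SnapshotsSubscriber(
    new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
  // Seed the buffer under SNAPSHOT_LOCK so a subscriber added at any point first
  // observes the most recent snapshot before any newer updates are enqueued.
  newSubscriber.snapshotsBuffer.add(currentSnapshot)
  subscribers += newSubscriber
  pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
    toRunnable(() => callSubscriber(newSubscriber)),
    0L, processBatchIntervalMillis, TimeUnit.MILLISECONDS)
}
```

Synchronizing `addSubscriber` on `SNAPSHOT_LOCK` is what keeps the seeded value consistent with concurrent `replaceSnapshot` calls, since `currentSnapshot` is guarded by that lock.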
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195535296

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Yes, you need to trigger the initial creation of executors somehow, and yes, I saw that in the tests. My only concern is that this should be explicit, not implicit, to make the code more obvious.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195513995

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala ---
@@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val allocatorExecutor = ThreadUtils
-      .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
       "kubernetes-executor-requests")
+
+    val bufferSnapshotsExecutor = ThreadUtils
+      .newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer")
+    val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor)
+    val removedExecutorsCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(3, TimeUnit.MINUTES)
--- End diff --

The cache is only for a best-effort attempt to not remove the same executor from the scheduler backend multiple times, but at the end of the day, even if we do accidentally remove it multiple times, the only noticeable result is noisy logs. The scheduler backend properly handles multiple attempts to remove, but we'd prefer it if we didn't have to.
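For readers unfamiliar with the pattern, a self-contained sketch of the dedupe flow; the key and value types mirror the `Cache[java.lang.Long, java.lang.Long]` seen elsewhere in this PR, while the surrounding object and method names are illustrative:

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder

object RemovalDedupeSketch {
  // Entries expire three minutes after insertion, bounding the cache's size
  // without any explicit eviction logic.
  private val removedExecutorsCache = CacheBuilder.newBuilder()
    .expireAfterWrite(3, TimeUnit.MINUTES)
    .build[java.lang.Long, java.lang.Long]()

  def removeOnce(execId: Long)(doRemove: Long => Unit): Unit = {
    // getIfPresent returns null on a miss, meaning this executor has not been
    // removed recently, so it is safe to attempt the removal now.
    if (removedExecutorsCache.getIfPresent(execId) == null) {
      removedExecutorsCache.put(execId, execId)
      doRemove(execId)
    }
  }
}
```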
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195512808

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Not strictly why that's done here, but a side effect, I suppose. Really the snapshots store should push an initial empty snapshot to all subscribers when it starts, and the unit tests do check for that - it's the responsibility of the snapshots store.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195512430

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
@@ -0,0 +1,141 @@
[...]
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, Utils}
+
+private[spark] class ExecutorPodsAllocator(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    clock: Clock) extends Logging {
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 6)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Executor IDs that have been requested from Kubernetes but have not been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
+
+  def start(applicationId: String): Unit = {
+    snapshotsStore.addSubscriber(podAllocationDelay) {
+      onNewSnapshots(applicationId, _)
+    }
+  }
+
+  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+
+  private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+    newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
+    // For all executors we've created against the API but have not seen in a snapshot
+    // yet - check the current time. If the current time has exceeded some threshold,
+    // assume that the pod was either never created (the API server never properly
+    // handled the creation request), or the API server created the pod but we missed
+    // both the creation and deletion events. In either case, delete the missing pod
+    // if possible, and mark such a pod to be rescheduled below.
+    newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
+      if (clock.getTimeMillis() - timeCreated > podCreationTimeout) {
+        logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
+          s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
+          " previous allocation attempt tried to create it. The executor may have been" +
+          " deleted but the application missed the deletion event.")
+        Utils.tryLogNonFatalError {
+          kubernetesClient
+            .pods()
+            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+            .delete()
--- End diff --

That's handled by the lifecycle manager already, because the lifecycle manager looks at what the scheduler backend believes are its executors and reconciles them with what's in the snapshot.
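A rough sketch of the reconciliation being described; `getExecutorIds` and `doRemoveExecutor` come from Spark's `CoarseGrainedSchedulerBackend`, and `ExecutorExited` is `org.apache.spark.scheduler.ExecutorExited`, but the wiring below is an assumption rather than the PR's exact code:

```scala
private def reconcileMissingExecutors(
    schedulerBackend: KubernetesClusterSchedulerBackend,
    latestSnapshot: ExecutorPodsSnapshot): Unit = {
  val knownToBackend = schedulerBackend.getExecutorIds().map(_.toLong).toSet
  val seenInSnapshot = latestSnapshot.executorPods.keySet
  // Executors the backend still tracks but Kubernetes no longer reports are
  // treated as deleted pods whose deletion event was missed.
  (knownToBackend -- seenInSnapshot).foreach { execId =>
    schedulerBackend.doRemoveExecutor(
      execId.toString,
      ExecutorExited(-1, exitCausedByApp = false,
        s"The executor with ID $execId was not found in the cluster."))
  }
}
```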
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195512219

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala ---
@@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
[...]
+    val removedExecutorsCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(3, TimeUnit.MINUTES)
--- End diff --

Don't think it has to be configurable. Basically, we should only receive the removed-executor events multiple times for a short period, and then we should settle into a steady state.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195445788

--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
       <version>1.10.19</version>
       <scope>test</scope>
+ [...]
--- End diff --

Thanks @ktoso!
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195401593

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
--- End diff --

Could you add some debug logging here? In general, it would be good to be able to trace what is happening in case of an issue when running in debug mode; this applies to all the classes introduced for both watching and polling.
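As an illustration, the poll runnable could mix in Spark's `Logging` trait. In the sketch below the message text is illustrative, and the label filters are an assumption based on the `Constants._` import in the hunk, since the quoted code truncates after `replaceSnapshot(`:

```scala
import org.apache.spark.internal.Logging

private class PollRunnable(applicationId: String) extends Runnable with Logging {
  override def run(): Unit = {
    // Trace each full resync so polling activity is visible at debug level.
    logDebug(s"Resynchronizing the full executor pod state from the Kubernetes API" +
      s" for application $applicationId.")
    snapshotsStore.replaceSnapshot(kubernetesClient
      .pods()
      .withLabel(SPARK_APP_ID_LABEL, applicationId)
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .list()
      .getItems
      .asScala)
  }
}
```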
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195390942

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
[...]
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Do you start with an empty state to trigger executor creation at the very beginning?
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195382788

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala ---
@@ -0,0 +1,88 @@
[...]
+    pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
+      toRunnable(() => callSubscriber(newSubscriber)),
--- End diff --

`toRunnable` is not needed with lambdas in Java 8. Just pass `() => callSubscriber(newSubscriber)` there.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195380019

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala ---
@@ -0,0 +1,164 @@
[...]
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.Cache
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsLifecycleManager(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    // Use a best-effort to track which executors have been removed already. It's not generally
+    // job-breaking if we remove executors more than once but it's ideal if we make an attempt
+    // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
+    // bounds.
+    removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) {
+
+  import ExecutorPodsLifecycleManager._
+
+  private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
+
+  def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    snapshotsStore.addSubscriber(eventProcessingInterval) {
+      onNewSnapshots(schedulerBackend, _)
+    }
+  }
+
+  private def onNewSnapshots(
+      schedulerBackend: KubernetesClusterSchedulerBackend,
+      snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+    val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
+    snapshots.foreach { snapshot =>
+      snapshot.executorPods.foreach { case (execId, state) =>
+        state match {
+          case deleted@PodDeleted(pod) =>
+            removeExecutorFromSpark(schedulerBackend, deleted, execId)
+            execIdsRemovedInThisRound += execId
+          case failed@PodFailed(pod) =>
+            onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
+          case succeeded@PodSucceeded(pod) =>
--- End diff --

same as above.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195379975

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala ---
@@ -0,0 +1,164 @@
[...]
+        state match {
+          case deleted@PodDeleted(pod) =>
+            removeExecutorFromSpark(schedulerBackend, deleted, execId)
+            execIdsRemovedInThisRound += execId
+          case failed@PodFailed(pod) =>
--- End diff --

same as above.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195379907

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala ---
@@ -0,0 +1,164 @@
[...]
+        state match {
+          case deleted@PodDeleted(pod) =>
--- End diff --

s/deleted@PodDeleted(pod)/deleted@PodDeleted(_)/ (the `pod` binding is unused).
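Applied to all three cases flagged above, the suggestion would read as the fragment below; the `succeeded` branch body is assumed to mirror the `failed` one, since the quoted hunk truncates there:

```scala
state match {
  case deleted@PodDeleted(_) =>
    // The bound name still gives the handler the full state object, while the
    // unused pod field is no longer bound.
    removeExecutorFromSpark(schedulerBackend, deleted, execId)
    execIdsRemovedInThisRound += execId
  case failed@PodFailed(_) =>
    onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
  case succeeded@PodSucceeded(_) =>
    onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
  case _ =>
    // Non-final states need no action in this handler (assumption).
}
```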
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195379060

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala ---
@@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
[...]
+    val removedExecutorsCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(3, TimeUnit.MINUTES)
--- End diff --

Why 3 minutes? Should this be configurable?
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195351851 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent._ + +import io.fabric8.kubernetes.api.model.Pod +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService) --- End diff -- Could you add a description of the class here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
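Purely as an illustration, a class-level Scaladoc along these lines would address the request; the wording below is a suggestion, not text from the PR:

```scala
/**
 * Aggregates pod updates from the watch- and polling-based sources into
 * immutable snapshots of the executor pod state, and fans those snapshots
 * out to subscribers. Each subscriber registers a processing interval;
 * snapshots observed since a subscriber's last run are buffered and
 * delivered as a batch on a dedicated executor, off the caller's thread.
 */
```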
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195350385 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.PodBuilder +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, Utils} + +private[spark] class ExecutorPodsAllocator( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +snapshotsStore: ExecutorPodsSnapshotsStore, +clock: Clock) extends Logging { + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val podCreationTimeout = math.max(podAllocationDelay * 5, 6) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Executor IDs that have been requested from Kubernetes but have not been detected in any + // snapshot yet. Mapped to the timestamp when they were created. + private val newlyCreatedExecutors = mutable.Map.empty[Long, Long] + + def start(applicationId: String): Unit = { +snapshotsStore.addSubscriber(podAllocationDelay) { + onNewSnapshots(applicationId, _) +} + } + + def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total) + + private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = { +newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) +// For all executors we've created against the API but have not seen in a snapshot +// yet - check the current time. If the current time has exceeded some threshold, +// assume that the pod was either never created (the API server never properly +// handled the creation request), or the API server created the pod but we missed +// both the creation and deletion events. In either case, delete the missing pod +// if possible, and mark such a pod to be rescheduled below. 
+newlyCreatedExecutors.foreach { case (execId, timeCreated) => + if (clock.getTimeMillis() - timeCreated > podCreationTimeout) { +logWarning(s"Executor with id $execId was not detected in the Kubernetes" + + s" cluster after $podCreationTimeout milliseconds despite the fact that a" + + " previous allocation attempt tried to create it. The executor may have been" + + " deleted but the application missed the deletion event.") +Utils.tryLogNonFatalError { + kubernetesClient +.pods() +.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString) +.delete() --- End diff -- Shouldn't deleteFromSpark be called here as well? Couldn't it be the case that the executor exists at a higher level but the K8s backend missed it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
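A sketch of what the reviewer is asking about: alongside the Kubernetes-side delete, also surface the loss to the Spark scheduler. Everything here is stubbed for illustration (`SchedulerBackendStub` and the function names are invented); in the merged PR the Spark-side removal is handled by the lifecycle manager instead:

```scala
import scala.collection.mutable

trait SchedulerBackendStub {
  def doRemoveExecutor(executorId: String, reason: String): Unit
}

def reapUndetectedExecutors(
    newlyCreatedExecutors: mutable.Map[Long, Long], // execId -> creation time
    nowMillis: Long,
    podCreationTimeoutMillis: Long,
    deletePodInK8s: Long => Unit,
    backend: SchedulerBackendStub): Unit = {
  val timedOut = newlyCreatedExecutors.collect {
    case (execId, created) if nowMillis - created > podCreationTimeoutMillis => execId
  }
  timedOut.foreach { execId =>
    deletePodInK8s(execId) // what the PR already does
    // The reviewer's suggestion: also surface the loss to Spark itself.
    backend.doRemoveExecutor(execId.toString, "pod never observed after creation request")
    newlyCreatedExecutors -= execId
  }
}
```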
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195040122 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.ThreadUtils + +private[spark] class ExecutorPodsPollingSnapshotSource( +conf: SparkConf, +kubernetesClient: KubernetesClient, +snapshotsStore: ExecutorPodsSnapshotsStore, +pollingExecutor: ScheduledExecutorService) { + + private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + + private var pollingFuture: Future[_] = _ + + def start(applicationId: String): Unit = { +require(pollingFuture == null, "Cannot start polling more than once.") +pollingFuture = pollingExecutor.scheduleWithFixedDelay( + new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (pollingFuture != null) { + pollingFuture.cancel(true) + pollingFuture = null +} +ThreadUtils.shutdown(pollingExecutor) --- End diff -- Ok just wanted to verify it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194856403 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- I ended up just removing reactive programming entirely - the buffering is implemented manually. Please take a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
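For readers following along, a generic sketch of what "buffering implemented manually" can look like: one queue per subscriber, drained on that subscriber's interval by a scheduled executor. This illustrates the general idea only and is not the code that was merged:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.collection.mutable

final class SnapshotBuffer[T](intervalMillis: Long)(onBatch: Seq[T] => Unit) {
  private val queue = new ConcurrentLinkedQueue[T]()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  scheduler.scheduleWithFixedDelay(new Runnable {
    override def run(): Unit = {
      // Drain everything published since the last tick and hand it to the
      // subscriber as one batch, mimicking rx's buffer() operator.
      val batch = mutable.Buffer.empty[T]
      var next = queue.poll()
      while (next != null) {
        batch += next
        next = queue.poll()
      }
      if (batch.nonEmpty) onBatch(batch.toSeq)
    }
  }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)

  def publish(item: T): Unit = queue.add(item)
  def stop(): Unit = scheduler.shutdownNow()
}
```

A subscriber would then be wired up along the lines of `new SnapshotBuffer[ExecutorPodsSnapshot](podAllocationDelay)(batch => ...)`, with the watch and polling sources calling `publish`.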
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194840955 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- These types are not exposed - they're only implementation details in the Kubernetes module. Furthermore the RxJava dependency will be in the Spark distribution but is not a dependency pulled in by spark-core. It sounds like there is some contention with the extra dependency though, so should we be considering implementing our own mechanisms from the ground up? I think the bottom line question is: can spark-kubernetes, NOT spark-core, pull in RxJava? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194839188 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.ThreadUtils + +private[spark] class ExecutorPodsPollingSnapshotSource( +conf: SparkConf, +kubernetesClient: KubernetesClient, +snapshotsStore: ExecutorPodsSnapshotsStore, +pollingExecutor: ScheduledExecutorService) { + + private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + + private var pollingFuture: Future[_] = _ + + def start(applicationId: String): Unit = { +require(pollingFuture == null, "Cannot start polling more than once.") +pollingFuture = pollingExecutor.scheduleWithFixedDelay( + new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (pollingFuture != null) { + pollingFuture.cancel(true) + pollingFuture = null +} +ThreadUtils.shutdown(pollingExecutor) --- End diff -- There's a shutdown hook that closes the Spark Context and all its dependent components, including the `KubernetesClusterSchedulerBackend`. In `KubernetesClusterSchedulerBackend` we shut down all of these components. We also shut them down while handling caught exceptions. Also, all of these thread pools are daemon thread pools, so they should shut themselves down on JVM exit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
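The "daemon thread pools shut themselves down on JVM exit" point rests on standard JDK behavior; a minimal sketch of the pattern that helpers like `ThreadUtils.newDaemonSingleThreadScheduledExecutor` wrap (the thread name is chosen here for illustration):

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}

val daemonFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "kubernetes-executor-snapshots-buffer")
    // Daemon threads do not keep the JVM alive, so these pools cannot
    // block shutdown even if stop() is never reached.
    t.setDaemon(true)
    t
  }
}
val pollingExecutor: ScheduledExecutorService =
  Executors.newSingleThreadScheduledExecutor(daemonFactory)
```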
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user ktoso commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194736679 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- Seems there is some confusion here? Akka Streams does not depend on Rx of course; they are both alternative implementations of Reactive Streams ( http://reactive-streams.org/ ), which have been included in JDK9 as `java.util.concurrent.Flow.*`. Akka also implements those, but does not require JDK9; you can use JDK8 + RS, and if you use JDK9 you could use the JDK's types, but it's not required. Anything else I should clarify or review here? For inter-op purposes it would be good not to expose a specific implementation but to expose the reactive-streams types (`org.reactivestreams.Publisher` etc), but that only matters if the types are exposed. As for including dependencies in core Spark -- I would expect this to carry quite a few implications, though I don't know Spark's rules about it (ofc fewer dependencies == better for users, since there are fewer chances to version-clash with libraries they'd use) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194670526 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- > Does Akka streams without JDK 9 depend on ReactiveX? It depends on the reactive-streams library, so you don't need to bring RxJava in. @ktoso correct me if I am wrong. > In this regard we are no different from the other custom controllers in the Kubernetes ecosystem which have to manage large numbers of pods. Do you have an example of a specific controller, to get a better understanding? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194664576 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.ThreadUtils + +private[spark] class ExecutorPodsPollingSnapshotSource( +conf: SparkConf, +kubernetesClient: KubernetesClient, +snapshotsStore: ExecutorPodsSnapshotsStore, +pollingExecutor: ScheduledExecutorService) { + + private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + + private var pollingFuture: Future[_] = _ + + def start(applicationId: String): Unit = { +require(pollingFuture == null, "Cannot start polling more than once.") +pollingFuture = pollingExecutor.scheduleWithFixedDelay( + new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (pollingFuture != null) { + pollingFuture.cancel(true) + pollingFuture = null +} +ThreadUtils.shutdown(pollingExecutor) --- End diff -- There are a number of such calls; are we sure they will be executed in every scenario, e.g. after an exception? Are the stop calls bound to some shutdown hook? Is this covered by RxJava? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194574775 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- > One question is about performance at scale: when you get events from hundreds of executors at once, which framework should work best? Should we worry about this? One already runs into this problem since we open a Watch that streams all events down anyway. In any implementation where we want events to be processed at different intervals, there needs to be some buffering, or else we choose to ignore some events and only look at the most up-to-date snapshot at the given intervals. As discussed in https://github.com/apache/spark/pull/21366#discussion_r194181797 we really want to process as many of the events we receive as possible, so we're stuck with buffering somewhere, and regardless of the observables or reactive programming framework we pick we still have to effectively store `O(E)` items, `E` being the number of events. And aside from the buffering, we'd need to consider the scale of the stream of events flowing from the persistent HTTP connection backing the Watch. In this regard we are no different from the other custom controllers in the Kubernetes ecosystem which have to manage large numbers of pods. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194572858 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- > If not mistaken Java 9 has introduced the related interfaces, so rx-java might not be needed in the future when Spark updates the supported Java version. Spark will need to support Java 8 for the foreseeable future. We'll be using a Java 8-compatible solution for a while. > Some more Scala-centric implementations: akka streams, a Reactive Streams and JDK 9+ java.util.concurrent.Flow-compliant implementation. This does not depend on rx-java. Also there is monix, and RxScala seems outdated. A comparison between monix and others is here. Does Akka streams without JDK 9 depend on ReactiveX? If so then we have the same dependency problem. As for monix vs. Akka vs. RxJava, I think that's an implementation detail - at the end of the day the main concern is the dependency addition, and either way we're adding an external dependency vs. implementing the semantics ourselves. If we think monix / Akka is the more elegant solution over RxJava we can switch to that, but given how little of what we're doing is actually in the functional programming style, I don't think the difference between Java and Scala is significant in this particular instance. > From the dependencies already brought in by Spark, is there a lib that could provide similar functionality, if not based on Reactive stuff then some other way, queues or something? I didn't see any but I am open to being corrected here. Spark implements an event queue but it doesn't suffice for the paradigm of having multiple subscribers process snapshots at different intervals. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194560151 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- @felixcheung (sorry for the delayed answer), @mccheah a few thoughts/observations: a) If I'm not mistaken, Java 9 has introduced the related interfaces, so rx-java might not be needed in the future. b) Some more Scala-centric implementations: [akka streams](https://doc.akka.io/docs/akka/current/stream/stream-integrations.html#integrating-with-reactive-streams) instead, a Reactive Streams and JDK 9+ `java.util.concurrent.Flow`-compliant implementation. This does not depend on rx-java. Also there is [monix](https://github.com/monix/monix), and [RxScala](https://github.com/ReactiveX/RxScala) seems outdated. A comparison between monix and others is [here](https://monix.io/docs/2x/reactive/observable-comparisons.html). One question is about performance at scale: when you get events from hundreds of executors at once, which framework should work best? Should we worry about this? Rx-java is light, but at the end of the day monix or akka streams might be just as light, although the latter's purpose goes beyond observables etc. c) I built the distro from the PR, so rxjava-2.1.13.jar and reactive-streams-1.0.2.jar are added to the jars. spark-core itself does not seem to have any of their classes in it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
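For reference, the JDK 9 interfaces under discussion are `java.util.concurrent.Flow.*`; a tiny JDK 9+-only sketch, shown purely to illustrate the types (the PR itself stays on Java 8):

```scala
import java.util.concurrent.{Flow, SubmissionPublisher}

val publisher = new SubmissionPublisher[String]()
publisher.subscribe(new Flow.Subscriber[String] {
  private var subscription: Flow.Subscription = _
  override def onSubscribe(s: Flow.Subscription): Unit = {
    subscription = s
    s.request(1) // back-pressure: ask for one item at a time
  }
  override def onNext(item: String): Unit = {
    println(s"received: $item")
    subscription.request(1)
  }
  override def onError(t: Throwable): Unit = t.printStackTrace()
  override def onComplete(): Unit = println("stream completed")
})
publisher.submit("executor-pod-update")
publisher.close()
```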
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194536061 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit} + +import com.google.common.collect.Lists +import io.fabric8.kubernetes.api.model.Pod +import io.reactivex.disposables.Disposable +import io.reactivex.functions.Consumer +import io.reactivex.schedulers.Schedulers +import io.reactivex.subjects.PublishSubject +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class ExecutorPodsSnapshotsStoreImpl( --- End diff -- And the internal implementation is pretty different between the test and the real versions - the real version uses ReactiveX observables while the fake one just uses an in-memory buffer, etc. - it doesn't entirely make sense to have the test implementation extend the real implementation. Mocks are too verbose because of the complexity of the test implementation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194535271 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit} + +import com.google.common.collect.Lists +import io.fabric8.kubernetes.api.model.Pod +import io.reactivex.disposables.Disposable +import io.reactivex.functions.Consumer +import io.reactivex.schedulers.Schedulers +import io.reactivex.subjects.PublishSubject +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class ExecutorPodsSnapshotsStoreImpl( --- End diff -- Prefer an impl class here because we also implement a test-only version in https://github.com/apache/spark/pull/21366/files#diff-ac93ac5a7a34f600bb942c7feb092924 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194534077 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit} + +import com.google.common.collect.Lists +import io.fabric8.kubernetes.api.model.Pod +import io.reactivex.disposables.Disposable +import io.reactivex.functions.Consumer +import io.reactivex.schedulers.Schedulers +import io.reactivex.subjects.PublishSubject +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class ExecutorPodsSnapshotsStoreImpl( --- End diff -- Picky: Having an Impl class is more Java like. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194184310 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit} + +import com.google.common.collect.Lists +import io.fabric8.kubernetes.api.model.Pod +import io.reactivex.disposables.Disposable +import io.reactivex.functions.Consumer +import io.reactivex.schedulers.Schedulers +import io.reactivex.subjects.PublishSubject +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class ExecutorPodsSnapshotsStoreImpl( +bufferSnapshotsExecutor: ScheduledExecutorService, +executeSubscriptionsExecutor: ExecutorService) + extends ExecutorPodsSnapshotsStore { + + private val SNAPSHOT_LOCK = new Object() + + private val snapshotsObservable = PublishSubject.create[ExecutorPodsSnapshot]() + private val observedDisposables = mutable.Buffer.empty[Disposable] + + @GuardedBy("SNAPSHOT_LOCK") + private var currentSnapshot = ExecutorPodsSnapshot() + + override def addSubscriber( + processBatchIntervalMillis: Long) + (subscriber: ExecutorPodsSnapshot => Unit): Unit = { +observedDisposables += snapshotsObservable + // Group events in the time window given by the caller. These buffers are then sent + // to the caller's lambda at the given interval, with the pod updates that occurred + // in that given interval. + .buffer( +processBatchIntervalMillis, +TimeUnit.MILLISECONDS, +// For testing - specifically use the given scheduled executor service to trigger +// buffer boundaries. Allows us to inject a deterministic scheduler here. +Schedulers.from(bufferSnapshotsExecutor)) + // Trigger an event cycle immediately. Not strictly required to be fully correct, but + // in particular the pod allocator should try to request executors immediately instead + // of waiting for one pod allocation delay. + .startWith(Lists.newArrayList(ExecutorPodsSnapshot())) + // Force all triggered events - both the initial event above and the buffered ones in + // the following time windows - to execute asynchronously to this call's thread. 
+ .observeOn(Schedulers.from(executeSubscriptionsExecutor)) + .subscribe(toReactivexConsumer { snapshots: java.util.List[ExecutorPodsSnapshot] => +Utils.tryLogNonFatalError { + snapshots.asScala.foreach(subscriber) +} + }) + } + + override def stop(): Unit = { +observedDisposables.foreach(_.dispose()) +snapshotsObservable.onComplete() +ThreadUtils.shutdown(bufferSnapshotsExecutor) +ThreadUtils.shutdown(executeSubscriptionsExecutor) + } + + override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized { +currentSnapshot = currentSnapshot.withUpdate(updatedPod) --- End diff -- A brief remark: one still might not be 100% accurate because of failures in the information flow (missing events etc.), but for all the events we do get, we should try to handle them as best we can. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
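The `withUpdate` call being discussed produces a fresh immutable snapshot per pod update; a simplified sketch of that idea (types reduced to `Long`/`String`, whereas the real `ExecutorPodsSnapshot` keys by an executor id derived from a pod label):

```scala
case class PodsSnapshot(executorPods: Map[Long, String]) {
  def withUpdate(execId: Long, state: String): PodsSnapshot =
    copy(executorPods = executorPods + (execId -> state))
}

val s0 = PodsSnapshot(Map.empty)
val s1 = s0.withUpdate(1L, "running")
val s2 = s1.withUpdate(1L, "failed")
// Each update yields a new value; a subscriber handed s1 still sees the
// "running" state even after s2 exists, which is what makes buffering a
// sequence of snapshots meaningful.
```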
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user dvogelbacher commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194183023 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit} + +import com.google.common.collect.Lists +import io.fabric8.kubernetes.api.model.Pod +import io.reactivex.disposables.Disposable +import io.reactivex.functions.Consumer +import io.reactivex.schedulers.Schedulers +import io.reactivex.subjects.PublishSubject +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class ExecutorPodsSnapshotsStoreImpl( +bufferSnapshotsExecutor: ScheduledExecutorService, +executeSubscriptionsExecutor: ExecutorService) + extends ExecutorPodsSnapshotsStore { + + private val SNAPSHOT_LOCK = new Object() + + private val snapshotsObservable = PublishSubject.create[ExecutorPodsSnapshot]() + private val observedDisposables = mutable.Buffer.empty[Disposable] + + @GuardedBy("SNAPSHOT_LOCK") + private var currentSnapshot = ExecutorPodsSnapshot() + + override def addSubscriber( + processBatchIntervalMillis: Long) + (subscriber: ExecutorPodsSnapshot => Unit): Unit = { +observedDisposables += snapshotsObservable + // Group events in the time window given by the caller. These buffers are then sent + // to the caller's lambda at the given interval, with the pod updates that occurred + // in that given interval. + .buffer( +processBatchIntervalMillis, +TimeUnit.MILLISECONDS, +// For testing - specifically use the given scheduled executor service to trigger +// buffer boundaries. Allows us to inject a deterministic scheduler here. +Schedulers.from(bufferSnapshotsExecutor)) + // Trigger an event cycle immediately. Not strictly required to be fully correct, but + // in particular the pod allocator should try to request executors immediately instead + // of waiting for one pod allocation delay. + .startWith(Lists.newArrayList(ExecutorPodsSnapshot())) + // Force all triggered events - both the initial event above and the buffered ones in + // the following time windows - to execute asynchronously to this call's thread. 
+ .observeOn(Schedulers.from(executeSubscriptionsExecutor)) + .subscribe(toReactivexConsumer { snapshots: java.util.List[ExecutorPodsSnapshot] => +Utils.tryLogNonFatalError { + snapshots.asScala.foreach(subscriber) +} + }) + } + + override def stop(): Unit = { +observedDisposables.foreach(_.dispose()) +snapshotsObservable.onComplete() +ThreadUtils.shutdown(bufferSnapshotsExecutor) +ThreadUtils.shutdown(executeSubscriptionsExecutor) + } + + override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized { +currentSnapshot = currentSnapshot.withUpdate(updatedPod) --- End diff -- gotcha, that makes sense --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194181797 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit} + +import com.google.common.collect.Lists +import io.fabric8.kubernetes.api.model.Pod +import io.reactivex.disposables.Disposable +import io.reactivex.functions.Consumer +import io.reactivex.schedulers.Schedulers +import io.reactivex.subjects.PublishSubject +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class ExecutorPodsSnapshotsStoreImpl( +bufferSnapshotsExecutor: ScheduledExecutorService, +executeSubscriptionsExecutor: ExecutorService) + extends ExecutorPodsSnapshotsStore { + + private val SNAPSHOT_LOCK = new Object() + + private val snapshotsObservable = PublishSubject.create[ExecutorPodsSnapshot]() + private val observedDisposables = mutable.Buffer.empty[Disposable] + + @GuardedBy("SNAPSHOT_LOCK") + private var currentSnapshot = ExecutorPodsSnapshot() + + override def addSubscriber( + processBatchIntervalMillis: Long) + (subscriber: ExecutorPodsSnapshot => Unit): Unit = { +observedDisposables += snapshotsObservable + // Group events in the time window given by the caller. These buffers are then sent + // to the caller's lambda at the given interval, with the pod updates that occurred + // in that given interval. + .buffer( +processBatchIntervalMillis, +TimeUnit.MILLISECONDS, +// For testing - specifically use the given scheduled executor service to trigger +// buffer boundaries. Allows us to inject a deterministic scheduler here. +Schedulers.from(bufferSnapshotsExecutor)) + // Trigger an event cycle immediately. Not strictly required to be fully correct, but + // in particular the pod allocator should try to request executors immediately instead + // of waiting for one pod allocation delay. + .startWith(Lists.newArrayList(ExecutorPodsSnapshot())) + // Force all triggered events - both the initial event above and the buffered ones in + // the following time windows - to execute asynchronously to this call's thread. 
+ .observeOn(Schedulers.from(executeSubscriptionsExecutor)) + .subscribe(toReactivexConsumer { snapshots: java.util.List[ExecutorPodsSnapshot] => +Utils.tryLogNonFatalError { + snapshots.asScala.foreach(subscriber) +} + }) + } + + override def stop(): Unit = { +observedDisposables.foreach(_.dispose()) +snapshotsObservable.onComplete() +ThreadUtils.shutdown(bufferSnapshotsExecutor) +ThreadUtils.shutdown(executeSubscriptionsExecutor) + } + + override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized { +currentSnapshot = currentSnapshot.withUpdate(updatedPod) --- End diff -- I'm pretty sure we want all snapshots. The reason is to get a more accurate response in the lifecycle handler. If a pod enters an error state and then someone marks it for deletion, you want to at least have the chance to capture the error state when sending the executor removal request to the Spark scheduler. For the pods allocator we can do either the latest or all. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
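A small illustration of why the lifecycle handler wants every buffered snapshot rather than only the newest one (pod states are plain strings here, purely for illustration):

```scala
// Three snapshots buffered within one processing interval:
val buffered = Seq(
  Map(1L -> "running"),
  Map(1L -> "failed"),    // the intermediate state carrying the exit reason
  Map.empty[Long, String] // pod already deleted by the time we process
)

// Latest-only view: the failure is invisible.
val latestOnly = buffered.last // Map.empty

// All-snapshots view: the failure can still be reported to the scheduler.
val sawFailure = buffered.exists(_.get(1L).contains("failed")) // true
```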
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user dvogelbacher commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194180839 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import com.google.common.cache.Cache +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsLifecycleManager( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +snapshotsStore: ExecutorPodsSnapshotsStore, +// Use a best-effort to track which executors have been removed already. It's not generally +// job-breaking if we remove executors more than once but it's ideal if we make an attempt +// to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond +// bounds. +removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) { + + import ExecutorPodsLifecycleManager._ + + private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL) + + def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +snapshotsStore.addSubscriber(eventProcessingInterval) { + onNextSnapshot(schedulerBackend, _) +} + } + + private def onNextSnapshot( + schedulerBackend: KubernetesClusterSchedulerBackend, + snapshot: ExecutorPodsSnapshot): Unit = { +val execIdsRemovedInThisRound = mutable.HashSet.empty[Long] +snapshot.executorPods.foreach { case (execId, state) => + state match { +case PodDeleted(pod) => + removeExecutorFromSpark(schedulerBackend, pod, execId) + execIdsRemovedInThisRound += execId +case errorOrSucceeded @ (PodFailed(_) | PodSucceeded(_)) => + removeExecutorFromK8s(errorOrSucceeded.pod) + removeExecutorFromSpark(schedulerBackend, errorOrSucceeded.pod, execId) + execIdsRemovedInThisRound += execId +case _ => + } +} + +// Reconcile the case where Spark claims to know about an executor but the corresponding pod +// is missing from the cluster. This would occur if we miss a deletion event and the pod +// transitions immediately from running io absent. 
+(schedulerBackend.getExecutorIds().map(_.toLong).toSet + -- snapshot.executorPods.keySet + -- execIdsRemovedInThisRound).foreach { missingExecutorId => + if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) { +val exitReason = ExecutorExited( + UNKNOWN_EXIT_CODE, + exitCausedByApp = false, + s"The executor with ID $missingExecutorId was not found in the cluster but we didn't" + +s" get a reason why. Marking the executor as failed. The executor may have been" + +s" deleted but the driver missed the deletion event.") +schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason) + } +} + } + + private def removeExecutorFromK8s(updatedPod: Pod): Unit = { +// If deletion failed on a previous try, we can try again if resync informs us the pod +// is still around. +// Delete as best attempt - duplicate deletes will throw an exception but the end state +// of getting rid of the pod is what matters. +Utils.tryLogNonFatalError { + kubernetesClient +.pods() +.withName(updatedPod.getMetadata.g
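The reconciliation in this diff is plain set arithmetic; a standalone illustration with made-up ids, with comments mapping each set back to the names in the diff:

```scala
val knownToSpark = Set(1L, 2L, 3L, 4L)  // schedulerBackend.getExecutorIds().map(_.toLong).toSet
val seenInSnapshot = Set(1L, 2L)        // snapshot.executorPods.keySet
val removedThisRound = Set(3L)          // execIdsRemovedInThisRound

val missing = knownToSpark -- seenInSnapshot -- removedThisRound
// missing == Set(4L): an executor Spark still believes in, that Kubernetes
// no longer reports and that wasn't already handled this round, so it is
// marked failed with UNKNOWN_EXIT_CODE.
```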
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user dvogelbacher commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194179846 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit} + +import com.google.common.collect.Lists +import io.fabric8.kubernetes.api.model.Pod +import io.reactivex.disposables.Disposable +import io.reactivex.functions.Consumer +import io.reactivex.schedulers.Schedulers +import io.reactivex.subjects.PublishSubject +import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class ExecutorPodsSnapshotsStoreImpl( +bufferSnapshotsExecutor: ScheduledExecutorService, +executeSubscriptionsExecutor: ExecutorService) + extends ExecutorPodsSnapshotsStore { + + private val SNAPSHOT_LOCK = new Object() + + private val snapshotsObservable = PublishSubject.create[ExecutorPodsSnapshot]() + private val observedDisposables = mutable.Buffer.empty[Disposable] + + @GuardedBy("SNAPSHOT_LOCK") + private var currentSnapshot = ExecutorPodsSnapshot() + + override def addSubscriber( + processBatchIntervalMillis: Long) + (subscriber: ExecutorPodsSnapshot => Unit): Unit = { +observedDisposables += snapshotsObservable + // Group events in the time window given by the caller. These buffers are then sent + // to the caller's lambda at the given interval, with the pod updates that occurred + // in that given interval. + .buffer( +processBatchIntervalMillis, +TimeUnit.MILLISECONDS, +// For testing - specifically use the given scheduled executor service to trigger +// buffer boundaries. Allows us to inject a deterministic scheduler here. +Schedulers.from(bufferSnapshotsExecutor)) + // Trigger an event cycle immediately. Not strictly required to be fully correct, but + // in particular the pod allocator should try to request executors immediately instead + // of waiting for one pod allocation delay. + .startWith(Lists.newArrayList(ExecutorPodsSnapshot())) + // Force all triggered events - both the initial event above and the buffered ones in + // the following time windows - to execute asynchronously to this call's thread. 
+ .observeOn(Schedulers.from(executeSubscriptionsExecutor)) + .subscribe(toReactivexConsumer { snapshots: java.util.List[ExecutorPodsSnapshot] => +Utils.tryLogNonFatalError { + snapshots.asScala.foreach(subscriber) +} + }) + } + + override def stop(): Unit = { +observedDisposables.foreach(_.dispose()) +snapshotsObservable.onComplete() +ThreadUtils.shutdown(bufferSnapshotsExecutor) +ThreadUtils.shutdown(executeSubscriptionsExecutor) + } + + override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized { +currentSnapshot = currentSnapshot.withUpdate(updatedPod) --- End diff -- we only ever need to send the latest snapshot update with this new approach, so having the `snapshotsObservable buffer` doesn't make sense anymore? I.e., we don't need to send the subscriber all the Snapshots that happened during a given interval. Only the last one. So we can just have a periodic task that takes the latest snapshot and sends it to all subscribers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user dvogelbacher commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194177422 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala --- @@ -49,48 +54,58 @@ private[spark] class ExecutorPodsAllocator( .withName(kubernetesDriverPodName) .get() - // Use sets of ids instead of counters to be able to handle duplicate events. - - // Executor IDs that have been requested from Kubernetes but are not running yet. - private val pendingExecutors = mutable.Set.empty[Long] - - // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the - // executors that are running. But, here we choose instead to maintain all state within this - // class from the persecptive of the k8s API. Therefore whether or not this scheduler loop - // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. - // We may need to consider where these perspectives may differ and which perspective should - // take precedence. - private val runningExecutors = mutable.Set.empty[Long] + // Executor IDs that have been requested from Kubernetes but have not been detected in any + // snapshot yet. Mapped to the timestamp when they were created. + private val newlyCreatedExecutors = mutable.Map.empty[Long, Long] def start(applicationId: String): Unit = { -eventQueue.addSubscriber( - podAllocationDelay, - new ExecutorPodBatchSubscriber( -processUpdatedPod(applicationId), -() => postProcessBatch(applicationId))) +snapshotsStore.addSubscriber(podAllocationDelay) { + processSnapshot(applicationId, _) +} } def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total) - private def processUpdatedPod(applicationId: String): PartialFunction[ExecutorPodState, Unit] = { -case running @ PodRunning(_) => - pendingExecutors -= running.execId() - runningExecutors += running.execId() -case completed @ (PodSucceeded(_) | PodDeleted(_) | PodFailed(_)) => - pendingExecutors -= completed.execId() - runningExecutors -= completed.execId() -case _ => - } + private def processSnapshot(applicationId: String, snapshot: ExecutorPodsSnapshot): Unit = { +snapshot.executorPods.keys.foreach { newlyCreatedExecutors -= _ } + +// For all executors we've created against the API but have not seen in a snapshot +// yet - check the current time. If the current time has exceeded some threshold, +// assume that the pod was either never created (the API server never properly +// handled the creation request), or the API server created the pod but we missed +// both the creation and deletion events. In either case, delete the missing pod +// if possible, and mark such a pod to be rescheduled below. +(newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet).foreach { execId => --- End diff -- `newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet` is unnecessary as you remove all snapshot pods from `newlyCreatedExecutors` in the line above --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
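The redundancy the reviewer points out, distilled into a runnable check:

```scala
import scala.collection.mutable

val newlyCreatedExecutors = mutable.Map(1L -> 100L, 2L -> 200L, 3L -> 300L)
val snapshotIds = Set(1L, 2L) // snapshot.executorPods.keySet

// The line above the flagged one already removes every snapshot id...
snapshotIds.foreach { newlyCreatedExecutors -= _ }

// ...so subtracting them again is a no-op and can be dropped.
assert((newlyCreatedExecutors.keySet -- snapshotIds) == newlyCreatedExecutors.keySet)
```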
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user dvogelbacher commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r194176787 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala --- @@ -105,14 +120,15 @@ private[spark] class ExecutorPodsAllocator( .endSpec() .build() kubernetesClient.pods().create(podWithAttachedContainer) -pendingExecutors += newExecutorId +newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() } } else if (currentRunningExecutors >= currentTotalExpectedExecutors) { + // TODO handle edge cases if we end up with more running executors than expected. logDebug("Current number of running executors is equal to the number of requested" + " executors. Not scaling up further.") -} else if (pendingExecutors.nonEmpty) { - logDebug(s"Still waiting for ${pendingExecutors.size} executors to begin running before" + -" requesting for more executors.") +} else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) { + logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" + --- End diff -- Can we make this debug statement more verbose to distinguish between `newlyCreatedExecutors` and `currentPendingExecutors`? It might help in case anyone ever needs to debug executors never reaching the pending state, or something like that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
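One possible shape for the more verbose message; the wording below is invented for illustration and not taken from the PR:

```scala
def waitingForExecutorsMessage(newlyCreated: Int, pendingInK8s: Int): String =
  s"Not requesting more executors: $newlyCreated newly created and not yet" +
    s" seen in a snapshot, $pendingInK8s reported pending by Kubernetes."

println(waitingForExecutorsMessage(3, 2))
```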