[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190769478

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class ExecutorPodsPollingEventSource(
+    kubernetesClient: KubernetesClient,
+    eventHandler: ExecutorPodsEventHandler,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private var pollingFuture: Future[_] = null
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
--- End diff --

Done, see below.
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190762965

--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsEventQueue.scala ---
@@ -0,0 +1,41 @@
+/* Apache License 2.0 header, identical to the one above */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+import scala.collection.mutable
+
+class DeterministicExecutorPodsEventQueue extends ExecutorPodsEventQueue {
+
+  private val eventBuffer = mutable.Buffer.empty[Pod]
+  private val subscribers = mutable.Buffer.empty[(Seq[Pod]) => Unit]
+
+  override def addSubscriber
+      (processBatchIntervalMillis: Long)
+      (onNextBatch: (Seq[Pod]) => Unit): Unit = {
+    subscribers += onNextBatch
+  }
+
+  override def stopProcessingEvents(): Unit = {}
+
+  override def pushPodUpdate(updatedPod: Pod): Unit = eventBuffer += updatedPod
--- End diff --

Yup, basically just a live stream of the pod statuses as reported by the API.
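For illustration, a minimal sketch of a subscriber under that model — the curried `addSubscriber` signature comes from the trait quoted above, while the logger class and its body are assumptions, not code from this PR:

import io.fabric8.kubernetes.api.model.Pod

// Hypothetical subscriber: each batch element is a raw fabric8 Pod snapshot,
// not a wrapper event type, so state is read straight off pod.getStatus.
class PodPhaseLogger(queue: ExecutorPodsEventQueue) {
  queue.addSubscriber(1000L) { batch =>
    batch.foreach { pod =>
      println(s"${pod.getMetadata.getName} -> ${pod.getStatus.getPhase}")
    }
  }
}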
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190755750

--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsEventQueue.scala ---
+  override def pushPodUpdate(updatedPod: Pod): Unit = eventBuffer += updatedPod
--- End diff --

So events are pods themselves, as opposed to some event structure on pods?
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190398282

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
@@ -0,0 +1,229 @@
+/* Apache License 2.0 header, identical to the one above */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    require(eventProcessorFuture == null, "Cannot start event processing twice.")
+    logInfo(s"Starting Kubernetes executor pods event handler for application with" +
+      s" id $applicationId.")
+    val eventProcessor = new Runnable {
+      override def run(): Unit = {
+        Utils.tryLogNonFatalError {
+          processEvents(applicationId, schedulerBackend)
+        }
+      }
+    }
+    eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+      eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (eventProcessorFuture != null) {
+      eventProcessorFuture.cancel(true)
+      eventProcessorFuture = null
+    }
+  }
+
+  private def processEvents(
+      applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) {
+    val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size())
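The "use sets of ids instead of counters" comment in the quoted diff is the crux of duplicate handling: adding an executor id to a set is idempotent, so replaying the same pod event cannot skew the tally the way incrementing a counter would. A standalone sketch with made-up ids, not PR code:

import scala.collection.mutable

val runningExecutors = mutable.Set.empty[Long]
// A duplicate "running" event for executor 2 arrives; the set absorbs it.
Seq(1L, 2L, 2L, 3L).foreach(runningExecutors += _)
assert(runningExecutors.size == 3)  // a naive counter would report 4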
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190389782

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190386369

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190385827

--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
+
--- End diff --

We always add to the top-level pom, and then in the lower-level poms we reference the dependent modules without listing their versions.
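What mccheah describes is Maven's standard dependencyManagement convention. A sketch with placeholder coordinates, since the actual dependency markup was stripped from the quoted diff:

<!-- Parent pom.xml: declare the version once. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.example</groupId>  <!-- placeholder, not the PR's dependency -->
      <artifactId>example-library</artifactId>
      <version>1.2.3</version>
    </dependency>
  </dependencies>
</dependencyManagement>

<!-- Child module pom.xml: no <version> element; it is inherited from the parent. -->
<dependencies>
  <dependency>
    <groupId>org.example</groupId>
    <artifactId>example-library</artifactId>
  </dependency>
</dependencies>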
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190372105

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190367677

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190367420

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190366267

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchEventSource.scala ---
@@ -0,0 +1,63 @@
+/* Apache License 2.0 header, identical to the one above */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsWatchEventSource(
+    eventHandler: ExecutorPodsEventHandler,
+    kubernetesClient: KubernetesClient) extends Logging {
+
+  private var watchConnection: Closeable = null
--- End diff --

In general, `start` and `stop` actions happen on their own threads; i.e. there shouldn't be concurrent threads trying to `start` and `stop` any component at the same time. So I think it's fine to make all data structures that are only accessed in `start` and `stop` not thread-safe.
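To make the contrast concrete: if `start` and `stop` could race, the field would need something like an `AtomicReference` handoff. This is an alternative sketch, not what the PR does, precisely because the lifecycle here is single-threaded:

import java.io.Closeable
import java.util.concurrent.atomic.AtomicReference

class RaceSafeWatchSource(openWatch: () => Closeable) {
  // compareAndSet/getAndSet claim the connection exclusively, so two racing
  // stop() calls cannot both close it and a racing start() cannot leak one.
  private val watchConnection = new AtomicReference[Closeable](null)

  def start(): Unit = {
    val connection = openWatch()
    if (!watchConnection.compareAndSet(null, connection)) {
      connection.close()  // someone else already started; discard ours
      throw new IllegalStateException("Cannot start watching more than once.")
    }
  }

  def stop(): Unit = {
    val connection = watchConnection.getAndSet(null)
    if (connection != null) {
      connection.close()
    }
  }
}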
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190365981

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190365332

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190356134

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190356590

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala ---
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS)
--- End diff --

Agreed.
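The agreement here is that the hardcoded 30-second interval should come from configuration. A sketch of one way to lift it into `SparkConf` — the config key and default are illustrative assumptions, not the PR's API:

import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}

import org.apache.spark.SparkConf

class ConfigurablePolling(conf: SparkConf, pollingExecutor: ScheduledExecutorService) {
  // Hypothetical key; at the time of this review no such setting existed.
  private val pollingIntervalSeconds =
    conf.getLong("spark.kubernetes.executor.apiPollingIntervalSeconds", 30L)

  def schedule(task: Runnable): Future[_] =
    pollingExecutor.scheduleWithFixedDelay(
      task, 0L, pollingIntervalSeconds, TimeUnit.SECONDS)
}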
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190354568

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala ---
(same diff context as quoted above)
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190356761

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala ---
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
--- End diff --

Ditto. The `pollingExecutor` should be shut down properly.
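A sketch of the teardown being asked for: cancelling the future only stops rescheduling, so the executor itself also needs to be shut down. The five-second grace period is an illustrative choice:

import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}

object PollingShutdown {
  def stopPolling(pollingFuture: Future[_], pollingExecutor: ScheduledExecutorService): Unit = {
    if (pollingFuture != null) {
      pollingFuture.cancel(true)
    }
    // Shut the executor down so its threads do not outlive the scheduler backend.
    pollingExecutor.shutdownNow()
    pollingExecutor.awaitTermination(5, TimeUnit.SECONDS)
  }
}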
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190349318 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- Why add this to the top-level pom?
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190350967 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventProcessorExecutor: ScheduledExecutorService) extends Logging { + + import ExecutorPodsEventHandler._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]() + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = mutable.Set.empty[Long] + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. But, here we choose instead to maintain all state within this + // class from the persecptive of the k8s API. Therefore whether or not this scheduler loop + // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence. 
+ private val runningExecutors = mutable.Set.empty[Long] + + private var eventProcessorFuture: Future[_] = _ + + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +require(eventProcessorFuture == null, "Cannot start event processing twice.") +logInfo(s"Starting Kubernetes executor pods event handler for application with" + + s" id $applicationId.") +val eventProcessor = new Runnable { + override def run(): Unit = { +Utils.tryLogNonFatalError { + processEvents(applicationId, schedulerBackend) +} + } +} +eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay( + eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (eventProcessorFuture != null) { + eventProcessorFuture.cancel(true) + eventProcessorFuture = null +} + } + + private def processEvents( + applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) { +val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size()) +
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190350563 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventProcessorExecutor: ScheduledExecutorService) extends Logging { + + import ExecutorPodsEventHandler._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]() + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = mutable.Set.empty[Long] + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. But, here we choose instead to maintain all state within this + // class from the persecptive of the k8s API. Therefore whether or not this scheduler loop + // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence. 
+ private val runningExecutors = mutable.Set.empty[Long] + + private var eventProcessorFuture: Future[_] = _ + + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +require(eventProcessorFuture == null, "Cannot start event processing twice.") +logInfo(s"Starting Kubernetes executor pods event handler for application with" + + s" id $applicationId.") +val eventProcessor = new Runnable { + override def run(): Unit = { +Utils.tryLogNonFatalError { + processEvents(applicationId, schedulerBackend) +} + } +} +eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay( --- End diff -- The `eventProcessorExecutor` should be shut down properly. Or you can use https://google.github.io/guava/releases/17.0/api/docs/com/google/common/util/concurrent/MoreExecutors.html#getExitingScheduledExecutorService(java.util.concurrent.ScheduledThreadPoolExecutor) to avoid having to shut it down manually.
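A sketch of the Guava alternative the reviewer links to: `getExitingScheduledExecutorService` wraps the pool in one whose threads are daemons and which is shut down automatically as the JVM exits, so no explicit shutdown call is needed.

    import java.util.concurrent.{ScheduledExecutorService, ScheduledThreadPoolExecutor}
    import com.google.common.util.concurrent.MoreExecutors

    // The returned service runs tasks on daemon threads and is terminated
    // automatically when the application completes.
    val eventProcessorExecutor: ScheduledExecutorService =
      MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1))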
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190358300 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchEventSource.scala --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable + +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsWatchEventSource( +eventHandler: ExecutorPodsEventHandler, +kubernetesClient: KubernetesClient) extends Logging { + + private var watchConnection: Closeable = null --- End diff -- Ditto for `pollingFuture` above.
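Reading this together with the `@volatile` suggestion in the next message, the same visibility concern presumably carries over to the polling source's field; as a sketch, its declaration would become:

    import java.util.concurrent.Future

    // Inside ExecutorPodsPollingEventSource: volatile so the null checks in
    // start()/stop() see writes made from other threads.
    @volatile private var pollingFuture: Future[_] = _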
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190358018 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchEventSource.scala --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable + +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsWatchEventSource( +eventHandler: ExecutorPodsEventHandler, +kubernetesClient: KubernetesClient) extends Logging { + + private var watchConnection: Closeable = null --- End diff -- Is there an equivalent to Java's `volatile` in Scala? If so, `watchConnection` should be `volatile` so your `watchConnection == null` check below is safe.
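There is: Scala's `@volatile` annotation, which compiles down to the JVM's volatile field modifier and gives the same visibility guarantee as Java's keyword. A sketch of the suggested declaration:

    import java.io.Closeable

    // Volatile guarantees the null check in one thread observes a
    // close/reset performed by another.
    @volatile private var watchConnection: Closeable = _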
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r189739169 --- Diff: pom.xml --- @@ -150,6 +150,7 @@ 4.5.4 4.4.8 +3.0.1 --- End diff -- Noted, will remove in the next push.
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r189735073 --- Diff: pom.xml --- @@ -150,6 +150,7 @@ 4.5.4 4.4.8 +3.0.1 --- End diff -- My take is that the performance is probably not worth the additional dependency. I also noticed that the trove dep is LGPL, which is considered incompatible with the Apache license. Although I believe this is not a show-stopper with respect to "containerized" dependencies, it probably is for a direct dependency in the code.
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r189642632 --- Diff: pom.xml --- @@ -150,6 +150,7 @@ 4.5.4 4.4.8 +3.0.1 --- End diff -- These are data structures optimized for storing primitives. Functionally speaking, we could use standard Scala collections here.
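The trade-off, sketched: Trove's primitive sets store raw longs, while the plain-Scala replacement boxes each element, trading some allocation and memory for zero extra dependencies. At the scale of executor-ID sets this is cheap.

    import scala.collection.mutable

    // Standard-library stand-in for gnu.trove.set.hash.TLongHashSet.
    // Each Long is boxed to java.lang.Long on insertion.
    val pendingExecutors = mutable.Set.empty[Long]
    pendingExecutors += 9L  // boxes the id
    pendingExecutors -= 9L

Later revisions of the diff in this thread take exactly this route, declaring `pendingExecutors` and `runningExecutors` as `mutable.Set.empty[Long]`.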
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user erikerlandson commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r189435162 --- Diff: pom.xml --- @@ -150,6 +150,7 @@ 4.5.4 4.4.8 +3.0.1 --- End diff -- It looks like this dep is being taken on for a couple of data structures: are they important, or could they be replaced with Scala's Array and HashMap?
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r189401135 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.Constants._ + +private[spark] class ExecutorPodsPollingEventSource( +kubernetesClient: KubernetesClient, +eventHandler: ExecutorPodsEventHandler, +pollingExecutor: ScheduledExecutorService) { + + private var pollingFuture: Future[_] = _ + + def start(applicationId: String): Unit = { +require(pollingFuture == null, "Cannot start polling more than once.") +pollingFuture = pollingExecutor.scheduleWithFixedDelay( + new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS) --- End diff -- Should make this and other intervals like it configurable.
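One way "configurable" could look with Spark's internal `ConfigBuilder`; the key name and default below are illustrative sketches, not the final values adopted by the PR:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.internal.config.ConfigBuilder

    // Hypothetical config entry for the resync poll interval.
    val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
      ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
        .doc("Interval between polls of the Kubernetes API server for executor pod state.")
        .timeConf(TimeUnit.MILLISECONDS)
        .createWithDefaultString("30s")

    // The hard-coded 30L in start() would then become:
    // pollingExecutor.scheduleWithFixedDelay(
    //   new PollRunnable(applicationId), 0L,
    //   conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL), TimeUnit.MILLISECONDS)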
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r189400912 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.Constants._ + +private[spark] class ExecutorPodsPollingEventSource( --- End diff -- It's noteworthy that the resync polls can also be done in `ExecutorPodsEventHandler#processEvents`. The reason we don't is that we probably want the resync polls to occur on a different interval than the event-handling passes. You may, for example, ask for the event handler to trigger very frequently so that pod updates are dealt with promptly. But you don't want to be polling the API server every 5 seconds.
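A self-contained sketch of the separation being described — frequent event-handling passes, infrequent resync polls — with illustrative intervals and hypothetical task names:

    import java.util.concurrent.{Executors, TimeUnit}

    object TwoIntervalSketch {
      // Stand-ins for the real event-handling and polling logic.
      def processQueuedEvents(): Unit = println("drain and process pod events")
      def pollApiServerForPods(): Unit = println("full pod listing from the API server")

      def main(args: Array[String]): Unit = {
        val scheduler = Executors.newScheduledThreadPool(2)
        // Frequent passes keep executor bookkeeping responsive to pod updates...
        scheduler.scheduleWithFixedDelay(new Runnable {
          override def run(): Unit = processQueuedEvents()
        }, 0L, 5L, TimeUnit.SECONDS)
        // ...while the comparatively expensive resync hits the API server rarely.
        scheduler.scheduleWithFixedDelay(new Runnable {
          override def run(): Unit = pollApiServerForPods()
        }, 0L, 30L, TimeUnit.SECONDS)
      }
    }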
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r189400286 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import gnu.trove.list.array.TLongArrayList +import gnu.trove.set.hash.TLongHashSet +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventProcessorExecutor: ScheduledExecutorService) { + + import ExecutorPodsEventHandler._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]() + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = new TLongHashSet() + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. But, here we choose instead to maintain all state within this + // class from the persecptive of the k8s API. Therefore whether or not this scheduler loop + // believes a scheduler is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence. 
+ private val runningExecutors = new TLongHashSet() + + private var eventProcessorFuture: Future[_] = _ + + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +require(eventProcessorFuture == null, "Cannot start event processing twice.") +val eventProcessor = new Runnable { + override def run(): Unit = processEvents(applicationId, schedulerBackend) +} +eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay( + eventProcessor, 0L, 5L, TimeUnit.SECONDS) + } + + def stop(): Unit = { +if (eventProcessorFuture != null) { + eventProcessorFuture.cancel(true) + eventProcessorFuture = null +} + } + + private def processEvents( + applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) { +val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size()) + eventQueue.size()) +eventQueue.drainTo(currentEvents) +currentEvents.asScala.flatten.foreach { updatedPod => + val execId = updatedPod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong + val podPhase = updatedPod.getStatus.getPhase.toLowerCase + if (is
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r189399432 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import gnu.trove.list.array.TLongArrayList +import gnu.trove.set.hash.TLongHashSet +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventProcessorExecutor: ScheduledExecutorService) { + + import ExecutorPodsEventHandler._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]() + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = new TLongHashSet() + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. But, here we choose instead to maintain all state within this + // class from the persecptive of the k8s API. Therefore whether or not this scheduler loop + // believes a scheduler is running is dictated by the K8s API rather than Spark's RPC events. --- End diff -- believes an executor is running*
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
GitHub user mccheah opened a pull request: https://github.com/apache/spark/pull/21366 [SPARK-24248][K8S][WIP] Use the Kubernetes API to populate an event queue for scheduling ## What changes were proposed in this pull request? Previously, the scheduler backend was maintaining state in many places, not only for reading state but also writing to it. For example, state had to be managed in both the watch and in the executor allocator runnable. Furthermore, one had to keep track of multiple hash tables. We can do better here by: 1. Consolidating the places where we manage state. Here, we take inspiration from traditional Kubernetes controllers. These controllers tend to implement an event queue which is populated by two sources: a watch connection, and a periodic poller. Controllers typically use both mechanisms for redundancy; the watch connection may drop, so the periodic polling serves as a backup. Both sources write pod updates to a single event queue and then a processor periodically processes the current state of pods as reported by the two sources. 2. Storing less specialized in-memory state in general. Previously we were creating hash tables to represent the state of executors. Instead, it's easier to represent state solely by the event queue, which has predictable read/write patterns and is more or less just a local up-to-date cache of the cluster's status. ## How was this patch tested? Integration tests should verify that there are no regressions end to end. Unit tests are to be updated, focusing in particular on different orderings of events and accounting for events that arrive in an unexpected order. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/palantir/spark event-queue-driven-scheduling Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21366.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21366 commit 310263c253a8c4a3748cab5b5a7698e076695cd6 Author: mcheah Date: 2018-05-18T20:39:47Z [SPARK-24248][K8S] Use the Kubernetes API to populate an event queue for scheduling Previously, the scheduler backend was maintaining state in many places, not only for reading state but also writing to it. For example, state had to be managed in both the watch and in the executor allocator runnable. Furthermore, one had to keep track of multiple hash tables. We can do better here by: (1) Consolidating the places where we manage state. Here, we take inspiration from traditional Kubernetes controllers. These controllers tend to implement an event queue which is populated by two sources: a watch connection, and a periodic poller. Controllers typically use both mechanisms for redundancy; the watch connection may drop, so the periodic polling serves as a backup. Both sources write pod updates to a single event queue and then a processor periodically processes the current state of pods as reported by the two sources. (2) Storing less specialized in-memory state in general. Previously we were creating hash tables to represent the state of executors. Instead, it's easier to represent state solely by the event queue, which has predictable read/write patterns and is more or less just a local up-to-date cache of the cluster's status.
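The architecture described above reduces to a small pattern: two producers, one queue, one periodic consumer. A self-contained sketch, with `String` standing in for the fabric8 `Pod` type:

    import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
    import scala.collection.JavaConverters._

    class PodEventQueue {
      private val queue = new LinkedBlockingQueue[String]()

      // Called by both the watch connection and the periodic poller.
      def pushPodUpdate(podState: String): Unit = queue.add(podState)

      // A single processor drains whatever has accumulated, on a fixed cadence.
      def startProcessing(onBatch: Seq[String] => Unit): Unit = {
        val exec = Executors.newSingleThreadScheduledExecutor()
        exec.scheduleWithFixedDelay(new Runnable {
          override def run(): Unit = {
            val batch = new java.util.ArrayList[String]()
            queue.drainTo(batch) // non-blocking snapshot of pending updates
            if (!batch.isEmpty) onBatch(batch.asScala.toList)
          }
        }, 0L, 5L, TimeUnit.SECONDS)
      }
    }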