[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-18 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r196257515
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -154,6 +154,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")
 
+  val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
+      .doc("Interval between polls against the Kubernetes API server to inspect the " +
+        "state of executors.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(interval => interval > 0, s"API server polling interval must be a" +
+        " positive time value.")
+      .createWithDefaultString("30s")
+
+  val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
--- End diff --

This should have been marked with `internal()`; that was an oversight. 
I don't think it's entirely necessary.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-18 Thread ifilonenko
Github user ifilonenko commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r196249925
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -154,6 +154,24 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")
 
+  val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
+      .doc("Interval between polls against the Kubernetes API server to inspect the " +
+        "state of executors.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(interval => interval > 0, s"API server polling interval must be a" +
+        " positive time value.")
+      .createWithDefaultString("30s")
+
+  val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
--- End diff --

@mccheah, because this is not internal, shouldn't we include it in 
`docs/`? I don't see why we aren't documenting this.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21366


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195569529
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Yeah I see. I guess this is done by ExecutorPodsAllocator as a subscriber. 


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195567414
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

For example, if we changed the `initialDelay` here to stall before the first 
snapshots sync, then with the above scheme we'd still try to request executors 
immediately, because the subscriber thread kicks off an allocation round 
right away.
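
A rough sketch of that point, using only `java.util.concurrent`. The object 
and method names here are made up for illustration and are not part of the PR; 
the two executors stand in for the polling executor and the subscribers 
executor. Delaying the first poll does not delay the first allocation round:

```scala
import java.util.concurrent.{CopyOnWriteArrayList, Executors, TimeUnit}

// Hypothetical demo object, not code from the pull request.
object InitialDelaySketch {
  // Records which task ran first when polling has a non-zero initialDelay.
  def firstRuns(pollInitialDelayMs: Long): List[String] = {
    val events = new CopyOnWriteArrayList[String]()
    val pollingExecutor = Executors.newSingleThreadScheduledExecutor()
    val subscribersExecutor = Executors.newSingleThreadScheduledExecutor()
    try {
      // Polling is stalled by its initialDelay.
      pollingExecutor.scheduleWithFixedDelay(
        new Runnable { override def run(): Unit = { events.add("poll"); () } },
        pollInitialDelayMs, 1000L, TimeUnit.MILLISECONDS)
      // The subscriber round has no initial delay, so it fires right away.
      subscribersExecutor.scheduleWithFixedDelay(
        new Runnable { override def run(): Unit = { events.add("allocate"); () } },
        0L, 1000L, TimeUnit.MILLISECONDS)
      // Wait long enough for the first poll to have happened.
      Thread.sleep(pollInitialDelayMs + 300L)
      events.toArray(new Array[String](0)).toList
    } finally {
      pollingExecutor.shutdownNow()
      subscribersExecutor.shutdownNow()
    }
  }
}
```

Calling `InitialDelaySketch.firstRuns(500L)` should record the allocation 
round before the first poll.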


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195567079
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

But polling isn't where we start to create executors - that's done on the 
subscriber rounds. Polling here populates the snapshots store, but processing 
the snapshots happens on the subscriber thread(s). Furthermore, with the scheme 
proposed above, you never even have to poll for snapshots once before we begin 
requesting executors, because the pods allocator subscriber will trigger 
immediately with an empty snapshot.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195566124
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent._
+
+import io.fabric8.kubernetes.api.model.Pod
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val subscribers = mutable.Buffer.empty[SnapshotsSubscriber]
+  private val pollingTasks = mutable.Buffer.empty[Future[_]]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+      processBatchIntervalMillis: Long)
+      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
+    val newSubscriber = SnapshotsSubscriber(
+        new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+    subscribers += newSubscriber
+    pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
+      toRunnable(() => callSubscriber(newSubscriber)),
--- End diff --

Just tried that and it doesn't work. I think it requires the 
scala-java8-compat module, which I don't think is worth pulling in for just 
this case.
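
For reference, a helper in the spirit of the `toRunnable` call in the diff 
could look roughly like the sketch below. The diff does not show its 
definition, so the body and the surrounding demo object are assumptions; the 
point is only that an explicit `Runnable` wrapper needs no extra module:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo object, not code from the pull request.
object ToRunnableSketch {
  // Assumed shape of the helper: wrap a Scala function in an explicit Runnable.
  def toRunnable(f: () => Unit): Runnable = new Runnable {
    override def run(): Unit = f()
  }

  // Schedules one wrapped function and returns how many times it ran.
  def runOnce(): Int = {
    val counter = new AtomicInteger(0)
    val executor = Executors.newSingleThreadScheduledExecutor()
    try {
      val future = executor.schedule(
        toRunnable(() => { counter.incrementAndGet(); () }),
        0L, TimeUnit.MILLISECONDS)
      // Block until the scheduled task has completed.
      future.get(5, TimeUnit.SECONDS)
      counter.get()
    } finally {
      executor.shutdownNow()
    }
  }
}
```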


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195565315
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

You could add a comment saying that this is where we create executors, and 
how.
I mean, on Mesos you start executors when you get offers from agents, which 
is straightforward and makes sense. Here you want to start them ASAP, I 
guess, so that you can then send tasks to them, right?


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195561927
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

And though we don't allow for this right now, the above would allow 
subscribers that are added midway through to receive the most recent snapshot 
immediately. But again, we don't do this right now - we set up all subscribers 
on startup before we start pushing snapshots.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195549746
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Also, you can do it at `doRequestTotalExecutors`, right? When do you have the 
information about how many you need?


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195542619
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

I see - I think what we actually want is for `ExecutorPodsSnapshotsStoreImpl` 
to initialize the subscriber with its current snapshot. That creates the 
semantics where a new subscriber first receives the most up-to-date state 
immediately.
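
Roughly what I have in mind, as a simplified sketch rather than the actual 
change. `Snapshot` and `Store` below are made-up stand-ins for 
`ExecutorPodsSnapshot` and the store implementation, and the real 
`addSubscriber` takes a callback instead of returning a queue:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical demo object, not code from the pull request.
object SeededSubscriberSketch {
  // Stand-in for ExecutorPodsSnapshot: just the set of known pod names.
  final case class Snapshot(podNames: Set[String])

  final class Store {
    private val lock = new Object()
    private var current = Snapshot(Set.empty)
    private val subscriberQueues = mutable.Buffer.empty[LinkedBlockingQueue[Snapshot]]

    // A new subscriber's queue is seeded with the current snapshot, so it
    // sees the most up-to-date state before any further updates arrive.
    def addSubscriber(): LinkedBlockingQueue[Snapshot] = lock.synchronized {
      val queue = new LinkedBlockingQueue[Snapshot]()
      queue.add(current)
      subscriberQueues += queue
      queue
    }

    // Called by a snapshot source (for example polling) to publish new state.
    def replaceSnapshot(pods: Set[String]): Unit = lock.synchronized {
      current = Snapshot(pods)
      subscriberQueues.foreach(q => q.add(current))
    }
  }
}
```

With this shape, a subscriber registered at startup gets an empty snapshot 
first, and one registered later would get whatever the latest state is.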


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195535296
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+    conf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
+    }
+    ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+    override def run(): Unit = {
+      snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Yes, you need to trigger the initial creation of executors somehow, and yes, I 
saw that in the tests. My only concern is that this should be explicit rather 
than implicit, to make the code more obvious.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195513995
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 ---
@@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val allocatorExecutor = ThreadUtils
-      .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
       "kubernetes-executor-requests")
+
+    val bufferSnapshotsExecutor = ThreadUtils
+      .newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer")
+    val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor)
+    val removedExecutorsCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(3, TimeUnit.MINUTES)
--- End diff --

The cache is only a best-effort attempt to avoid removing the same executor 
from the scheduler backend multiple times, but at the end of the day, even if 
we do accidentally remove it multiple times, the only noticeable result is 
noisy logs. The scheduler backend properly handles multiple attempts to remove 
an executor, but we'd prefer not to rely on that.
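
To illustrate the best-effort idea without pulling Guava into the example, 
here is a small stand-in with the same expire-after-write flavor. The class, 
its method, and the injectable clock are hypothetical and exist only for this 
sketch; the PR itself uses Guava's `CacheBuilder` as shown in the diff:

```scala
import scala.collection.mutable

// Hypothetical demo object, not code from the pull request.
object RemovedExecutorsSketch {
  // Remembers recently removed executor IDs for ttlMillis after they are written.
  final class RecentlyRemoved(ttlMillis: Long, clock: () => Long) {
    private val removedAt = mutable.Map.empty[Long, Long]

    // Returns true if the caller should go ahead and remove the executor,
    // false if the same ID was already handled within the TTL.
    // Expired entries are only overwritten lazily here; a real cache would
    // also evict them.
    def markRemoved(executorId: Long): Boolean = synchronized {
      val now = clock()
      val stillFresh = removedAt.get(executorId).exists(ts => now - ts < ttlMillis)
      if (stillFresh) {
        false
      } else {
        removedAt(executorId) = now
        true
      }
    }
  }
}
```

If an entry expires and the same executor is reported again, the removal is 
simply attempted a second time, which is the noisy-logs case described above.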


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195512808
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+require(pollingFuture == null, "Cannot start polling more than once.")
+pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+  new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (pollingFuture != null) {
+  pollingFuture.cancel(true)
+  pollingFuture = null
+}
+ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+override def run(): Unit = {
+  snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

That's not strictly why it's done here, but it is a side effect, I suppose. 
Really, the snapshots store should push an initial empty snapshot to all 
subscribers when it starts, and the unit tests do check for that. It's the 
responsibility of the snapshots store.
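
A rough sketch of that responsibility, assuming simplified types. `Snapshot` and `SimpleSnapshotsStore` are hypothetical; this sketch delivers the current snapshot synchronously at subscription time, whereas the store in the diff buffers snapshots and hands them over on a scheduled executor:

```scala
import scala.collection.mutable

// Hypothetical, simplified snapshot: executor ID -> pod state description.
final case class Snapshot(executorPods: Map[Long, String] = Map.empty)

class SimpleSnapshotsStore {
  private var current = Snapshot()
  private val subscribers = mutable.Buffer.empty[Snapshot => Unit]

  def addSubscriber(onSnapshot: Snapshot => Unit): Unit = synchronized {
    subscribers += onSnapshot
    // Deliver the current snapshot (empty at first) so a subscriber can act
    // before any poll or watch event has arrived.
    onSnapshot(current)
  }

  def replaceSnapshot(pods: Map[Long, String]): Unit = synchronized {
    current = Snapshot(pods)
    subscribers.foreach(subscriber => subscriber(current))
  }
}
```

With this shape, an allocator that subscribes first sees an empty snapshot and can request its first batch of executors without waiting for the polling source.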


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195512430
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, Utils}
+
+private[spark] class ExecutorPodsAllocator(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+clock: Clock) extends Logging {
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 6)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Executor IDs that have been requested from Kubernetes but have not been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
+
+  def start(applicationId: String): Unit = {
+snapshotsStore.addSubscriber(podAllocationDelay) {
+  onNewSnapshots(applicationId, _)
+}
+  }
+
+  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+
+  private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
+// For all executors we've created against the API but have not seen in a snapshot
+// yet - check the current time. If the current time has exceeded some threshold,
+// assume that the pod was either never created (the API server never properly
+// handled the creation request), or the API server created the pod but we missed
+// both the creation and deletion events. In either case, delete the missing pod
+// if possible, and mark such a pod to be rescheduled below.
+newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
+  if (clock.getTimeMillis() - timeCreated > podCreationTimeout) {
+logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
+  s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
+  " previous allocation attempt tried to create it. The executor may have been" +
+  " deleted but the application missed the deletion event.")
+Utils.tryLogNonFatalError {
+  kubernetesClient
+.pods()
+.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+.delete()
--- End diff --

That's handled by the lifecycle manager already, because the lifecycle 
manager looks at what the scheduler backend believes are its executors and 
reconciles them with what's in the snapshot.
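
The reconciliation described above can be sketched as a set difference. This is an assumption about the shape of the logic, since the lifecycle manager's reconciliation code is not part of the quoted diff; `ReconcileSketch` and its parameter names are hypothetical:

```scala
object ReconcileSketch {
  /** Executor IDs the scheduler backend still tracks that are absent from the snapshot. */
  def missingFromSnapshot(knownToScheduler: Set[Long], inSnapshot: Set[Long]): Set[Long] =
    knownToScheduler.diff(inSnapshot)
}
```

Any ID returned here is a candidate for removal from the scheduler backend, which is why the allocator does not need to remove it as well.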


---


[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195512219
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 ---
@@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
   Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
   Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-val allocatorExecutor = ThreadUtils
-  .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
 val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
   "kubernetes-executor-requests")
+
+val bufferSnapshotsExecutor = ThreadUtils
+  .newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer")
+val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor)
+val removedExecutorsCache = CacheBuilder.newBuilder()
+  .expireAfterWrite(3, TimeUnit.MINUTES)
--- End diff --

Don't think it has to be configurable. Basically we should only receive the 
removed executor events multiple times for a short period of time, then we 
should settle into steady state.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195445788
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

Thanks @ktoso!


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195401593
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
--- End diff --

Could you add some debug logging here? In general it would be good to be 
able to trace what is happening in debug mode when there is an issue. This 
applies to all classes introduced for both watching and polling.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195390942
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+require(pollingFuture == null, "Cannot start polling more than once.")
+pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+  new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (pollingFuture != null) {
+  pollingFuture.cancel(true)
+  pollingFuture = null
+}
+ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+override def run(): Unit = {
+  snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Do you start with an empty state to trigger executor creation at the very 
beginning?


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195382788
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent._
+
+import io.fabric8.kubernetes.api.model.Pod
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val subscribers = mutable.Buffer.empty[SnapshotsSubscriber]
+  private val pollingTasks = mutable.Buffer.empty[Future[_]]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+  processBatchIntervalMillis: Long)
+  (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit = {
+val newSubscriber = SnapshotsSubscriber(
+new LinkedBlockingQueue[ExecutorPodsSnapshot](), onNewSnapshots)
+subscribers += newSubscriber
+pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
+  toRunnable(() => callSubscriber(newSubscriber)),
--- End diff --

toRunnable is not needed with Java 8 lambdas. Just pass the function 
directly: () => callSubscriber(newSubscriber)
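
A small sketch of passing a function literal where a Runnable is expected, assuming Scala 2.12 or later, where function literals convert to single-abstract-method interfaces. Older Scala versions may still need an explicit wrapper. `SamConversionSketch` and `runOnce` are hypothetical names:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object SamConversionSketch {
  /** Schedules a function literal as a Runnable; returns true once it has run. */
  def runOnce(): Boolean = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val latch = new CountDownLatch(1)
    try {
      // The literal is converted to a java.lang.Runnable by the compiler,
      // so no helper such as toRunnable is involved.
      executor.scheduleWithFixedDelay(() => latch.countDown(), 0L, 10L, TimeUnit.MILLISECONDS)
      latch.await(5, TimeUnit.SECONDS)
    } finally {
      executor.shutdownNow()
    }
  }
}
```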


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195380019
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.Cache
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsLifecycleManager(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+// Use a best-effort to track which executors have been removed already. It's not generally
+// job-breaking if we remove executors more than once but it's ideal if we make an attempt
+// to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
+// bounds.
+removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) {
+
+  import ExecutorPodsLifecycleManager._
+
+  private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
+
+  def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+snapshotsStore.addSubscriber(eventProcessingInterval) {
+  onNewSnapshots(schedulerBackend, _)
+}
+  }
+
+  private def onNewSnapshots(
+  schedulerBackend: KubernetesClusterSchedulerBackend,
+  snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
+snapshots.foreach { snapshot =>
+  snapshot.executorPods.foreach { case (execId, state) =>
+state match {
+  case deleted@PodDeleted(pod) =>
+removeExecutorFromSpark(schedulerBackend, deleted, execId)
+execIdsRemovedInThisRound += execId
+  case failed@PodFailed(pod) =>
+onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
+  case succeeded@PodSucceeded(pod) =>
--- End diff --

same as above.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195379975
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.Cache
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsLifecycleManager(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+// Use a best-effort to track which executors have been removed already. It's not generally
+// job-breaking if we remove executors more than once but it's ideal if we make an attempt
+// to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
+// bounds.
+removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) {
+
+  import ExecutorPodsLifecycleManager._
+
+  private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
+
+  def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+snapshotsStore.addSubscriber(eventProcessingInterval) {
+  onNewSnapshots(schedulerBackend, _)
+}
+  }
+
+  private def onNewSnapshots(
+  schedulerBackend: KubernetesClusterSchedulerBackend,
+  snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
+snapshots.foreach { snapshot =>
+  snapshot.executorPods.foreach { case (execId, state) =>
+state match {
+  case deleted@PodDeleted(pod) =>
+removeExecutorFromSpark(schedulerBackend, deleted, execId)
+execIdsRemovedInThisRound += execId
+  case failed@PodFailed(pod) =>
--- End diff --

same as above.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195379907
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.Cache
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsLifecycleManager(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+// Use a best-effort to track which executors have been removed already. It's not generally
+// job-breaking if we remove executors more than once but it's ideal if we make an attempt
+// to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
+// bounds.
+removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) {
+
+  import ExecutorPodsLifecycleManager._
+
+  private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
+
+  def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+snapshotsStore.addSubscriber(eventProcessingInterval) {
+  onNewSnapshots(schedulerBackend, _)
+}
+  }
+
+  private def onNewSnapshots(
+  schedulerBackend: KubernetesClusterSchedulerBackend,
+  snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
+snapshots.foreach { snapshot =>
+  snapshot.executorPods.foreach { case (execId, state) =>
+state match {
+  case deleted@PodDeleted(pod) =>
--- End diff --

s/succeeded@PodSucceeded(pod)/succeeded@PodSucceeded(_)
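
To illustrate the substitution: when only the whole matched value is used, the inner binding can be a wildcard. A minimal sketch with hypothetical, simplified state types (the real ones wrap a fabric8 Pod rather than a String):

```scala
// Hypothetical stand-ins for the pod state classes in the diff.
sealed trait PodState
final case class PodDeleted(pod: String) extends PodState
final case class PodFailed(pod: String) extends PodState
final case class PodSucceeded(pod: String) extends PodState

object PatternSketch {
  def describe(state: PodState): String = state match {
    // `name @ Pattern(_)` binds the whole value and ignores the unused field.
    case deleted @ PodDeleted(_) => s"deleted: $deleted"
    case failed @ PodFailed(_) => s"failed: $failed"
    case succeeded @ PodSucceeded(_) => s"succeeded: $succeeded"
  }
}
```

This avoids an unused `pod` binding in each case.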


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195379060
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 ---
@@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
   Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
   Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-val allocatorExecutor = ThreadUtils
-  .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
 val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
   "kubernetes-executor-requests")
+
+val bufferSnapshotsExecutor = ThreadUtils
+  .newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer")
+val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor)
+val removedExecutorsCache = CacheBuilder.newBuilder()
+  .expireAfterWrite(3, TimeUnit.MINUTES)
--- End diff --

Why 3 minutes? Should this be configurable?


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195351851
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent._
+
+import io.fabric8.kubernetes.api.model.Pod
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
--- End diff --

Could you add a description of the class here?


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195350385
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, Utils}
+
+private[spark] class ExecutorPodsAllocator(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+clock: Clock) extends Logging {
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 6)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Executor IDs that have been requested from Kubernetes but have not been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
+
+  def start(applicationId: String): Unit = {
+snapshotsStore.addSubscriber(podAllocationDelay) {
+  onNewSnapshots(applicationId, _)
+}
+  }
+
+  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+
+  private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
+newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
+// For all executors we've created against the API but have not seen in a snapshot
+// yet - check the current time. If the current time has exceeded some threshold,
+// assume that the pod was either never created (the API server never properly
+// handled the creation request), or the API server created the pod but we missed
+// both the creation and deletion events. In either case, delete the missing pod
+// if possible, and mark such a pod to be rescheduled below.
+newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
+  if (clock.getTimeMillis() - timeCreated > podCreationTimeout) {
+logWarning(s"Executor with id $execId was not detected in the Kubernetes" +
+  s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
+  " previous allocation attempt tried to create it. The executor may have been" +
+  " deleted but the application missed the deletion event.")
+Utils.tryLogNonFatalError {
+  kubernetesClient
+.pods()
+.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+.delete()
--- End diff --

Shouldn't `deleteFromSpark` be called here as well? Couldn't it be the case that 
the executor exists at a higher level but the K8s backend missed it?
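
To make the question concrete, here is a minimal sketch of the timeout check in isolation. It is not the PR's code: `TimedOutExecutors` and `timedOut` are hypothetical names chosen for illustration, and whether the returned IDs should also be handed to something like `deleteFromSpark` is exactly the open question.

```scala
// Hypothetical helper, not part of the PR: given executors that were requested
// but never seen in a snapshot, return the IDs whose creation request is older
// than the timeout.
object TimedOutExecutors {
  def timedOut(
      newlyCreated: collection.Map[Long, Long], // executor ID -> creation time (ms)
      nowMillis: Long,
      timeoutMillis: Long): Set[Long] = {
    newlyCreated.collect {
      case (execId, timeCreated) if nowMillis - timeCreated > timeoutMillis => execId
    }.toSet
  }
}
```

The caller would remove the returned IDs from its pending map so that replacements get requested on the next allocation round.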


---


[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-13 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195040122
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = 
conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+require(pollingFuture == null, "Cannot start polling more than once.")
+pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+  new PollRunnable(applicationId), pollingInterval, pollingInterval, 
TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (pollingFuture != null) {
+  pollingFuture.cancel(true)
+  pollingFuture = null
+}
+ThreadUtils.shutdown(pollingExecutor)
--- End diff --

Ok just wanted to verify it.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-12 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194856403
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

I ended up just removing reactive programming entirely - the buffering is 
implemented manually. Please take a look.
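
For readers following along, manual buffering of this kind can be sketched with plain `java.util.concurrent` primitives. This is an illustration under assumptions, not the code in the PR: `BufferingStore`, `publish`, and `drainAll` are hypothetical names, and the element type is left generic.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._

// Hypothetical sketch: every subscriber owns a queue; a scheduled task drains
// the queue at that subscriber's own interval and hands over the whole batch.
class BufferingStore[T](scheduler: ScheduledExecutorService) {
  private class Subscription(val onBatch: Seq[T] => Unit) {
    val buffer = new LinkedBlockingQueue[T]()
    // Move everything buffered so far into one batch and deliver it.
    def drain(): Unit = {
      val batch = new java.util.ArrayList[T]()
      buffer.drainTo(batch)
      if (!batch.isEmpty) onBatch(batch.asScala.toList)
    }
  }

  private val subscriptions = new CopyOnWriteArrayList[Subscription]()

  def addSubscriber(intervalMillis: Long)(onBatch: Seq[T] => Unit): Unit = {
    val subscription = new Subscription(onBatch)
    subscriptions.add(subscription)
    scheduler.scheduleWithFixedDelay(
      new Runnable { override def run(): Unit = subscription.drain() },
      intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
  }

  // Each event is appended to every subscriber's buffer, so nothing is dropped.
  def publish(event: T): Unit = subscriptions.asScala.foreach(_.buffer.add(event))

  // Drain all subscribers immediately; convenient for deterministic tests.
  def drainAll(): Unit = subscriptions.asScala.foreach(_.drain())
}
```

Each subscriber sees every event, only batched at its own cadence, which is the property the reactive `buffer` operator was providing.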


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-12 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194840955
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

These types are not exposed - they're only implementation details in the 
Kubernetes module. Furthermore the RxJava dependency will be in the Spark 
distribution but is not a dependency pulled in by spark-core.

It sounds like there is some contention with the extra dependency though, 
so should we be considering implementing our own mechanisms from the ground up? 
I think the bottom line question is: can spark-kubernetes, NOT spark-core, pull 
in RxJava?


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-12 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194839188
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = 
conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+require(pollingFuture == null, "Cannot start polling more than once.")
+pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+  new PollRunnable(applicationId), pollingInterval, pollingInterval, 
TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (pollingFuture != null) {
+  pollingFuture.cancel(true)
+  pollingFuture = null
+}
+ThreadUtils.shutdown(pollingExecutor)
--- End diff --

There's a shutdown hook that closes the Spark Context and all its dependent 
components, including the `KubernetesClusterSchedulerBackend`. In 
`KubernetesClusterSchedulerBackend` we shut down all of these components, and 
we catch and handle exceptions while doing so. Also, all of these thread 
pools are daemon thread pools, so they do not keep the JVM alive and go away 
on JVM exit.
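
As a rough illustration of the two points above (daemon threads, and stopping every component even when one of them throws), here is a minimal sketch. `DaemonPools`, `newDaemonScheduler`, and `stopAll` are hypothetical names; Spark has its own utilities for this, which are not reproduced here.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}
import scala.util.control.NonFatal

object DaemonPools {
  // Threads are marked as daemons so they never keep the JVM alive on exit.
  def newDaemonScheduler(name: String): ScheduledExecutorService = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val thread = new Thread(r, name)
        thread.setDaemon(true)
        thread
      }
    }
    Executors.newSingleThreadScheduledExecutor(factory)
  }

  // Run every stop action even if an earlier one throws; return the failures
  // so the caller can log them.
  def stopAll(stopActions: Seq[() => Unit]): Seq[Throwable] = {
    stopActions.flatMap { stop =>
      try { stop(); None } catch { case NonFatal(e) => Some(e) }
    }
  }
}
```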


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-12 Thread ktoso
Github user ktoso commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194736679
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

There seems to be some confusion here.
Akka Streams does not depend on Rx, of course. They are both alternative 
implementations of Reactive Streams ( http://reactive-streams.org/ ), whose 
interfaces have been included in JDK 9 as `java.util.concurrent.Flow.*`. Akka 
also implements those but does not require JDK 9: you can use JDK 8 plus the 
Reactive Streams library, and on JDK 9 you could use the JDK's types, but that 
is not required.

Is there anything else I should clarify or review here? For interop purposes 
it would be good not to expose a specific implementation but to expose the 
reactive-streams types (`org.reactivestreams.Publisher` etc.), but that only 
matters if the types are exposed. As for including dependencies in core Spark, 
I would expect this to carry quite a few implications, though I don't know 
Spark's rules about it (of course, fewer dependencies is better for users, 
since there are fewer chances of a version clash with libraries they use).
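
To illustrate what "expose the interface types, not the implementation" could look like, here is a small sketch against the JDK 9+ `java.util.concurrent.Flow` interfaces. It assumes JDK 9 or later, so it would not apply to a Java 8 build; `PodEvents` and `CollectingSubscriber` are hypothetical names, and `SubmissionPublisher` merely stands in for whichever implementation is chosen.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch, Flow, SubmissionPublisher}

// Requires JDK 9+. Callers only see the standard Flow.Publisher interface;
// SubmissionPublisher is an implementation detail that could be swapped out.
class PodEvents {
  private val publisher = new SubmissionPublisher[String]()
  def events: Flow.Publisher[String] = publisher
  def emit(event: String): Unit = publisher.submit(event)
  def close(): Unit = publisher.close()
}

// A subscriber written purely against the Flow interfaces.
class CollectingSubscriber extends Flow.Subscriber[String] {
  val received = new CopyOnWriteArrayList[String]()
  val done = new CountDownLatch(1)
  // Ask for an unbounded number of items; no backpressure in this sketch.
  override def onSubscribe(s: Flow.Subscription): Unit = s.request(Long.MaxValue)
  override def onNext(item: String): Unit = received.add(item)
  override def onError(t: Throwable): Unit = done.countDown()
  override def onComplete(): Unit = done.countDown()
}
```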


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-12 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194670526
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

> Does Akka streams without JDK 9 depend on ReactiveX?

It depends on the Reactive Streams library, so you don't need to bring RxJava 
in. @ktoso, correct me if I am wrong.

> In this regard we are no different from the other custom controllers in 
the Kubernetes ecosystem which have to handle managing large number of pods.

Do you have an example of a specific controller, so I can get a better 
understanding?




---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-12 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194664576
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = 
conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+require(pollingFuture == null, "Cannot start polling more than once.")
+pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+  new PollRunnable(applicationId), pollingInterval, pollingInterval, 
TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (pollingFuture != null) {
+  pollingFuture.cancel(true)
+  pollingFuture = null
+}
+ThreadUtils.shutdown(pollingExecutor)
--- End diff --

There are a number of such calls. Are we sure they will be executed in every 
scenario, for example when an exception is thrown?
Are the stop calls bound to some shutdown hook? Is this covered by RxJava? 



---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-11 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194574775
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

> One question is how about performance at scale when you get events from 
hundreds of executors at once which framework should work best? Should we worry 
about this?

One already runs into this problem since we open a Watch that streams all 
events down anyways. In any implementation where we want events to be processed 
at different intervals, there needs to be some buffering or else we choose to 
ignore some events and only look at the most up to date snapshot at the given 
intervals. As discussed in 
https://github.com/apache/spark/pull/21366#discussion_r194181797 we really want 
to process as many events as we get as possible, so we're stuck with buffering 
somewhere, and regardless of the observables or reactive programming framework 
we pick we still have to effectively store `O(E)` items, `E` being the number 
of events. And aside from the buffering we'd need to consider the scale of the 
stream of events flowing from the persistent HTTP connection backing the Watch. 
In this regard we are no different from the other custom controllers in the 
Kubernetes ecosystem which have to handle managing large number of pods.
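
The trade-off between the two options can be sketched with a toy model. Everything here is hypothetical and simplified (an event is just a pod name and a phase); it only shows why buffering keeps `O(E)` items while the latest-only approach loses intermediate states.

```scala
// Hypothetical model: an event is (podName, phase); a snapshot maps pod -> phase.
object EventStrategies {
  type Event = (String, String)
  type Snapshot = Map[String, String]

  // Buffering: fold each event into the previous snapshot and keep every
  // intermediate result, so storage grows linearly with the number of events.
  def bufferAll(initial: Snapshot, events: Seq[Event]): Seq[Snapshot] =
    events.scanLeft(initial)(_ + _).tail

  // Latest-only: keep just the final state; intermediate phases are lost.
  def latestOnly(initial: Snapshot, events: Seq[Event]): Snapshot =
    events.foldLeft(initial)(_ + _)
}
```

With three events for one pod (Pending, Running, Failed), `bufferAll` yields three snapshots, whereas `latestOnly` yields a single snapshot in which the pod is Failed and the subscriber never observes that it was Running.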


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-11 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194572858
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

> If not mistaken Java 9 has introduced the related interfaces, so rx-java 
might not be needed in the future when Spark will update the supported java 
version.

Spark will need to support Java 8 for the foreseeable future. We'll be 
using a Java 8 compatible solution for a while.

> Some more scala centric implementations: akka streams a Reactive Streams 
and JDK 9+ java.util.concurrent.Flow-compliant implementation. This does not 
depend on rx-java.
> Also there is monix, and RxScala seems outdated. A comparison between monix 
and others here.

Does Akka streams without JDK 9 depend on ReactiveX? If so then we have the 
same dependency problem. As for monix vs. Akka vs. RxJava, I think that's an 
implementation detail. At the end of the day the main concern is the 
dependency addition, and either way we're adding an external dependency vs. 
implementing the semantics ourselves. If we think monix / Akka is the more 
elegant solution over RxJava we can switch to that, but given how little of 
what we're doing is actually in the functional programming style, I don't 
think the difference between Java and Scala is significant in this particular 
instance.

> From the dependencies already brought in by Spark is there a lib that 
could provide similar functionality, if not based on Reactive stuff some other 
way queues or something?

I didn't see any but I am open to being corrected here. Spark implements an 
event queue but it doesn't suffice for the paradigm of having multiple 
subscribers process snapshots at different intervals.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-11 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194560151
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

@felixcheung (sorry for the delayed answer), @mccheah a few 
thoughts/observations:
a) If not mistaken Java 9 has introduced the related interfaces, so rx-java 
might not be needed in the future.
b) Some more scala centric implementations: [akka 
streams](https://doc.akka.io/docs/akka/current/stream/stream-integrations.html#integrating-with-reactive-streams), 
a Reactive Streams and JDK 9+ java.util.concurrent.Flow-compliant 
implementation. This does not depend on rx-java.
Also there is [monix](https://github.com/monix/monix), and 
[RxScala](https://github.com/ReactiveX/RxScala) seems outdated. A comparison 
between monix and others is 
[here](https://monix.io/docs/2x/reactive/observable-comparisons.html).
One question is how about performance at scale when you get events from 
hundreds of executors at once which framework should work best? Should we worry 
about this? 
Rx-java is light, but monix or akka streams might also turn out to be light at 
the end of the day, although the purpose of akka streams goes beyond 
observables.
c) I built the distro from the PR, so rxjava-2.1.13.jar and 
reactive-streams-1.0.2.jar are added to the jars. Spark core does not seem to 
include any of their classes.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-11 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194536061
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, 
TimeUnit}
+
+import com.google.common.collect.Lists
+import io.fabric8.kubernetes.api.model.Pod
+import io.reactivex.disposables.Disposable
+import io.reactivex.functions.Consumer
+import io.reactivex.schedulers.Schedulers
+import io.reactivex.subjects.PublishSubject
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
--- End diff --

And the internal implementation is pretty different between the test and 
the real versions - the real version uses ReactiveX observables while the fake 
one just uses an in-memory buffer, etc. - it doesn't entirely make sense to 
have the test implementation extend the real implementation. Mocks are too 
verbose because of the complexity of the test implementation.
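
As a sketch of why a separate test implementation behind a shared trait is convenient, consider the simplified interface below. The names are hypothetical and the trait is reduced to two methods; the PR's trait works on `ExecutorPodsSnapshot` and has more to it.

```scala
import scala.collection.mutable

// Hypothetical, simplified interface shared by the real and the test store.
trait SnapshotsStore[T] {
  def addSubscriber(intervalMillis: Long)(onBatch: Seq[T] => Unit): Unit
  def publish(snapshot: T): Unit
}

// Test-only implementation: no threads or timers. Snapshots accumulate in
// memory until the test calls notifySubscribers(), so tests are deterministic.
class DeterministicSnapshotsStore[T] extends SnapshotsStore[T] {
  private val buffer = mutable.Buffer.empty[T]
  private val subscribers = mutable.Buffer.empty[Seq[T] => Unit]

  // The interval is ignored on purpose; the test decides when delivery happens.
  override def addSubscriber(intervalMillis: Long)(onBatch: Seq[T] => Unit): Unit =
    subscribers += onBatch

  override def publish(snapshot: T): Unit = buffer += snapshot

  def notifySubscribers(): Unit = {
    val batch = buffer.toList
    buffer.clear()
    subscribers.foreach(_(batch))
  }
}
```

A test can publish a few snapshots, assert that nothing has been delivered yet, call `notifySubscribers()`, and then assert on exactly one batch, without sleeping or mocking a scheduler.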


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-11 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194535271
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, 
TimeUnit}
+
+import com.google.common.collect.Lists
+import io.fabric8.kubernetes.api.model.Pod
+import io.reactivex.disposables.Disposable
+import io.reactivex.functions.Consumer
+import io.reactivex.schedulers.Schedulers
+import io.reactivex.subjects.PublishSubject
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
--- End diff --

Prefer an impl class here because we also implement a test-only version in 
https://github.com/apache/spark/pull/21366/files#diff-ac93ac5a7a34f600bb942c7feb092924


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-11 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194534077
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, 
TimeUnit}
+
+import com.google.common.collect.Lists
+import io.fabric8.kubernetes.api.model.Pod
+import io.reactivex.disposables.Disposable
+import io.reactivex.functions.Consumer
+import io.reactivex.schedulers.Schedulers
+import io.reactivex.subjects.PublishSubject
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
--- End diff --

Nit: having an `Impl` class is more Java-like.


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194184310
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, 
TimeUnit}
+
+import com.google.common.collect.Lists
+import io.fabric8.kubernetes.api.model.Pod
+import io.reactivex.disposables.Disposable
+import io.reactivex.functions.Consumer
+import io.reactivex.schedulers.Schedulers
+import io.reactivex.subjects.PublishSubject
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
+bufferSnapshotsExecutor: ScheduledExecutorService,
+executeSubscriptionsExecutor: ExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val snapshotsObservable = 
PublishSubject.create[ExecutorPodsSnapshot]()
+  private val observedDisposables = mutable.Buffer.empty[Disposable]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+  processBatchIntervalMillis: Long)
+  (subscriber: ExecutorPodsSnapshot => Unit): Unit = {
+observedDisposables += snapshotsObservable
+  // Group events in the time window given by the caller. These 
buffers are then sent
+  // to the caller's lambda at the given interval, with the pod 
updates that occurred
+  // in that given interval.
+  .buffer(
+processBatchIntervalMillis,
+TimeUnit.MILLISECONDS,
+// For testing - specifically use the given scheduled executor 
service to trigger
+// buffer boundaries. Allows us to inject a deterministic 
scheduler here.
+Schedulers.from(bufferSnapshotsExecutor))
+  // Trigger an event cycle immediately. Not strictly required to be 
fully correct, but
+  // in particular the pod allocator should try to request executors 
immediately instead
+  // of waiting for one pod allocation delay.
+  .startWith(Lists.newArrayList(ExecutorPodsSnapshot()))
+  // Force all triggered events - both the initial event above and the 
buffered ones in
+  // the following time windows - to execute asynchronously to this 
call's thread.
+  .observeOn(Schedulers.from(executeSubscriptionsExecutor))
+  .subscribe(toReactivexConsumer { snapshots: 
java.util.List[ExecutorPodsSnapshot] =>
+Utils.tryLogNonFatalError {
+  snapshots.asScala.foreach(subscriber)
+}
+  })
+  }
+
+  override def stop(): Unit = {
+observedDisposables.foreach(_.dispose())
+snapshotsObservable.onComplete()
+ThreadUtils.shutdown(bufferSnapshotsExecutor)
+ThreadUtils.shutdown(executeSubscriptionsExecutor)
+  }
+
+  override def updatePod(updatedPod: Pod): Unit = 
SNAPSHOT_LOCK.synchronized {
+currentSnapshot = currentSnapshot.withUpdate(updatedPod)
--- End diff --

A brief remark: we still might not be 100% accurate because of failures in 
the information flow (missing events etc.), but we should try to handle all 
the events we do get as best we can.
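
A simplified sketch of how the two inputs can complement each other: single watch events are applied incrementally, while a periodic full listing replaces the state outright and so repairs anything a missed event left stale. `PodsSnapshot` and `SnapshotHolder` are hypothetical stand-ins, not the classes in this PR.

```scala
// Hypothetical, simplified snapshot: pod name -> phase.
final case class PodsSnapshot(pods: Map[String, String] = Map.empty) {
  def withUpdate(name: String, phase: String): PodsSnapshot =
    copy(pods = pods + (name -> phase))
}

class SnapshotHolder {
  private val lock = new Object
  private var current = PodsSnapshot()

  // Incremental input: apply a single watch event to the current state.
  def updatePod(name: String, phase: String): PodsSnapshot = lock.synchronized {
    current = current.withUpdate(name, phase)
    current
  }

  // Full-state input: a complete listing replaces the state outright, which
  // also drops pods whose deletion event was never received.
  def replaceSnapshot(listing: Map[String, String]): PodsSnapshot = lock.synchronized {
    current = PodsSnapshot(listing)
    current
  }
}
```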


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-08 Thread dvogelbacher
Github user dvogelbacher commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194183023
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, 
TimeUnit}
+
+import com.google.common.collect.Lists
+import io.fabric8.kubernetes.api.model.Pod
+import io.reactivex.disposables.Disposable
+import io.reactivex.functions.Consumer
+import io.reactivex.schedulers.Schedulers
+import io.reactivex.subjects.PublishSubject
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
+bufferSnapshotsExecutor: ScheduledExecutorService,
+executeSubscriptionsExecutor: ExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val snapshotsObservable = 
PublishSubject.create[ExecutorPodsSnapshot]()
+  private val observedDisposables = mutable.Buffer.empty[Disposable]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+  processBatchIntervalMillis: Long)
+  (subscriber: ExecutorPodsSnapshot => Unit): Unit = {
+observedDisposables += snapshotsObservable
+  // Group events in the time window given by the caller. These 
buffers are then sent
+  // to the caller's lambda at the given interval, with the pod 
updates that occurred
+  // in that given interval.
+  .buffer(
+processBatchIntervalMillis,
+TimeUnit.MILLISECONDS,
+// For testing - specifically use the given scheduled executor 
service to trigger
+// buffer boundaries. Allows us to inject a deterministic 
scheduler here.
+Schedulers.from(bufferSnapshotsExecutor))
+  // Trigger an event cycle immediately. Not strictly required to be 
fully correct, but
+  // in particular the pod allocator should try to request executors 
immediately instead
+  // of waiting for one pod allocation delay.
+  .startWith(Lists.newArrayList(ExecutorPodsSnapshot()))
+  // Force all triggered events - both the initial event above and the 
buffered ones in
+  // the following time windows - to execute asynchronously to this 
call's thread.
+  .observeOn(Schedulers.from(executeSubscriptionsExecutor))
+  .subscribe(toReactivexConsumer { snapshots: 
java.util.List[ExecutorPodsSnapshot] =>
+Utils.tryLogNonFatalError {
+  snapshots.asScala.foreach(subscriber)
+}
+  })
+  }
+
+  override def stop(): Unit = {
+observedDisposables.foreach(_.dispose())
+snapshotsObservable.onComplete()
+ThreadUtils.shutdown(bufferSnapshotsExecutor)
+ThreadUtils.shutdown(executeSubscriptionsExecutor)
+  }
+
+  override def updatePod(updatedPod: Pod): Unit = 
SNAPSHOT_LOCK.synchronized {
+currentSnapshot = currentSnapshot.withUpdate(updatedPod)
--- End diff --

gotcha, that makes sense


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194181797
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, 
TimeUnit}
+
+import com.google.common.collect.Lists
+import io.fabric8.kubernetes.api.model.Pod
+import io.reactivex.disposables.Disposable
+import io.reactivex.functions.Consumer
+import io.reactivex.schedulers.Schedulers
+import io.reactivex.subjects.PublishSubject
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
+bufferSnapshotsExecutor: ScheduledExecutorService,
+executeSubscriptionsExecutor: ExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val snapshotsObservable = 
PublishSubject.create[ExecutorPodsSnapshot]()
+  private val observedDisposables = mutable.Buffer.empty[Disposable]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+  processBatchIntervalMillis: Long)
+  (subscriber: ExecutorPodsSnapshot => Unit): Unit = {
+observedDisposables += snapshotsObservable
+  // Group events in the time window given by the caller. These 
buffers are then sent
+  // to the caller's lambda at the given interval, with the pod 
updates that occurred
+  // in that given interval.
+  .buffer(
+processBatchIntervalMillis,
+TimeUnit.MILLISECONDS,
+// For testing - specifically use the given scheduled executor 
service to trigger
+// buffer boundaries. Allows us to inject a deterministic 
scheduler here.
+Schedulers.from(bufferSnapshotsExecutor))
+  // Trigger an event cycle immediately. Not strictly required to be 
fully correct, but
+  // in particular the pod allocator should try to request executors 
immediately instead
+  // of waiting for one pod allocation delay.
+  .startWith(Lists.newArrayList(ExecutorPodsSnapshot()))
+  // Force all triggered events - both the initial event above and the 
buffered ones in
+  // the following time windows - to execute asynchronously to this 
call's thread.
+  .observeOn(Schedulers.from(executeSubscriptionsExecutor))
+  .subscribe(toReactivexConsumer { snapshots: 
java.util.List[ExecutorPodsSnapshot] =>
+Utils.tryLogNonFatalError {
+  snapshots.asScala.foreach(subscriber)
+}
+  })
+  }
+
+  override def stop(): Unit = {
+observedDisposables.foreach(_.dispose())
+snapshotsObservable.onComplete()
+ThreadUtils.shutdown(bufferSnapshotsExecutor)
+ThreadUtils.shutdown(executeSubscriptionsExecutor)
+  }
+
+  override def updatePod(updatedPod: Pod): Unit = 
SNAPSHOT_LOCK.synchronized {
+currentSnapshot = currentSnapshot.withUpdate(updatedPod)
--- End diff --

I'm pretty sure we want all snapshots. The reason is to get a more accurate 
response in the lifecycle handler. If a pod enters an error state and then 
someone marks it for deletion, you want to at least have the chance to capture 
the error state when sending the executor removal request to the Spark 
scheduler.

For the pods allocator we can do either the latest or all.
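
To make the argument concrete, here is a minimal sketch, assuming simplified stand-in types rather than the PR's real `ExecutorPodsSnapshot` and pod state classes. `SnapshotReplay`, `firstTerminalStates`, and the three state objects are hypothetical names used only for illustration.

```scala
// Hypothetical, simplified stand-ins for the pod states discussed in this thread.
sealed trait PodState
case object Running extends PodState
case object Failed extends PodState
case object Deleted extends PodState

object SnapshotReplay {
  type Snapshot = Map[Long, PodState]

  // Walks the snapshots in order and records, per executor ID, the first
  // non-running state that was observed.
  def firstTerminalStates(snapshots: Seq[Snapshot]): Map[Long, PodState] =
    snapshots.foldLeft(Map.empty[Long, PodState]) { (seen, snapshot) =>
      snapshot.foldLeft(seen) {
        case (acc, (execId, state)) if state != Running && !acc.contains(execId) =>
          acc + (execId -> state)
        case (acc, _) => acc
      }
    }
}
```

If the snapshots in one interval are `Running`, then `Failed`, then `Deleted` for executor 1, replaying all of them reports `Failed`, while replaying only the last one reports `Deleted` and the error state is lost.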


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-08 Thread dvogelbacher
Github user dvogelbacher commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194180839
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.Cache
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsLifecycleManager(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+// Use a best-effort to track which executors have been removed 
already. It's not generally
+// job-breaking if we remove executors more than once but it's ideal 
if we make an attempt
+// to avoid doing so. Expire cache entries so that this data structure 
doesn't grow beyond
+// bounds.
+removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) {
+
+  import ExecutorPodsLifecycleManager._
+
+  private val eventProcessingInterval = 
conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
+
+  def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+snapshotsStore.addSubscriber(eventProcessingInterval) {
+  onNextSnapshot(schedulerBackend, _)
+}
+  }
+
+  private def onNextSnapshot(
+  schedulerBackend: KubernetesClusterSchedulerBackend,
+  snapshot: ExecutorPodsSnapshot): Unit = {
+val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
+snapshot.executorPods.foreach { case (execId, state) =>
+  state match {
+case PodDeleted(pod) =>
+  removeExecutorFromSpark(schedulerBackend, pod, execId)
+  execIdsRemovedInThisRound += execId
+case errorOrSucceeded @ (PodFailed(_) | PodSucceeded(_)) =>
+  removeExecutorFromK8s(errorOrSucceeded.pod)
+  removeExecutorFromSpark(schedulerBackend, errorOrSucceeded.pod, 
execId)
+  execIdsRemovedInThisRound += execId
+case _ =>
+  }
+}
+
+// Reconcile the case where Spark claims to know about an executor but 
the corresponding pod
+// is missing from the cluster. This would occur if we miss a deletion 
event and the pod
+// transitions immediately from running to absent.
+(schedulerBackend.getExecutorIds().map(_.toLong).toSet
+  -- snapshot.executorPods.keySet
+  -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
+  if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
+val exitReason = ExecutorExited(
+  UNKNOWN_EXIT_CODE,
+  exitCausedByApp = false,
+  s"The executor with ID $missingExecutorId was not found in the 
cluster but we didn't" +
+s" get a reason why. Marking the executor as failed. The 
executor may have been" +
+s" deleted but the driver missed the deletion event.")
+schedulerBackend.doRemoveExecutor(missingExecutorId.toString, 
exitReason)
+  }
+}
+  }
+
+  private def removeExecutorFromK8s(updatedPod: Pod): Unit = {
+// If deletion failed on a previous try, we can try again if resync 
informs us the pod
+// is still around.
+// Delete as best attempt - duplicate deletes will throw an exception 
but the end state
+// of getting rid of the pod is what matters.
+Utils.tryLogNonFatalError {
+  kubernetesClient
+.pods()
+.withName(updatedPod.getMetadata.g

[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-08 Thread dvogelbacher
Github user dvogelbacher commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194179846
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, 
TimeUnit}
+
+import com.google.common.collect.Lists
+import io.fabric8.kubernetes.api.model.Pod
+import io.reactivex.disposables.Disposable
+import io.reactivex.functions.Consumer
+import io.reactivex.schedulers.Schedulers
+import io.reactivex.subjects.PublishSubject
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
+bufferSnapshotsExecutor: ScheduledExecutorService,
+executeSubscriptionsExecutor: ExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val snapshotsObservable = 
PublishSubject.create[ExecutorPodsSnapshot]()
+  private val observedDisposables = mutable.Buffer.empty[Disposable]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+  processBatchIntervalMillis: Long)
+  (subscriber: ExecutorPodsSnapshot => Unit): Unit = {
+observedDisposables += snapshotsObservable
+  // Group events in the time window given by the caller. These 
buffers are then sent
+  // to the caller's lambda at the given interval, with the pod 
updates that occurred
+  // in that given interval.
+  .buffer(
+processBatchIntervalMillis,
+TimeUnit.MILLISECONDS,
+// For testing - specifically use the given scheduled executor 
service to trigger
+// buffer boundaries. Allows us to inject a deterministic 
scheduler here.
+Schedulers.from(bufferSnapshotsExecutor))
+  // Trigger an event cycle immediately. Not strictly required to be 
fully correct, but
+  // in particular the pod allocator should try to request executors 
immediately instead
+  // of waiting for one pod allocation delay.
+  .startWith(Lists.newArrayList(ExecutorPodsSnapshot()))
+  // Force all triggered events - both the initial event above and the 
buffered ones in
+  // the following time windows - to execute asynchronously to this 
call's thread.
+  .observeOn(Schedulers.from(executeSubscriptionsExecutor))
+  .subscribe(toReactivexConsumer { snapshots: 
java.util.List[ExecutorPodsSnapshot] =>
+Utils.tryLogNonFatalError {
+  snapshots.asScala.foreach(subscriber)
+}
+  })
+  }
+
+  override def stop(): Unit = {
+observedDisposables.foreach(_.dispose())
+snapshotsObservable.onComplete()
+ThreadUtils.shutdown(bufferSnapshotsExecutor)
+ThreadUtils.shutdown(executeSubscriptionsExecutor)
+  }
+
+  override def updatePod(updatedPod: Pod): Unit = 
SNAPSHOT_LOCK.synchronized {
+currentSnapshot = currentSnapshot.withUpdate(updatedPod)
--- End diff --

With this new approach we only ever need to send the latest snapshot, so does 
buffering on `snapshotsObservable` still make sense? I.e., we don't need to 
send the subscriber all the snapshots that happened during a given interval, 
only the last one. So we can just have a periodic task that takes the latest 
snapshot and sends it to all subscribers.
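
A rough sketch of that alternative, assuming a generic state type `S` in place of `ExecutorPodsSnapshot`. `LatestOnlyStore` and its methods are hypothetical names and not part of the PR. Only `java.util.concurrent` and the Scala standard library are used.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.util.Try

// Hypothetical class: keeps one current value and periodically hands the
// latest value to every subscriber. Intermediate values are never delivered.
class LatestOnlyStore[S](initial: S, scheduler: ScheduledExecutorService) {
  private val lock = new Object()
  private var current: S = initial
  private var subscribers = Vector.empty[S => Unit]

  def addSubscriber(subscriber: S => Unit): Unit = lock.synchronized {
    subscribers :+= subscriber
  }

  def update(f: S => S): Unit = lock.synchronized {
    current = f(current)
  }

  // Reads the state under the lock, then calls subscribers outside of it.
  def publishLatest(): Unit = {
    val (latest, targets) = lock.synchronized { (current, subscribers) }
    // A failing subscriber must not cancel the periodic task, so errors are
    // swallowed here; real code would log them.
    targets.foreach(subscriber => Try(subscriber(latest)))
  }

  def start(intervalMillis: Long): Unit = {
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = publishLatest()
    }, 0L, intervalMillis, TimeUnit.MILLISECONDS)
  }
}
```

The trade-off is that a subscriber sees only the state at each tick, so any state a pod passed through between two ticks is invisible to it.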


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-08 Thread dvogelbacher
Github user dvogelbacher commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194177422
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 ---
@@ -49,48 +54,58 @@ private[spark] class ExecutorPodsAllocator(
 .withName(kubernetesDriverPodName)
 .get()
 
-  // Use sets of ids instead of counters to be able to handle duplicate 
events.
-
-  // Executor IDs that have been requested from Kubernetes but are not 
running yet.
-  private val pendingExecutors = mutable.Set.empty[Long]
-
-  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors 
here for tallying the
-  // executors that are running. But, here we choose instead to maintain 
all state within this
-  // class from the persecptive of the k8s API. Therefore whether or not 
this scheduler loop
-  // believes an executor is running is dictated by the K8s API rather 
than Spark's RPC events.
-  // We may need to consider where these perspectives may differ and which 
perspective should
-  // take precedence.
-  private val runningExecutors = mutable.Set.empty[Long]
+  // Executor IDs that have been requested from Kubernetes but have not 
been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
 
   def start(applicationId: String): Unit = {
-eventQueue.addSubscriber(
-  podAllocationDelay,
-  new ExecutorPodBatchSubscriber(
-processUpdatedPod(applicationId),
-() => postProcessBatch(applicationId)))
+snapshotsStore.addSubscriber(podAllocationDelay) {
+  processSnapshot(applicationId, _)
+}
   }
 
   def setTotalExpectedExecutors(total: Int): Unit = 
totalExpectedExecutors.set(total)
 
-  private def processUpdatedPod(applicationId: String): 
PartialFunction[ExecutorPodState, Unit] = {
-case running @ PodRunning(_) =>
-  pendingExecutors -= running.execId()
-  runningExecutors += running.execId()
-case completed @ (PodSucceeded(_) | PodDeleted(_) | PodFailed(_)) =>
-  pendingExecutors -= completed.execId()
-  runningExecutors -= completed.execId()
-case _ =>
-  }
+  private def processSnapshot(applicationId: String, snapshot: 
ExecutorPodsSnapshot): Unit = {
+snapshot.executorPods.keys.foreach { newlyCreatedExecutors -= _ }
+
+// For all executors we've created against the API but have not seen 
in a snapshot
+// yet - check the current time. If the current time has exceeded some 
threshold,
+// assume that the pod was either never created (the API server never 
properly
+// handled the creation request), or the API server created the pod 
but we missed
+// both the creation and deletion events. In either case, delete the 
missing pod
+// if possible, and mark such a pod to be rescheduled below.
+(newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet).foreach 
{ execId =>
--- End diff --

`newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet` is 
unnecessary, as you already remove all snapshot pods from 
`newlyCreatedExecutors` in the line above.
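
A small sketch of why the difference is a no-op, using plain collections with made-up IDs and timestamps. `RedundantDifferenceCheck` and `remainingAfterPrune` are hypothetical names; only the two steps mirror the diff.

```scala
import scala.collection.mutable

object RedundantDifferenceCheck {
  // Returns the remaining executor IDs computed with and without the extra
  // set difference, after pruning every ID present in the snapshot.
  def remainingAfterPrune(
      newlyCreated: mutable.Map[Long, Long],
      snapshotIds: Set[Long]): (Set[Long], Set[Long]) = {
    // Step 1, as in the diff: drop every executor that appears in the snapshot.
    snapshotIds.foreach { newlyCreated -= _ }
    val remaining: Set[Long] = newlyCreated.keySet.toSet
    // Step 2: subtracting the same IDs again cannot remove anything further.
    val withDifference = remaining -- snapshotIds
    (withDifference, remaining)
  }
}
```

For example, starting from executors 1, 2, and 3 with a snapshot containing 2, 3, and 4, both results are the set containing only 1.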


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-08 Thread dvogelbacher
Github user dvogelbacher commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r194176787
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 ---
@@ -105,14 +120,15 @@ private[spark] class ExecutorPodsAllocator(
   .endSpec()
   .build()
 kubernetesClient.pods().create(podWithAttachedContainer)
-pendingExecutors += newExecutorId
+newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
   }
 } else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
+  // TODO handle edge cases if we end up with more running executors 
than expected.
   logDebug("Current number of running executors is equal to the number 
of requested" +
 " executors. Not scaling up further.")
-} else if (pendingExecutors.nonEmpty) {
-  logDebug(s"Still waiting for ${pendingExecutors.size} executors to 
begin running before" +
-" requesting for more executors.")
+} else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors 
!= 0) {
+  logDebug(s"Still waiting for ${newlyCreatedExecutors.size + 
currentPendingExecutors}" +
--- End diff --

Can we make this debug statement more verbose to distinguish between 
`newlyCreatedExecutors` and `currentPendingExecutors`? It might help in case 
anyone ever needs to debug executors never reaching the pending state or 
something like that.
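
One possible shape for such a message, as a sketch only. `PendingExecutorsMessage.describe` is a hypothetical helper, and the wording is a suggestion rather than what the PR ended up using.

```scala
object PendingExecutorsMessage {
  // Hypothetical helper: reports the two counts separately instead of their sum.
  def describe(newlyCreatedCount: Int, currentPendingExecutors: Int): String =
    s"Still waiting for $newlyCreatedCount executors not yet seen in any snapshot" +
      s" and $currentPendingExecutors executors in the pending state to begin running" +
      " before requesting more executors."
}
```

In the allocator this would be called as `logDebug(PendingExecutorsMessage.describe(newlyCreatedExecutors.size, currentPendingExecutors))`, or the two interpolations could simply be inlined into the existing `logDebug` call.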


---
