[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157640250
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -38,10 +39,12 @@ import org.apache.spark.util.kvstore.KVStore
  */
 private[spark] abstract class LiveEntity {
 
-  var lastWriteTime = 0L
+  var lastWriteTime = -1L
 
 -  def write(store: KVStore, now: Long): Unit = {
 -    store.write(doUpdate())
 +  def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
 +    // Always check triggers on the first write, since adding an element to the store may
 +    // cause the maximum count for the element type to be exceeded.
--- End diff --

Multiple jobs, multiple stages, multiple tasks, etc. An entity's first write is not the store's first write: many other elements of the same type may already be in the store, so adding one more can push that type over its limit.
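
A minimal sketch of the write path being discussed, reconstructed from the diff above (the `lastWriteTime = now` bookkeeping at the end is an assumption; it is not visible in this hunk):

```scala
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
  // Always check triggers on the first write (lastWriteTime == -1L): the store may already
  // hold many other elements of the same type (jobs, stages, tasks, executors, ...), so
  // adding this one can push the count for its type past the configured maximum.
  store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
  lastWriteTime = now  // assumed bookkeeping, not shown in the quoted hunk
}
```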


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157640073
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -38,10 +39,12 @@ import org.apache.spark.util.kvstore.KVStore
  */
 private[spark] abstract class LiveEntity {
 
-  var lastWriteTime = 0L
+  var lastWriteTime = -1L
 
 -  def write(store: KVStore, now: Long): Unit = {
 -    store.write(doUpdate())
 +  def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
 +    // Always check triggers on the first write, since adding an element to the store may
 +    // cause the maximum count for the element type to be exceeded.
--- End diff --

Hmm, how can the first write cause the maximum count to be exceeded?


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157639788
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import com.google.common.util.concurrent.MoreExecutors
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on a separate thread by default; they 
can be forced to run on
+ * the calling thread by setting the `ASYNC_TRACKING_ENABLED` 
configuration to `false`.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private val flushTriggers = new ListBuffer[() => Unit]()
+  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+
ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")
+  } else {
+MoreExecutors.sameThreadExecutor()
+  }
+
+  @volatile private var stopped = false
+
+  /**
+   * Register a trigger that will be fired once the number of elements of 
a given type reaches
+   * the given threshold.
+   *
+   * @param klass The type to monitor.
+   * @param threshold The number of elements that should trigger the 
action.
+   * @param action Action to run when the threshold is reached; takes as a 
parameter the number
+   *   of elements of the registered type currently known to 
be in the store.
+   */
+  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): 
Unit = {
+val existing = triggers.getOrElse(klass, Seq())
+triggers(klass) = existing :+ Trigger(threshold, action)
+  }
+
+  /**
+   * Adds a trigger to be executed before the store is flushed. This 
normally happens before
+   * closing, and is useful for flushing intermediate state to the store, 
e.g. when replaying
+   * in-progress applications through the SHS.
+   *
+   * Flush triggers are called synchronously in the same thread that is 
closing the store.
+   */
+  def onFlush(action: => Unit): Unit = {
+flushTriggers += { () => action }
+  }
+
+  /**
+   * Enqueues an action to be executed asynchronously. The task will run 
on the calling thread if
+   * `ASYNC_TRACKING_ENABLED` is `false`.
+   */
+  def doAsync(fn: => Unit): Unit = {
+executor.submit(new Runnable() {
+  override def run(): Unit = Utils.tryLog { fn }
+})
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Any): T = 
store.read(klass, naturalKey)
+
+  override def write(value: Any): Unit = store.write(value)
+
+  /** Write an element to the store, optionally checking for whether to 
fire triggers. */
  

[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157639499
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
+  // Keep the active executor count as a separate variable to avoid having 
to do synchronization
+  // around liveExecutors.
+  @volatile private var activeExecutorCount = 0
 
-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
-case SparkListenerLogStart(version) => sparkVersion = version
-case _ =>
+  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], 
conf.get(MAX_RETAINED_DEAD_EXECUTORS))
+{ count => cleanupExecutors(count) }
+
+  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) 
{ count =>
+cleanupJobs(count)
+  }
+
+  kvstore.addTrigger(classOf[StageDataWrapper], 
conf.get(MAX_RETAINED_STAGES)) { count =>
+cleanupStages(count)
+  }
+
+  kvstore.onFlush {
+if (!live) {
+  flush()
--- End diff --

Because for live applications the store is only closed when the context is shut down, and at that point there's no UI left in which to see the flushed data.
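
In other words, the registration above only matters on the SHS replay path; a sketch of the same code with the rationale spelled out as comments:

```scala
kvstore.onFlush {
  // Only flush when replaying an event log in the SHS. For a live application the store
  // is closed at context shutdown, when there is no UI left that would read the flushed
  // data.
  if (!live) {
    flush()
  }
}
```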


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157639533
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import com.google.common.util.concurrent.MoreExecutors
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on a separate thread by default; they 
can be forced to run on
+ * the calling thread by setting the `ASYNC_TRACKING_ENABLED` 
configuration to `false`.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
--- End diff --

Man, this has already been pushed... I'd appreciate getting reviews before changes are pushed.


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157639459
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import com.google.common.util.concurrent.MoreExecutors
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on a separate thread by default; they 
can be forced to run on
+ * the calling thread by setting the `ASYNC_TRACKING_ENABLED` 
configuration to `false`.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
--- End diff --

use a mutable map?


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157639397
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -727,8 +769,8 @@ private[spark] class AppStatusListener(
 }
   }
 
-  private def update(entity: LiveEntity, now: Long): Unit = {
-entity.write(kvstore, now)
+  private def update(entity: LiveEntity, now: Long, last: Boolean = 
false): Unit = {
--- End diff --

maybe `checkTriggers` is better than `last`?


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157639162
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
+  // Keep the active executor count as a separate variable to avoid having 
to do synchronization
+  // around liveExecutors.
+  @volatile private var activeExecutorCount = 0
 
-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
-case SparkListenerLogStart(version) => sparkVersion = version
-case _ =>
+  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], 
conf.get(MAX_RETAINED_DEAD_EXECUTORS))
+{ count => cleanupExecutors(count) }
+
+  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) 
{ count =>
+cleanupJobs(count)
+  }
+
+  kvstore.addTrigger(classOf[StageDataWrapper], 
conf.get(MAX_RETAINED_STAGES)) { count =>
+cleanupStages(count)
+  }
+
+  kvstore.onFlush {
+if (!live) {
+  flush()
--- End diff --

Hmm, why only flush for the history server?


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19751


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r157046378
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -318,24 +319,23 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 (new InMemoryStore(), true)
--- End diff --

Not related to this change; the comment is really for `val _replay = !path.isDirectory()`, but GitHub won't let me put a comment there ...

I know we discussed this when you made this change, but I was still confused reading this bit of code on replay. Could you maybe just add a comment above that line, something like "the kvstore is deleted when we decide that the loaded data is stale -- see LoadedAppUI for a more extensive discussion of the lifecycle"?


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156813342
  
--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -68,6 +69,19 @@ private[spark] object KVUtils extends Logging {
 db
   }
 
+  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
+  def viewToSeq[T](
--- End diff --

I create a list explicitly to avoid consistency issues when deleting these 
elements. If I had an iterator instead, and I then called `kvstore.delete`, you 
could get a `ConcurrentModificationException`.

Since the cleanup code deletes more than necessary to just respect the 
limit (to avoid having to do this every time you write something), hopefully 
the cost is amortized a little.
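
For reference, a minimal sketch of the helper under discussion (the signature is assumed from the truncated diff above):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.util.kvstore.KVStoreView

/** Turns a KVStoreView into a Scala sequence, applying a filter. */
def viewToSeq[T](
    view: KVStoreView[T],
    max: Int)
    (filter: T => Boolean): Seq[T] = {
  val iter = view.closeableIterator()
  try {
    // Materialize into a list up front: deleting elements from the store while an
    // iterator over the same view is still open could otherwise raise a
    // ConcurrentModificationException.
    iter.asScala.filter(filter).take(max).toList
  } finally {
    iter.close()
  }
}
```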


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156811638
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -851,6 +842,97 @@ class AppStatusListenerSuite extends SparkFunSuite 
with BeforeAndAfter {
 }
   }
 
+  test("eviction of old data") {
+val testConf = conf.clone()
+  .set(MAX_RETAINED_JOBS, 2)
+  .set(MAX_RETAINED_STAGES, 2)
+  .set(MAX_RETAINED_TASKS_PER_STAGE, 2)
+  .set(MAX_RETAINED_DEAD_EXECUTORS, 1)
+val listener = new AppStatusListener(store, testConf, true)
+
+// Start 3 jobs, all should be kept. Stop one, it should be evicted.
+time += 1
+listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
+listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))
+listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
+assert(store.count(classOf[JobDataWrapper]) === 3)
+
+time += 1
+listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
+assert(store.count(classOf[JobDataWrapper]) === 2)
+intercept[NoSuchElementException] {
+  store.read(classOf[JobDataWrapper], 2)
+}
+
+// Start 3 stages, all should be kept. Stop 2 of them, the oldest 
stopped one should be
+// deleted. Start a new attempt of the second stopped one, and verify 
that the stage graph
--- End diff --

there is no DAG here, the test controls what "oldest" means. In this case 
"oldest" = "first stage in the list", which is also "smallest id", which is the 
actual behavior of the listener.


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156809149
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
 }
   }
 
+  private def cleanupExecutors(count: Long): Unit = {
+// Because the limit is on the number of *dead* executors, we need to 
calculate whether
+// there are actually enough dead executors to be deleted.
+val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
+val dead = count - activeExecutorCount
+
+if (dead > threshold) {
+  val countToDelete = calculateNumberToRemove(dead, threshold)
+  val toDelete = 
kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+.max(countToDelete).first(false).last(false).asScala.toSeq
+  toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
+}
+  }
+
+  private def cleanupJobs(count: Long): Unit = {
+val countToDelete = calculateNumberToRemove(count, 
conf.get(MAX_RETAINED_JOBS))
+if (countToDelete <= 0L) {
+  return
+}
+
+val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
+countToDelete.toInt) { j =>
+  j.info.status != JobExecutionStatus.RUNNING && j.info.status != 
JobExecutionStatus.UNKNOWN
+}
+toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
+  }
+
+  private def cleanupStages(count: Long): Unit = {
+val countToDelete = calculateNumberToRemove(count, 
conf.get(MAX_RETAINED_STAGES))
+if (countToDelete <= 0L) {
+  return
+}
+
+val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
+countToDelete.toInt) { s =>
+  s.info.status != v1.StageStatus.ACTIVE && s.info.status != 
v1.StageStatus.PENDING
+}
+
+stages.foreach { s =>
+  val key = s.id
+  kvstore.delete(s.getClass(), key)
+
+  val execSummaries = 
kvstore.view(classOf[ExecutorStageSummaryWrapper])
+.index("stage")
+.first(key)
+.last(key)
+.asScala
+.toSeq
+  execSummaries.foreach { e =>
+kvstore.delete(e.getClass(), e.id)
+  }
+
+  val tasks = kvstore.view(classOf[TaskDataWrapper])
+.index("stage")
+.first(key)
+.last(key)
+.asScala
+
+  tasks.foreach { t =>
+kvstore.delete(t.getClass(), t.info.taskId)
+  }
+
+  // Check whether there are remaining attempts for the same stage. If 
there aren't, then
+  // also delete the RDD graph data.
+  val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
+.index("stageId")
+.first(s.stageId)
+.last(s.stageId)
+.closeableIterator()
+
+  val hasMoreAttempts = try {
+remainingAttempts.asScala.exists { other =>
+  other.info.attemptId != s.info.attemptId
+}
+  } finally {
+remainingAttempts.close()
+  }
+
+  if (!hasMoreAttempts) {
+kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+  }
+}
+  }
+
+  private def cleanupTasks(stage: LiveStage): Unit = {
+val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), 
maxTasksPerStage)
+if (countToDelete > 0L) {
+  val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+  val view = 
kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
+.last(stageKey)
+
+  // On live applications, try to delete finished tasks only; when in 
the SHS, treat all
+  // tasks as the same.
+  val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
+!live || t.info.status != TaskState.RUNNING.toString()
--- End diff --

The old code deletes tasks in the order they arrive; it would be expensive 
to do that here since it would involve sorting the task list (cheap for disk 
store, expensive for in-memory).

I can keep the same filter behavior for both.


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156808475
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on the same thread that triggered the 
write, after the write
+ * has completed.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private val flushTriggers = new ListBuffer[() => Unit]()
+  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+  } else {
+None
+  }
+
+  @volatile private var stopped = false
+
+  /**
+   * Register a trigger that will be fired once the number of elements of 
a given type reaches
+   * the given threshold.
+   *
+   * Triggers are fired in a separate thread, so that they can do more 
expensive operations
+   * than would be allowed on the main threads populating the store.
+   *
+   * @param klass The type to monitor.
+   * @param threshold The number of elements that should trigger the 
action.
+   * @param action Action to run when the threshold is reached; takes as a 
parameter the number
+   *   of elements of the registered type currently known to 
be in the store.
+   */
+  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): 
Unit = {
+val existing = triggers.getOrElse(klass, Seq())
+triggers(klass) = existing :+ Trigger(threshold, action)
+  }
+
+  /**
+   * Adds a trigger to be executed before the store is flushed. This 
normally happens before
+   * closing, and is useful for flushing intermediate state to the store, 
e.g. when replaying
+   * in-progress applications through the SHS.
+   *
+   * Flush triggers are called synchronously in the same thread that is 
closing the store.
+   */
+  def onFlush(action: => Unit): Unit = {
+flushTriggers += { () => action }
+  }
+
+  /**
+   * Enqueues an action to be executed asynchronously.
+   */
+  def doAsync(fn: => Unit): Unit = {
+executor match {
+  case Some(exec) =>
+exec.submit(new Runnable() {
+  override def run(): Unit = Utils.tryLog { fn }
+})
+
+  case _ =>
+fn
+}
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Any): T = 
store.read(klass, naturalKey)
+
+  override def write(value: Any): Unit = store.write(value)
+
+  /** Write an element to the store, optionally 

[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156775928
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on the same thread that triggered the 
write, after the write
+ * has completed.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private val flushTriggers = new ListBuffer[() => Unit]()
+  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+  } else {
+None
+  }
+
+  @volatile private var stopped = false
+
+  /**
+   * Register a trigger that will be fired once the number of elements of 
a given type reaches
+   * the given threshold.
+   *
+   * Triggers are fired in a separate thread, so that they can do more 
expensive operations
+   * than would be allowed on the main threads populating the store.
+   *
+   * @param klass The type to monitor.
+   * @param threshold The number of elements that should trigger the 
action.
+   * @param action Action to run when the threshold is reached; takes as a 
parameter the number
+   *   of elements of the registered type currently known to 
be in the store.
+   */
+  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): 
Unit = {
+val existing = triggers.getOrElse(klass, Seq())
+triggers(klass) = existing :+ Trigger(threshold, action)
+  }
+
+  /**
+   * Adds a trigger to be executed before the store is flushed. This 
normally happens before
+   * closing, and is useful for flushing intermediate state to the store, 
e.g. when replaying
+   * in-progress applications through the SHS.
+   *
+   * Flush triggers are called synchronously in the same thread that is 
closing the store.
+   */
+  def onFlush(action: => Unit): Unit = {
+flushTriggers += { () => action }
+  }
+
+  /**
+   * Enqueues an action to be executed asynchronously.
+   */
+  def doAsync(fn: => Unit): Unit = {
+executor match {
+  case Some(exec) =>
+exec.submit(new Runnable() {
+  override def run(): Unit = Utils.tryLog { fn }
+})
+
+  case _ =>
+fn
+}
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Any): T = 
store.read(klass, naturalKey)
+
+  override def write(value: Any): Unit = store.write(value)
+
+  /** Write an element to the store, optionally 

[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156718870
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
 }
   }
 
+  private def cleanupExecutors(count: Long): Unit = {
+// Because the limit is on the number of *dead* executors, we need to 
calculate whether
+// there are actually enough dead executors to be deleted.
+val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
+val dead = count - activeExecutorCount
+
+if (dead > threshold) {
+  val countToDelete = calculateNumberToRemove(dead, threshold)
+  val toDelete = 
kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+.max(countToDelete).first(false).last(false).asScala.toSeq
+  toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
+}
+  }
+
+  private def cleanupJobs(count: Long): Unit = {
+val countToDelete = calculateNumberToRemove(count, 
conf.get(MAX_RETAINED_JOBS))
+if (countToDelete <= 0L) {
+  return
+}
+
+val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
+countToDelete.toInt) { j =>
+  j.info.status != JobExecutionStatus.RUNNING && j.info.status != 
JobExecutionStatus.UNKNOWN
+}
+toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
+  }
+
+  private def cleanupStages(count: Long): Unit = {
+val countToDelete = calculateNumberToRemove(count, 
conf.get(MAX_RETAINED_STAGES))
+if (countToDelete <= 0L) {
+  return
+}
+
+val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
+countToDelete.toInt) { s =>
+  s.info.status != v1.StageStatus.ACTIVE && s.info.status != 
v1.StageStatus.PENDING
+}
+
+stages.foreach { s =>
+  val key = s.id
+  kvstore.delete(s.getClass(), key)
+
+  val execSummaries = 
kvstore.view(classOf[ExecutorStageSummaryWrapper])
+.index("stage")
+.first(key)
+.last(key)
+.asScala
+.toSeq
+  execSummaries.foreach { e =>
+kvstore.delete(e.getClass(), e.id)
+  }
+
+  val tasks = kvstore.view(classOf[TaskDataWrapper])
+.index("stage")
+.first(key)
+.last(key)
+.asScala
+
+  tasks.foreach { t =>
+kvstore.delete(t.getClass(), t.info.taskId)
+  }
+
+  // Check whether there are remaining attempts for the same stage. If 
there aren't, then
+  // also delete the RDD graph data.
+  val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
+.index("stageId")
+.first(s.stageId)
+.last(s.stageId)
+.closeableIterator()
+
+  val hasMoreAttempts = try {
+remainingAttempts.asScala.exists { other =>
+  other.info.attemptId != s.info.attemptId
+}
+  } finally {
+remainingAttempts.close()
+  }
+
+  if (!hasMoreAttempts) {
+kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+  }
+}
+  }
+
+  private def cleanupTasks(stage: LiveStage): Unit = {
+val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), 
maxTasksPerStage)
+if (countToDelete > 0L) {
+  val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+  val view = 
kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
+.last(stageKey)
+
+  // On live applications, try to delete finished tasks only; when in 
the SHS, treat all
+  // tasks as the same.
+  val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
+!live || t.info.status != TaskState.RUNNING.toString()
--- End diff --

in the SHS, wouldn't you still prefer to delete finished tasks over live 
ones? in all cases, should you really just try to delete finished tasks first, 
but still delete running tasks if need be?

I don't see any filter like this in the old code.


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156732445
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -318,24 +319,23 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 (new InMemoryStore(), true)
 }
 
-if (needReplay) {
+val trackingStore = new ElementTrackingStore(kvstore, conf)
+val listener = if (needReplay) {
--- End diff --

`listener` is unused 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156706018
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -40,8 +41,10 @@ private[spark] abstract class LiveEntity {
 
   var lastWriteTime = 0L
--- End diff --

minor: can the initial value be -1 instead?  doesn't matter right now, but 
often in tests we use a manual clock starting at time 0.  That would cause 
problems w/ this default.


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156730008
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -851,6 +842,97 @@ class AppStatusListenerSuite extends SparkFunSuite 
with BeforeAndAfter {
 }
   }
 
+  test("eviction of old data") {
+val testConf = conf.clone()
+  .set(MAX_RETAINED_JOBS, 2)
+  .set(MAX_RETAINED_STAGES, 2)
+  .set(MAX_RETAINED_TASKS_PER_STAGE, 2)
+  .set(MAX_RETAINED_DEAD_EXECUTORS, 1)
+val listener = new AppStatusListener(store, testConf, true)
+
+// Start 3 jobs, all should be kept. Stop one, it should be evicted.
+time += 1
+listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
+listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))
+listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
+assert(store.count(classOf[JobDataWrapper]) === 3)
+
+time += 1
+listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
+assert(store.count(classOf[JobDataWrapper]) === 2)
+intercept[NoSuchElementException] {
+  store.read(classOf[JobDataWrapper], 2)
+}
+
+// Start 3 stages, all should be kept. Stop 2 of them, the oldest 
stopped one should be
+// deleted. Start a new attempt of the second stopped one, and verify 
that the stage graph
--- End diff --

Oldest meaning smallest id, or the order in which they were submitted? With a non-linear stage DAG, ids, start time, and end time can be ordered arbitrarily. Ids will correspond to submission order, but stage retries complicate that.

I guess I'm just trying to make sure I understand how the kvstore works, and whether there is some important part I'm missing.


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156716551
  
--- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
@@ -68,6 +69,19 @@ private[spark] object KVUtils extends Logging {
 db
   }
 
+  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
+  def viewToSeq[T](
--- End diff --

Looks like you don't actually need a sequence at any of the call sites; you could just use an iterator. I'm thinking about the cost of materializing the list, say, if you're cleaning up 100k tasks repeatedly.

This does give you a nice spot to include `iter.close()`, but I think you could change this to `foreachWithMaxFilterClose` or something to avoid ever creating the list.
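
A sketch of the suggested iterator-based alternative (the name and signature are hypothetical, not part of the PR):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.util.kvstore.KVStoreView

// Hypothetical helper: apply an action to at most `max` matching elements without ever
// materializing a list, closing the underlying iterator when done.
def foreachWithMaxFilterClose[T](
    view: KVStoreView[T],
    max: Int)
    (filter: T => Boolean)
    (action: T => Unit): Unit = {
  val iter = view.closeableIterator()
  try {
    iter.asScala.filter(filter).take(max).foreach(action)
  } finally {
    iter.close()
  }
}
```

Whether the `action` can safely delete from the store while the iterator is still open is the trade-off discussed elsewhere in this thread.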


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156710644
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on the same thread that triggered the 
write, after the write
+ * has completed.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private val flushTriggers = new ListBuffer[() => Unit]()
+  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+  } else {
+None
+  }
+
+  @volatile private var stopped = false
+
+  /**
+   * Register a trigger that will be fired once the number of elements of 
a given type reaches
+   * the given threshold.
+   *
+   * Triggers are fired in a separate thread, so that they can do more 
expensive operations
+   * than would be allowed on the main threads populating the store.
+   *
+   * @param klass The type to monitor.
+   * @param threshold The number of elements that should trigger the 
action.
+   * @param action Action to run when the threshold is reached; takes as a 
parameter the number
+   *   of elements of the registered type currently known to 
be in the store.
+   */
+  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): 
Unit = {
+val existing = triggers.getOrElse(klass, Seq())
+triggers(klass) = existing :+ Trigger(threshold, action)
+  }
+
+  /**
+   * Adds a trigger to be executed before the store is flushed. This 
normally happens before
+   * closing, and is useful for flushing intermediate state to the store, 
e.g. when replaying
+   * in-progress applications through the SHS.
+   *
+   * Flush triggers are called synchronously in the same thread that is 
closing the store.
+   */
+  def onFlush(action: => Unit): Unit = {
+flushTriggers += { () => action }
+  }
+
+  /**
+   * Enqueues an action to be executed asynchronously.
+   */
+  def doAsync(fn: => Unit): Unit = {
+executor match {
+  case Some(exec) =>
+exec.submit(new Runnable() {
+  override def run(): Unit = Utils.tryLog { fn }
+})
+
+  case _ =>
+fn
+}
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Any): T = 
store.read(klass, naturalKey)
+
+  override def write(value: Any): Unit = store.write(value)
+
+  /** Write an element to the store, optionally 

[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156709663
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on the same thread that triggered the 
write, after the write
+ * has completed.
--- End diff --

they are actually run in a different thread, right?  (comment on 
`addTrigger` looks correct)


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-12-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r156711461
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
 }
   }
 
+  private def cleanupExecutors(count: Long): Unit = {
+// Because the limit is on the number of *dead* executors, we need to 
calculate whether
+// there are actually enough dead executors to be deleted.
+val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
+val dead = count - activeExecutorCount
--- End diff --

KVStore has this:

```java
  /**
   * Returns the number of items of the given type which match the given indexed value.
   */
  long count(Class type, String index, Object indexedValue) throws Exception;
```

So with an API change you could get the right number directly from the store. (Though this conflicts with my other comment about not using `kvstore.count()` at all in the trigger, which I think is more important.)
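
For example, with the `active` index used by the executor cleanup code above, something like the following could replace the separate `activeExecutorCount` bookkeeping (a hedged sketch; it assumes the index value `false` selects dead executors, as in the view query above):

```scala
// Hypothetical: ask the store directly for the number of dead executors.
val dead = kvstore.count(classOf[ExecutorSummaryWrapper], "active", false)
```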


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r153939785
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -727,8 +769,8 @@ private[spark] class AppStatusListener(
 }
   }
 
-  private def update(entity: LiveEntity, now: Long): Unit = {
-entity.write(kvstore, now)
+  private def update(entity: LiveEntity, now: Long, last: Boolean = 
false): Unit = {
--- End diff --

I prefer how the current version reads at the call site, e.g.:

update(exec, now, last = true)

Also, Spark generally avoids Java-beans-style prefixes in Scala code (like 
"is" or "get").


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-16 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r151607724
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -727,8 +769,8 @@ private[spark] class AppStatusListener(
 }
   }
 
-  private def update(entity: LiveEntity, now: Long): Unit = {
-entity.write(kvstore, now)
+  private def update(entity: LiveEntity, now: Long, last: Boolean = 
false): Unit = {
--- End diff --

nit: maybe change `last` to `isLast` ?


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-16 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r151607855
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -40,8 +41,8 @@ private[spark] abstract class LiveEntity {
 
   var lastWriteTime = 0L
 
-  def write(store: KVStore, now: Long): Unit = {
-    store.write(doUpdate())
+  def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
+    store.write(doUpdate(), checkTriggers || lastWriteTime == 0L)
--- End diff --

Can you explain why it checks triggers on the first write?


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r151603481
  
--- Diff: 
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
@@ -93,9 +93,9 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   // To limit the total memory usage of JobProgressListener, we only track 
information for a fixed
   // number of non-active jobs and stages (there is no limit for active 
jobs and stages):
 
-  val retainedStages = conf.getInt("spark.ui.retainedStages", 
SparkUI.DEFAULT_RETAINED_STAGES)
-  val retainedJobs = conf.getInt("spark.ui.retainedJobs", 
SparkUI.DEFAULT_RETAINED_JOBS)
-  val retainedTasks = conf.get(UI_RETAINED_TASKS)
+  val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)
--- End diff --

This class is being removed in a separate PR.


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-16 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r151324714
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on the same thread that triggered the 
write, after the write
+ * has completed.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private val flushTriggers = new ListBuffer[() => Unit]()
+  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+  } else {
+None
+  }
+
+  @volatile private var stopped = false
+
+  /**
+   * Register a trigger that will be fired once the number of elements of 
a given type reaches
+   * the given threshold.
+   *
+   * Triggers are fired in a separate thread, so that they can do more 
expensive operations
+   * than would be allowed on the main threads populating the store.
+   *
+   * @param klass The type to monitor.
+   * @param threshold The number of elements that should trigger the 
action.
+   * @param action Action to run when the threshold is reached; takes as a 
parameter the number
+   *   of elements of the registered type currently known to 
be in the store.
+   */
+  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): 
Unit = {
+val existing = triggers.getOrElse(klass, Seq())
+triggers(klass) = existing :+ Trigger(threshold, action)
+  }
+
+  /**
+   * Adds a trigger to be executed before the store is flushed. This 
normally happens before
+   * closing, and is useful for flushing intermediate state to the store, 
e.g. when replaying
+   * in-progress applications through the SHS.
+   *
+   * Flush triggers are called synchronously in the same thread that is 
closing the store.
+   */
+  def onFlush(action: => Unit): Unit = {
+flushTriggers += { () => action }
+  }
+
+  /**
+   * Enqueues an action to be executed asynchronously.
+   */
+  def doAsync(fn: => Unit): Unit = {
+executor match {
+  case Some(exec) =>
+exec.submit(new Runnable() {
+  override def run(): Unit = Utils.tryLog { fn }
+})
+
+  case _ =>
+fn
+}
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Any): T = 
store.read(klass, naturalKey)
+
+  override def write(value: Any): Unit = store.write(value)
+
+  /** Write an element to the store, 

[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-16 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r151445965
  
--- Diff: 
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
@@ -93,9 +93,9 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
   // To limit the total memory usage of JobProgressListener, we only track 
information for a fixed
   // number of non-active jobs and stages (there is no limit for active 
jobs and stages):
 
-  val retainedStages = conf.getInt("spark.ui.retainedStages", 
SparkUI.DEFAULT_RETAINED_STAGES)
-  val retainedJobs = conf.getInt("spark.ui.retainedJobs", 
SparkUI.DEFAULT_RETAINED_JOBS)
-  val retainedTasks = conf.get(UI_RETAINED_TASKS)
+  val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)
--- End diff --

Why use a hard-coded value here? Maybe make the configurations in `config.scala` 
public, so that we don't need to write the default values in two places.
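
For illustration, a minimal sketch of that suggestion, assuming the existing
ConfigBuilder API; the RETAINED_STAGES name and its location are placeholders
rather than this PR's actual code:

    import org.apache.spark.internal.config.ConfigBuilder

    // In org.apache.spark.status.config (sketch): declare the entry once,
    // together with its default value, and make it visible to the UI code.
    private[spark] val RETAINED_STAGES = ConfigBuilder("spark.ui.retainedStages")
      .intConf
      .createWithDefault(1000)

    // At the use site, read through the entry instead of repeating the default:
    val retainedStages = conf.get(RETAINED_STAGES)

That way the default lives in exactly one place and both the live UI and the
history server read the same configuration entry.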


---




[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r151510043
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on the same thread that triggered the 
write, after the write
+ * has completed.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private val flushTriggers = new ListBuffer[() => Unit]()
+  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+  } else {
+None
+  }
+
+  @volatile private var stopped = false
+
+  /**
+   * Register a trigger that will be fired once the number of elements of 
a given type reaches
+   * the given threshold.
+   *
+   * Triggers are fired in a separate thread, so that they can do more 
expensive operations
+   * than would be allowed on the main threads populating the store.
+   *
+   * @param klass The type to monitor.
+   * @param threshold The number of elements that should trigger the 
action.
+   * @param action Action to run when the threshold is reached; takes as a 
parameter the number
+   *   of elements of the registered type currently known to 
be in the store.
+   */
+  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): 
Unit = {
+val existing = triggers.getOrElse(klass, Seq())
+triggers(klass) = existing :+ Trigger(threshold, action)
+  }
+
+  /**
+   * Adds a trigger to be executed before the store is flushed. This 
normally happens before
+   * closing, and is useful for flushing intermediate state to the store, 
e.g. when replaying
+   * in-progress applications through the SHS.
+   *
+   * Flush triggers are called synchronously in the same thread that is 
closing the store.
+   */
+  def onFlush(action: => Unit): Unit = {
+flushTriggers += { () => action }
+  }
+
+  /**
+   * Enqueues an action to be executed asynchronously.
+   */
+  def doAsync(fn: => Unit): Unit = {
+executor match {
+  case Some(exec) =>
+exec.submit(new Runnable() {
+  override def run(): Unit = Utils.tryLog { fn }
+})
+
+  case _ =>
+fn
+}
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Any): T = 
store.read(klass, naturalKey)
+
+  override def write(value: Any): Unit = store.write(value)
+
+  /** Write an element to the store, optionally 

[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-15 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19751#discussion_r151321285
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, 
to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an 
event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about 
when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
+ *
+ * The configured triggers are run on the same thread that triggered the 
write, after the write
+ * has completed.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private val flushTriggers = new ListBuffer[() => Unit]()
+  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+  } else {
+None
+  }
+
+  @volatile private var stopped = false
+
+  /**
+   * Register a trigger that will be fired once the number of elements of 
a given type reaches
+   * the given threshold.
+   *
+   * Triggers are fired in a separate thread, so that they can do more 
expensive operations
+   * than would be allowed on the main threads populating the store.
+   *
+   * @param klass The type to monitor.
+   * @param threshold The number of elements that should trigger the 
action.
+   * @param action Action to run when the threshold is reached; takes as a 
parameter the number
+   *   of elements of the registered type currently known to 
be in the store.
+   */
+  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): 
Unit = {
+val existing = triggers.getOrElse(klass, Seq())
+triggers(klass) = existing :+ Trigger(threshold, action)
+  }
+
+  /**
+   * Adds a trigger to be executed before the store is flushed. This 
normally happens before
+   * closing, and is useful for flushing intermediate state to the store, 
e.g. when replaying
+   * in-progress applications through the SHS.
+   *
+   * Flush triggers are called synchronously in the same thread that is 
closing the store.
+   */
+  def onFlush(action: => Unit): Unit = {
+flushTriggers += { () => action }
+  }
+
+  /**
+   * Enqueues an action to be executed asynchronously.
+   */
+  def doAsync(fn: => Unit): Unit = {
+executor match {
+  case Some(exec) =>
+exec.submit(new Runnable() {
+  override def run(): Unit = Utils.tryLog { fn }
+})
+
+  case _ =>
+fn
+}
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Any): T = 
store.read(klass, naturalKey)
+
+  override def write(value: Any): Unit = store.write(value)
+
+  /** Write an element to the store, 

[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...

2017-11-14 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/19751

[SPARK-20653][core] Add cleaning of old elements from the status store.

This change restores the functionality that keeps only a limited number of
elements of each type (jobs, stages, etc.), depending on configuration, so
that the store does not grow indefinitely over time.

The feature is implemented by creating a new type (ElementTrackingStore)
that wraps a KVStore and allows triggers to be set up to fire when elements
of a certain type reach a certain threshold. Triggers don't necessarily have
to delete elements, but the current API is set up in a way that makes that
use case easier.
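
To make the shape of the new API concrete, a small usage sketch based on the
addTrigger signature quoted in the reviews above; the JobDataWrapper type, the
class locations, and the retained-jobs limit are illustrative assumptions, and
the cleanup body is elided rather than taken from this PR:

    import org.apache.spark.status.{ElementTrackingStore, JobDataWrapper}
    import org.apache.spark.util.kvstore.InMemoryStore

    // Wrap the underlying KVStore; conf is a SparkConf.
    val store = new ElementTrackingStore(new InMemoryStore(), conf)

    // Fires once the number of stored job wrappers crosses the configured
    // limit; the action receives the current count of that element type.
    val maxJobs = conf.getInt("spark.ui.retainedJobs", 1000)
    store.addTrigger(classOf[JobDataWrapper], maxJobs) { count =>
      // delete the oldest finished jobs until count is back under maxJobs;
      // the actual deletion logic is omitted from this sketch
      ()
    }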

The new store also has a trigger for the "close" call, which makes it
easier for listeners to register code for cleaning things up and flushing
partial state to the store.
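
As a rough sketch of that registration, using the onFlush and doAsync hooks
shown in the ElementTrackingStore diff above (store is the wrapped store from
the previous sketch, and liveStages is an illustrative map of in-progress
entities, not code from this PR):

    // Runs before the wrapped store is closed, e.g. after the history server
    // finishes replaying an event log, so partially built state is persisted.
    store.onFlush {
      liveStages.values.foreach { stage => stage.write(store, System.nanoTime()) }
    }

    // Potentially expensive cleanup can be pushed to the store's worker thread
    // instead of blocking the listener thread that performed the write.
    store.doAsync {
      // prune old entries here (details omitted from this sketch)
      ()
    }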

The old configurations for cleaning up the stored elements from the core
and SQL UIs are now active again, and the old unit tests are re-enabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-20653

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19751.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19751


commit 8c346a148d7be78b0f53aadb9c8ca78098b0ea6c
Author: Marcelo Vanzin 
Date:   2017-04-18T20:38:10Z

[SPARK-20653][core] Add cleaning of old elements from the status store.

This change restores the functionality that keeps a limited number of
different types (jobs, stages, etc) depending on configuration, to avoid
the store growing indefinitely over time.

The feature is implemented by creating a new type (ElementTrackingStore)
that wraps a KVStore and allows triggers to be set up for when elements
of a certain type meet a certain threshold. Triggers don't need to
necessarily only delete elements, but the current API is set up in a way
that makes that use case easier.

The new store also has a trigger for the "close" call, which makes it
easier for listeners to register code for cleaning things up and flushing
partial state to the store.

The old configurations for cleaning up the stored elements from the core
and SQL UIs are now active again, and the old unit tests are re-enabled.




---
