[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r157640250

    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -38,10 +39,12 @@ import org.apache.spark.util.kvstore.KVStore
      */
     private[spark] abstract class LiveEntity {

    -  var lastWriteTime = 0L
    +  var lastWriteTime = -1L

    -  def write(store: KVStore, now: Long): Unit = {
    -    store.write(doUpdate())
    +  def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
    +    // Always check triggers on the first write, since adding an element to the store may
    +    // cause the maximum count for the element type to be exceeded.
    --- End diff --

    Multiple jobs, multiple stages, multiple tasks, etc, etc, etc.

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r157640073

    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -38,10 +39,12 @@ import org.apache.spark.util.kvstore.KVStore
      */
     private[spark] abstract class LiveEntity {

    -  var lastWriteTime = 0L
    +  var lastWriteTime = -1L

    -  def write(store: KVStore, now: Long): Unit = {
    -    store.write(doUpdate())
    +  def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
    +    // Always check triggers on the first write, since adding an element to the store may
    +    // cause the maximum count for the element type to be exceeded.
    --- End diff --

    hmm, for the first write, how can it trigger maximum count?
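To make the exchange above concrete: the limit is per element type, not per entity, so the first write of the third job is what pushes the job count past a limit of two. The sketch below is a simplified model, not the code from the pull request. `CountingStore` and `LiveThing` are hypothetical stand-ins for `ElementTrackingStore` and `LiveEntity`, and the `checkTriggers || lastWriteTime == -1L` condition is an assumption inferred from the quoted code comment and the new `-1L` initial value.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ElementTrackingStore: elements are keyed by
// (type name, key), and a trigger fires when a type's element count exceeds its threshold.
class CountingStore {
  private val data = mutable.HashMap[(String, Any), Any]()
  private val triggers = mutable.HashMap[String, Seq[(Long, Long => Unit)]]()

  def addTrigger(kind: String, threshold: Long)(action: Long => Unit): Unit = {
    triggers(kind) = triggers.getOrElse(kind, Seq.empty) :+ ((threshold, action))
  }

  def count(kind: String): Long = data.keys.count(_._1 == kind).toLong

  def write(kind: String, key: Any, value: Any, checkTriggers: Boolean): Unit = {
    data((kind, key)) = value
    if (checkTriggers) {
      val current = count(kind)
      triggers.getOrElse(kind, Seq.empty).foreach { case (threshold, action) =>
        if (current > threshold) action(current)
      }
    }
  }
}

// Hypothetical stand-in for LiveEntity.
class LiveThing(kind: String, key: Any) {
  var lastWriteTime = -1L

  def write(store: CountingStore, now: Long, checkTriggers: Boolean = false): Unit = {
    // Assumption: the first write (lastWriteTime is still -1) always checks triggers,
    // because it adds a new element and can push the per-type count over the limit.
    store.write(kind, key, now, checkTriggers || lastWriteTime == -1L)
    lastWriteTime = now
  }
}
```

With a threshold of two and three distinct jobs, only the third job's first write fires the trigger; later updates of an existing job do not add elements and, in this sketch, skip the check unless the caller asks for it.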
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19751#discussion_r157639788 --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import com.google.common.util.concurrent.MoreExecutors + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. + * + * This store is used when populating data either from a live UI or an event log. 
On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). + * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on a separate thread by default; they can be forced to run on + * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker") + } else { +MoreExecutors.sameThreadExecutor() + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { +val existing = triggers.getOrElse(klass, Seq()) +triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. 
when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. + */ + def onFlush(action: => Unit): Unit = { +flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. The task will run on the calling thread if + * `ASYNC_TRACKING_ENABLED` is `false`. + */ + def doAsync(fn: => Unit): Unit = { +executor.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } +}) + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store, optionally checking for whether to fire triggers. */
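The `doAsync` Scaladoc quoted above says tasks run on a worker thread unless `ASYNC_TRACKING_ENABLED` is `false`, in which case they run on the calling thread. A minimal sketch of that switch, using only `java.util.concurrent`, might look like the following. `AsyncRunner` and its `asyncEnabled` flag are hypothetical names; the quoted diff uses Spark's `ThreadUtils` and Guava's `MoreExecutors` instead, and logs failures through `Utils.tryLog` rather than `System.err`.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory, TimeUnit}

// Hypothetical helper: runs tasks on a single daemon worker thread when asyncEnabled
// is true, and directly on the calling thread otherwise (useful for deterministic tests).
class AsyncRunner(asyncEnabled: Boolean) {
  private val worker: Option[ExecutorService] =
    if (asyncEnabled) {
      Some(Executors.newSingleThreadExecutor(new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, "tracking-worker-sketch")
          t.setDaemon(true)
          t
        }
      }))
    } else {
      None
    }

  def doAsync(fn: => Unit): Unit = worker match {
    case Some(exec) =>
      exec.submit(new Runnable {
        // Swallow and report failures so one bad task does not kill the worker.
        override def run(): Unit =
          try fn catch { case e: Exception => System.err.println(s"async task failed: $e") }
      })
    case None =>
      fn
  }

  def close(): Unit = worker.foreach { exec =>
    exec.shutdown()
    exec.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Running on the caller's thread makes ordering deterministic, which is the reason the Scaladoc gives for offering the option in unit tests.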
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r157639499

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
       private val liveTasks = new HashMap[Long, LiveTask]()
       private val liveRDDs = new HashMap[Int, LiveRDD]()
       private val pools = new HashMap[String, SchedulerPool]()
    +  // Keep the active executor count as a separate variable to avoid having to do synchronization
    +  // around liveExecutors.
    +  @volatile private var activeExecutorCount = 0

    -  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    -    case SparkListenerLogStart(version) => sparkVersion = version
    -    case _ =>
    +  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
    +    { count => cleanupExecutors(count) }
    +
    +  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
    +    cleanupJobs(count)
    +  }
    +
    +  kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
    +    cleanupStages(count)
    +  }
    +
    +  kvstore.onFlush {
    +    if (!live) {
    +      flush()
    --- End diff --

    Because the store is only closed on live applications when the context is shut down. So there's no more UI for you to see this.
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r157639533

    --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{HashMap, ListBuffer}
    +
    +import com.google.common.util.concurrent.MoreExecutors
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +import org.apache.spark.util.kvstore._
    +
    +/**
    + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
    + * actions once they reach a threshold. This allows writers, for example, to control how much data
    + * is stored by potentially deleting old data as new data is added.
    + *
    + * This store is used when populating data either from a live UI or an event log. On top of firing
    + * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
    + *
    + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
    + *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
    + * - a generic flush mechanism so that listeners can be notified about when they should flush
    + *   internal state to the store (e.g. after the SHS finishes parsing an event log).
    + *
    + * The configured triggers are run on a separate thread by default; they can be forced to run on
    + * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`.
    + */
    +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
    +
    +  import config._
    +
    +  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
    --- End diff --

    Man, this has been pushed... I'd appreciate reviews more before they are pushed.
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r157639459

    --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{HashMap, ListBuffer}
    +
    +import com.google.common.util.concurrent.MoreExecutors
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +import org.apache.spark.util.kvstore._
    +
    +/**
    + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
    + * actions once they reach a threshold. This allows writers, for example, to control how much data
    + * is stored by potentially deleting old data as new data is added.
    + *
    + * This store is used when populating data either from a live UI or an event log. On top of firing
    + * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
    + *
    + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
    + *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
    + * - a generic flush mechanism so that listeners can be notified about when they should flush
    + *   internal state to the store (e.g. after the SHS finishes parsing an event log).
    + *
    + * The configured triggers are run on a separate thread by default; they can be forced to run on
    + * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`.
    + */
    +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
    +
    +  import config._
    +
    +  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
    --- End diff --

    use a mutable map?
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r157639397

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -727,8 +769,8 @@ private[spark] class AppStatusListener(
         }
       }

    -  private def update(entity: LiveEntity, now: Long): Unit = {
    -    entity.write(kvstore, now)
    +  private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
    --- End diff --

    maybe `checkTriggers` is better than `last`?
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r157639162

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
       private val liveTasks = new HashMap[Long, LiveTask]()
       private val liveRDDs = new HashMap[Int, LiveRDD]()
       private val pools = new HashMap[String, SchedulerPool]()
    +  // Keep the active executor count as a separate variable to avoid having to do synchronization
    +  // around liveExecutors.
    +  @volatile private var activeExecutorCount = 0

    -  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    -    case SparkListenerLogStart(version) => sparkVersion = version
    -    case _ =>
    +  kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
    +    { count => cleanupExecutors(count) }
    +
    +  kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
    +    cleanupJobs(count)
    +  }
    +
    +  kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
    +    cleanupStages(count)
    +  }
    +
    +  kvstore.onFlush {
    +    if (!live) {
    +      flush()
    --- End diff --

    hm, why only flush for history server?
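The flush discussion above can be illustrated with a small model. The quoted Scaladoc says flush triggers run synchronously on the thread that closes the store, and the `if (!live)` guard means only a replaying listener (history server) flushes its in-memory state; on a live application the store is closed at context shutdown, when no UI is left to read it. `FlushableStore` and `SketchListener` are hypothetical names, and tying the triggers to `close()` is a simplification of whatever the real store does.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the store's flush hook: actions are registered as
// by-name blocks and run synchronously, in registration order, when close() is called.
class FlushableStore {
  private val flushTriggers = new ListBuffer[() => Unit]()

  def onFlush(action: => Unit): Unit = {
    flushTriggers += { () => action }
  }

  def close(): Unit = {
    flushTriggers.foreach(trigger => trigger())
  }
}

// Hypothetical listener: it only flushes when replaying an event log (live == false).
class SketchListener(store: FlushableStore, live: Boolean) {
  var flushCount = 0

  store.onFlush {
    if (!live) {
      flush()
    }
  }

  private def flush(): Unit = {
    flushCount += 1
  }
}
```

Because `onFlush` takes a by-name parameter, the `live` check is evaluated when the store is closed, not when the trigger is registered.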
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19751
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r157046378

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -318,24 +319,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
                 (new InMemoryStore(), true)
    --- End diff --

    Not related to this change; the comment is really for `val _replay = !path.isDirectory()`, but GitHub won't let me put a comment there.

    I know we discussed this when you made this change, but I was still confused reading this bit of the replay code. Could you add a comment above that line, something like "the kvstore is deleted when we decide that the loaded data is stale -- see LoadedAppUI for a more extensive discussion of the lifecycle"?
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156813342

    --- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
    @@ -68,6 +69,19 @@ private[spark] object KVUtils extends Logging {
         db
       }

    +  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
    +  def viewToSeq[T](
    --- End diff --

    I create a list explicitly to avoid consistency issues when deleting these elements. If I had an iterator instead, and I then called `kvstore.delete`, you could get a `ConcurrentModificationException`.

    Since the cleanup code deletes more than necessary to just respect the limit (to avoid having to do this every time you write something), hopefully the cost is amortized a little.
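The hazard described above is easy to reproduce with a plain `java.util.ArrayList`, which has a fail-fast iterator. The sketch below uses it as a stand-in for an in-memory store; `copyToSeq` is a hypothetical helper that only mirrors the idea of `viewToSeq` (copy up to `max` matching elements into a list first, then delete), and is not the method from the pull request.

```scala
import java.util.{ArrayList, ConcurrentModificationException}
import scala.collection.mutable.ListBuffer

object SnapshotDeleteSketch {
  // Stand-in for an in-memory store holding six elements.
  def newStore(): ArrayList[String] = {
    val store = new ArrayList[String]()
    (1 to 6).foreach(i => store.add(s"job-$i"))
    store
  }

  // Deleting from the collection while its iterator is live trips the fail-fast check.
  def deleteWhileIterating(store: ArrayList[String]): Boolean = {
    try {
      val iter = store.iterator()
      while (iter.hasNext()) {
        store.remove(iter.next())
      }
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }

  // Hypothetical analogue of viewToSeq: materialize up to `max` matching elements
  // so the caller can delete them after iteration has finished.
  def copyToSeq[T](view: java.lang.Iterable[T], max: Int)(filter: T => Boolean): Seq[T] = {
    val out = ListBuffer[T]()
    val iter = view.iterator()
    while (out.size < max && iter.hasNext()) {
      val e = iter.next()
      if (filter(e)) out += e
    }
    out.toList
  }
}
```

A caller would write `val toDelete = SnapshotDeleteSketch.copyToSeq(store, 2)(_ != "job-1")` and then `toDelete.foreach(e => store.remove(e))`; the copy costs some memory, which is the trade-off the comment says is amortized by deleting in batches.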
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156811638

    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -851,6 +842,97 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         }
       }

    +  test("eviction of old data") {
    +    val testConf = conf.clone()
    +      .set(MAX_RETAINED_JOBS, 2)
    +      .set(MAX_RETAINED_STAGES, 2)
    +      .set(MAX_RETAINED_TASKS_PER_STAGE, 2)
    +      .set(MAX_RETAINED_DEAD_EXECUTORS, 1)
    +    val listener = new AppStatusListener(store, testConf, true)
    +
    +    // Start 3 jobs, all should be kept. Stop one, it should be evicted.
    +    time += 1
    +    listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
    +    listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))
    +    listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
    +    assert(store.count(classOf[JobDataWrapper]) === 3)
    +
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
    +    assert(store.count(classOf[JobDataWrapper]) === 2)
    +    intercept[NoSuchElementException] {
    +      store.read(classOf[JobDataWrapper], 2)
    +    }
    +
    +    // Start 3 stages, all should be kept. Stop 2 of them, the oldest stopped one should be
    +    // deleted. Start a new attempt of the second stopped one, and verify that the stage graph
    --- End diff --

    there is no DAG here, the test controls what "oldest" means. In this case "oldest" = "first stage in the list", which is also "smallest id", which is the actual behavior of the listener.
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156809149

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
         }
       }

    +  private def cleanupExecutors(count: Long): Unit = {
    +    // Because the limit is on the number of *dead* executors, we need to calculate whether
    +    // there are actually enough dead executors to be deleted.
    +    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
    +    val dead = count - activeExecutorCount
    +
    +    if (dead > threshold) {
    +      val countToDelete = calculateNumberToRemove(dead, threshold)
    +      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
    +        .max(countToDelete).first(false).last(false).asScala.toSeq
    +      toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
    +    }
    +  }
    +
    +  private def cleanupJobs(count: Long): Unit = {
    +    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
    +    if (countToDelete <= 0L) {
    +      return
    +    }
    +
    +    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
    +        countToDelete.toInt) { j =>
    +      j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
    +    }
    +    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
    +  }
    +
    +  private def cleanupStages(count: Long): Unit = {
    +    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
    +    if (countToDelete <= 0L) {
    +      return
    +    }
    +
    +    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
    +        countToDelete.toInt) { s =>
    +      s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
    +    }
    +
    +    stages.foreach { s =>
    +      val key = s.id
    +      kvstore.delete(s.getClass(), key)
    +
    +      val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
    +        .index("stage")
    +        .first(key)
    +        .last(key)
    +        .asScala
    +        .toSeq
    +      execSummaries.foreach { e =>
    +        kvstore.delete(e.getClass(), e.id)
    +      }
    +
    +      val tasks = kvstore.view(classOf[TaskDataWrapper])
    +        .index("stage")
    +        .first(key)
    +        .last(key)
    +        .asScala
    +
    +      tasks.foreach { t =>
    +        kvstore.delete(t.getClass(), t.info.taskId)
    +      }
    +
    +      // Check whether there are remaining attempts for the same stage. If there aren't, then
    +      // also delete the RDD graph data.
    +      val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
    +        .index("stageId")
    +        .first(s.stageId)
    +        .last(s.stageId)
    +        .closeableIterator()
    +
    +      val hasMoreAttempts = try {
    +        remainingAttempts.asScala.exists { other =>
    +          other.info.attemptId != s.info.attemptId
    +        }
    +      } finally {
    +        remainingAttempts.close()
    +      }
    +
    +      if (!hasMoreAttempts) {
    +        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
    +      }
    +    }
    +  }
    +
    +  private def cleanupTasks(stage: LiveStage): Unit = {
    +    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage)
    +    if (countToDelete > 0L) {
    +      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
    +      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
    +        .last(stageKey)
    +
    +      // On live applications, try to delete finished tasks only; when in the SHS, treat all
    +      // tasks as the same.
    +      val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
    +        !live || t.info.status != TaskState.RUNNING.toString()
    --- End diff --

    The old code deletes tasks in the order they arrive; it would be expensive to do that here since it would involve sorting the task list (cheap for disk store, expensive for in-memory).

    I can keep the same filter behavior for both.
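The cleanup methods above share one shape: compute how many elements to drop, then take the first that many elements passing a filter, in whatever order the view yields them, without sorting. A minimal sketch of that shape follows. The body of `calculateNumberToRemove` is not shown in the thread, so the version here (drop the excess, but at least a tenth of the limit) is an assumption chosen to match the earlier remark that cleanup deletes more than strictly necessary; `Task` and `tasksToDelete` are hypothetical names.

```scala
object EvictionSketch {
  // Assumed policy, for illustration only: once over the limit, remove the excess or
  // 10% of the limit, whichever is larger, so cleanup does not run on every write.
  def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
    if (dataSize > retainedSize) {
      math.max(retainedSize / 10L, dataSize - retainedSize)
    } else {
      0L
    }
  }

  // Hypothetical, minimal task record.
  final case class Task(id: Long, running: Boolean)

  // Picks tasks to evict in iteration order, with no sorting. On live applications
  // running tasks are skipped; when replaying (live == false) all tasks are eligible.
  def tasksToDelete(tasks: Seq[Task], maxTasks: Long, live: Boolean): Seq[Task] = {
    val countToDelete = calculateNumberToRemove(tasks.size.toLong, maxTasks)
    tasks.iterator
      .filter(t => !live || !t.running)
      .take(countToDelete.toInt)
      .toList
  }
}
```

Filtering lazily and stopping after `countToDelete` matches avoids materializing or ordering the full task list, which is the cost the comment above is trying to avoid for the in-memory store.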
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19751#discussion_r156808475 --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. + * + * This store is used when populating data either from a live UI or an event log. On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). 
+ * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on the same thread that triggered the write, after the write + * has completed. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")) + } else { +None + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * Triggers are fired in a separate thread, so that they can do more expensive operations + * than would be allowed on the main threads populating the store. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { +val existing = triggers.getOrElse(klass, Seq()) +triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. 
+ */ + def onFlush(action: => Unit): Unit = { +flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. + */ + def doAsync(fn: => Unit): Unit = { +executor match { + case Some(exec) => +exec.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } +}) + + case _ => +fn +} + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store, optionally
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19751#discussion_r156775928 --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. + * + * This store is used when populating data either from a live UI or an event log. On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). 
+ * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on the same thread that triggered the write, after the write + * has completed. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")) + } else { +None + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * Triggers are fired in a separate thread, so that they can do more expensive operations + * than would be allowed on the main threads populating the store. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { +val existing = triggers.getOrElse(klass, Seq()) +triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. 
+ */ + def onFlush(action: => Unit): Unit = { +flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. + */ + def doAsync(fn: => Unit): Unit = { +executor match { + case Some(exec) => +exec.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } +}) + + case _ => +fn +} + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store, optionally
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156718870

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
         }
       }

    +  private def cleanupExecutors(count: Long): Unit = {
    +    // Because the limit is on the number of *dead* executors, we need to calculate whether
    +    // there are actually enough dead executors to be deleted.
    +    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
    +    val dead = count - activeExecutorCount
    +
    +    if (dead > threshold) {
    +      val countToDelete = calculateNumberToRemove(dead, threshold)
    +      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
    +        .max(countToDelete).first(false).last(false).asScala.toSeq
    +      toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
    +    }
    +  }
    +
    +  private def cleanupJobs(count: Long): Unit = {
    +    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
    +    if (countToDelete <= 0L) {
    +      return
    +    }
    +
    +    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
    +        countToDelete.toInt) { j =>
    +      j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
    +    }
    +    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
    +  }
    +
    +  private def cleanupStages(count: Long): Unit = {
    +    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
    +    if (countToDelete <= 0L) {
    +      return
    +    }
    +
    +    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
    +        countToDelete.toInt) { s =>
    +      s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
    +    }
    +
    +    stages.foreach { s =>
    +      val key = s.id
    +      kvstore.delete(s.getClass(), key)
    +
    +      val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
    +        .index("stage")
    +        .first(key)
    +        .last(key)
    +        .asScala
    +        .toSeq
    +      execSummaries.foreach { e =>
    +        kvstore.delete(e.getClass(), e.id)
    +      }
    +
    +      val tasks = kvstore.view(classOf[TaskDataWrapper])
    +        .index("stage")
    +        .first(key)
    +        .last(key)
    +        .asScala
    +
    +      tasks.foreach { t =>
    +        kvstore.delete(t.getClass(), t.info.taskId)
    +      }
    +
    +      // Check whether there are remaining attempts for the same stage. If there aren't, then
    +      // also delete the RDD graph data.
    +      val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
    +        .index("stageId")
    +        .first(s.stageId)
    +        .last(s.stageId)
    +        .closeableIterator()
    +
    +      val hasMoreAttempts = try {
    +        remainingAttempts.asScala.exists { other =>
    +          other.info.attemptId != s.info.attemptId
    +        }
    +      } finally {
    +        remainingAttempts.close()
    +      }
    +
    +      if (!hasMoreAttempts) {
    +        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
    +      }
    +    }
    +  }
    +
    +  private def cleanupTasks(stage: LiveStage): Unit = {
    +    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage)
    +    if (countToDelete > 0L) {
    +      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
    +      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
    +        .last(stageKey)
    +
    +      // On live applications, try to delete finished tasks only; when in the SHS, treat all
    +      // tasks as the same.
    +      val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
    +        !live || t.info.status != TaskState.RUNNING.toString()
    --- End diff --

    In the SHS, wouldn't you still prefer to delete finished tasks over live ones? In all cases, shouldn't you just try to delete finished tasks first, but still delete running tasks if need be? I don't see any filter like this in the old code.
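The ordering suggested in this comment (finished tasks first, running tasks only if the quota still isn't met) could look roughly like the sketch below. `Task` and `selectForDeletion` are hypothetical names used only for illustration; the PR itself works on `TaskDataWrapper` and a KVStore view, which are not modeled here.

```scala
object TaskEvictionSketch {
  // Hypothetical, simplified task record; not the PR's TaskDataWrapper.
  final case class Task(id: Long, running: Boolean)

  /**
   * Picks up to `count` tasks to delete. Finished tasks are taken first, in their
   * original order; running tasks are used only to fill whatever quota remains.
   */
  def selectForDeletion(tasks: Seq[Task], count: Int): Seq[Task] = {
    val (running, finished) = tasks.partition(_.running)
    (finished ++ running).take(count)
  }
}
```

With this shape the same rule would apply to both the live UI and the SHS, so no `live` flag is needed in the filter.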
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156732445

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -318,24 +319,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           (new InMemoryStore(), true)
         }

    -    if (needReplay) {
    +    val trackingStore = new ElementTrackingStore(kvstore, conf)
    +    val listener = if (needReplay) {
    --- End diff --

    `listener` is unused
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156706018

    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -40,8 +41,10 @@ private[spark] abstract class LiveEntity {

       var lastWriteTime = 0L
    --- End diff --

    Minor: can the initial value be -1 instead? It doesn't matter right now, but tests often use a manual clock starting at time 0, which would cause problems with this default.
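To make the manual-clock concern concrete, here is a minimal sketch. It assumes that `write` stores `now` into `lastWriteTime` after every write and treats a value equal to the initial sentinel as "this is the first write"; that assumption is not confirmed by the quoted hunk. `Entity` and `firstWrites` are hypothetical names, not part of the PR.

```scala
object LastWriteTimeSketch {
  // Hypothetical stand-in for LiveEntity; only the sentinel check is modeled.
  final class Entity(sentinel: Long) {
    var lastWriteTime: Long = sentinel

    /** Records the write and reports whether it was classified as the first one. */
    def write(now: Long): Boolean = {
      val isFirst = lastWriteTime == sentinel
      lastWriteTime = now
      isFirst
    }
  }

  /** Counts how many writes are classified as "first" for the given clock readings. */
  def firstWrites(sentinel: Long, clock: Seq[Long]): Int = {
    val entity = new Entity(sentinel)
    clock.count(now => entity.write(now))
  }
}
```

Under these assumptions, a sentinel of 0 with clock readings 0, 0, 1 classifies all three writes as first writes, while a sentinel of -1 classifies only the first one that way.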
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156730008

    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -851,6 +842,97 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         }
       }

    +  test("eviction of old data") {
    +    val testConf = conf.clone()
    +      .set(MAX_RETAINED_JOBS, 2)
    +      .set(MAX_RETAINED_STAGES, 2)
    +      .set(MAX_RETAINED_TASKS_PER_STAGE, 2)
    +      .set(MAX_RETAINED_DEAD_EXECUTORS, 1)
    +    val listener = new AppStatusListener(store, testConf, true)
    +
    +    // Start 3 jobs, all should be kept. Stop one, it should be evicted.
    +    time += 1
    +    listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
    +    listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))
    +    listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
    +    assert(store.count(classOf[JobDataWrapper]) === 3)
    +
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
    +    assert(store.count(classOf[JobDataWrapper]) === 2)
    +    intercept[NoSuchElementException] {
    +      store.read(classOf[JobDataWrapper], 2)
    +    }
    +
    +    // Start 3 stages, all should be kept. Stop 2 of them, the oldest stopped one should be
    +    // deleted. Start a new attempt of the second stopped one, and verify that the stage graph
    --- End diff --

    Does "oldest" mean smallest id, or the order in which they were submitted? With a non-linear stage DAG, ids, start times, and end times can be ordered arbitrarily relative to one another. Ids will correspond to submission order, but stage retries complicate that. I guess I'm just trying to make sure I understand how the kvstore works, and whether there is some important part I'm missing.
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156716551

    --- Diff: core/src/main/scala/org/apache/spark/status/KVUtils.scala ---
    @@ -68,6 +69,19 @@ private[spark] object KVUtils extends Logging {
         db
       }

    +  /** Turns a KVStoreView into a Scala sequence, applying a filter. */
    +  def viewToSeq[T](
    --- End diff --

    Looks like you don't actually need a sequence at any of the call sites; you could just use an iterator. I'm thinking about the cost of building the list if, say, you're cleaning up 100k tasks repeatedly. This does give you a nice spot to include `iter.close()`, but I think you could change this to `foreachWithMaxFilterClose` or something to avoid ever creating the list.
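A helper along the lines of the suggested `foreachWithMaxFilterClose` might look like the sketch below. The signature is an assumption made for illustration (the comment only proposes a name), and the iterator type is modeled as a plain `Iterator[T] with Closeable` rather than the kvstore's own iterator class. Whether the underlying store tolerates deletes while a view is being iterated is not established here, and it may be a reason to keep the intermediate sequence.

```scala
import java.io.Closeable

object ViewIterationSketch {
  /**
   * Applies `fn` to at most `max` elements accepted by `filter`, without building
   * an intermediate collection, and always closes the iterator afterwards.
   * Returns the number of elements `fn` was applied to.
   */
  def foreachWithMaxFilterClose[T](iter: Iterator[T] with Closeable, max: Int)
      (filter: T => Boolean)(fn: T => Unit): Int = {
    var visited = 0
    try {
      while (visited < max && iter.hasNext) {
        val elem = iter.next()
        if (filter(elem)) {
          fn(elem)
          visited += 1
        }
      }
    } finally {
      iter.close()
    }
    visited
  }

  /** Hypothetical helper for trying the sketch out: records whether close() was called. */
  final class TrackedIterator[T](items: Seq[T]) extends Iterator[T] with Closeable {
    private val underlying = items.iterator
    var closed = false
    override def hasNext: Boolean = underlying.hasNext
    override def next(): T = underlying.next()
    override def close(): Unit = { closed = true }
  }
}
```

The trade-off is that the callback runs while the iterator is still open, whereas the sequence-based `viewToSeq` closes the iterator before any element is processed.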
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19751#discussion_r156710644 --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. + * + * This store is used when populating data either from a live UI or an event log. On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). 
+ * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on the same thread that triggered the write, after the write + * has completed. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")) + } else { +None + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * Triggers are fired in a separate thread, so that they can do more expensive operations + * than would be allowed on the main threads populating the store. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { +val existing = triggers.getOrElse(klass, Seq()) +triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. 
+ */ + def onFlush(action: => Unit): Unit = { +flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. + */ + def doAsync(fn: => Unit): Unit = { +executor match { + case Some(exec) => +exec.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } +}) + + case _ => +fn +} + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store, optionally
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156709663

    --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{HashMap, ListBuffer}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +import org.apache.spark.util.kvstore._
    +
    +/**
    + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
    + * actions once they reach a threshold. This allows writers, for example, to control how much data
    + * is stored by potentially deleting old data as new data is added.
    + *
    + * This store is used when populating data either from a live UI or an event log. On top of firing
    + * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
    + *
    + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
    + *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
    + * - a generic flush mechanism so that listeners can be notified about when they should flush
    + *   internal state to the store (e.g. after the SHS finishes parsing an event log).
    + *
    + * The configured triggers are run on the same thread that triggered the write, after the write
    + * has completed.
    --- End diff --

    They are actually run in a different thread, right? (The comment on `addTrigger` looks correct.)
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156711461

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
         }
       }

    +  private def cleanupExecutors(count: Long): Unit = {
    +    // Because the limit is on the number of *dead* executors, we need to calculate whether
    +    // there are actually enough dead executors to be deleted.
    +    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
    +    val dead = count - activeExecutorCount
    --- End diff --

    KVStore has this:

    ```java
      /**
       * Returns the number of items of the given type which match the given indexed value.
       */
      long count(Class<?> type, String index, Object indexedValue) throws Exception;
    ```

    so with an api change you could get the right number directly from the store. (though this conflicts with my other comment about not using `kvstore.count()` at all in the trigger, which I think is more important.)
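As a rough illustration of getting the dead-executor count straight from an index rather than computing `count - activeExecutorCount`, consider the sketch below. `TinyStore`, `Executor`, and `deadToDelete` are hypothetical and exist only for this example; only the shape of `count(index, indexedValue)` follows the quoted KVStore method. The subtraction is a simplification, since `calculateNumberToRemove` is not shown in this message and may behave differently.

```scala
object DeadExecutorCountSketch {
  // Hypothetical record; the PR stores ExecutorSummaryWrapper with an "active" index.
  final case class Executor(id: String, active: Boolean)

  // Hypothetical in-memory store that models only the "active" index.
  final class TinyStore(executors: Seq[Executor]) {
    /** Counts elements whose indexed field equals `indexedValue`. */
    def count(index: String, indexedValue: Any): Long = index match {
      case "active" => executors.count(e => e.active == indexedValue).toLong
      case other => throw new IllegalArgumentException(s"unknown index: $other")
    }
  }

  /** Number of dead executors to delete so that at most `threshold` remain (simplified). */
  def deadToDelete(store: TinyStore, threshold: Long): Long = {
    val dead = store.count("active", false)
    math.max(0L, dead - threshold)
  }
}
```

Asking the store for the indexed count removes the need to track `activeExecutorCount` separately, at the cost of an extra store query inside the trigger.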
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r153939785

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -727,8 +769,8 @@ private[spark] class AppStatusListener(
         }
       }

    -  private def update(entity: LiveEntity, now: Long): Unit = {
    -    entity.write(kvstore, now)
    +  private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
    --- End diff --

    I prefer how the current version reads on the call site, e.g.:

        update(exec, now, last = true)

    Also, Spark generally avoids Java-beans-style prefixes in Scala code (like "is" or "get").
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r151607724

    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -727,8 +769,8 @@ private[spark] class AppStatusListener(
         }
       }

    -  private def update(entity: LiveEntity, now: Long): Unit = {
    -    entity.write(kvstore, now)
    +  private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
    --- End diff --

    nit: maybe change `last` to `isLast`?
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r151607855

    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -40,8 +41,8 @@ private[spark] abstract class LiveEntity {

       var lastWriteTime = 0L

    -  def write(store: KVStore, now: Long): Unit = {
    -    store.write(doUpdate())
    +  def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
    +    store.write(doUpdate(), checkTriggers || lastWriteTime == 0L)
    --- End diff --

    Can you explain why it checks triggers on the first write?
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r151603481

    --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
    @@ -93,9 +93,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       // To limit the total memory usage of JobProgressListener, we only track information for a fixed
       // number of non-active jobs and stages (there is no limit for active jobs and stages):

    -  val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
    -  val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
    -  val retainedTasks = conf.get(UI_RETAINED_TASKS)
    +  val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)
    --- End diff --

    This class is being removed in a separate PR.
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19751#discussion_r151324714 --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. + * + * This store is used when populating data either from a live UI or an event log. On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). 
+ * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on the same thread that triggered the write, after the write + * has completed. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")) + } else { +None + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * Triggers are fired in a separate thread, so that they can do more expensive operations + * than would be allowed on the main threads populating the store. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { +val existing = triggers.getOrElse(klass, Seq()) +triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. 
+ */ + def onFlush(action: => Unit): Unit = { +flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. + */ + def doAsync(fn: => Unit): Unit = { +executor match { + case Some(exec) => +exec.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } +}) + + case _ => +fn +} + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store,
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r151445965

    --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
    @@ -93,9 +93,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       // To limit the total memory usage of JobProgressListener, we only track information for a fixed
       // number of non-active jobs and stages (there is no limit for active jobs and stages):

    -  val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
    -  val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
    -  val retainedTasks = conf.get(UI_RETAINED_TASKS)
    +  val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)
    --- End diff --

    Why hard-code the default here? Maybe make the configurations in `config.scala` public, so that we don't need to write the default values in two places.
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19751#discussion_r151510043 --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. + * + * This store is used when populating data either from a live UI or an event log. On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). 
+ * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on the same thread that triggered the write, after the write + * has completed. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")) + } else { +None + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * Triggers are fired in a separate thread, so that they can do more expensive operations + * than would be allowed on the main threads populating the store. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { +val existing = triggers.getOrElse(klass, Seq()) +triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. 
+ */ + def onFlush(action: => Unit): Unit = { +flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. + */ + def doAsync(fn: => Unit): Unit = { +executor match { + case Some(exec) => +exec.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } +}) + + case _ => +fn +} + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store, optionally
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r151321285

    --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{HashMap, ListBuffer}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +import org.apache.spark.util.kvstore._
    +
    +/**
    + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
    + * actions once they reach a threshold. This allows writers, for example, to control how much data
    + * is stored by potentially deleting old data as new data is added.
    + *
    + * This store is used when populating data either from a live UI or an event log. On top of firing
    + * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
    + *
    + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
    + *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
    + * - a generic flush mechanism so that listeners can be notified about when they should flush
    + *   internal state to the store (e.g. after the SHS finishes parsing an event log).
    + *
    + * The configured triggers are run on the same thread that triggered the write, after the write
    + * has completed.
    + */
    +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
    +
    +  import config._
    +
    +  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
    +  private val flushTriggers = new ListBuffer[() => Unit]()
    +  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
    +    Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
    +  } else {
    +    None
    +  }
    +
    +  @volatile private var stopped = false
    +
    +  /**
    +   * Register a trigger that will be fired once the number of elements of a given type reaches
    +   * the given threshold.
    +   *
    +   * Triggers are fired in a separate thread, so that they can do more expensive operations
    +   * than would be allowed on the main threads populating the store.
    +   *
    +   * @param klass The type to monitor.
    +   * @param threshold The number of elements that should trigger the action.
    +   * @param action Action to run when the threshold is reached; takes as a parameter the number
    +   *               of elements of the registered type currently known to be in the store.
    +   */
    +  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
    +    val existing = triggers.getOrElse(klass, Seq())
    +    triggers(klass) = existing :+ Trigger(threshold, action)
    +  }
    +
    +  /**
    +   * Adds a trigger to be executed before the store is flushed. This normally happens before
    +   * closing, and is useful for flushing intermediate state to the store, e.g. when replaying
    +   * in-progress applications through the SHS.
    +   *
    +   * Flush triggers are called synchronously in the same thread that is closing the store.
    +   */
    +  def onFlush(action: => Unit): Unit = {
    +    flushTriggers += { () => action }
    +  }
    +
    +  /**
    +   * Enqueues an action to be executed asynchronously.
    +   */
    +  def doAsync(fn: => Unit): Unit = {
    +    executor match {
    +      case Some(exec) =>
    +        exec.submit(new Runnable() {
    +          override def run(): Unit = Utils.tryLog { fn }
    +        })
    +
    +      case _ =>
    +        fn
    +    }
    +  }
    +
    +  override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey)
    +
    +  override def write(value: Any): Unit = store.write(value)
    +
    +  /** Write an element to the store,
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/19751

    [SPARK-20653][core] Add cleaning of old elements from the status store.

    This change restores the functionality that keeps a limited number of
    different types (jobs, stages, etc) depending on configuration, to avoid
    the store growing indefinitely over time.

    The feature is implemented by creating a new type (ElementTrackingStore)
    that wraps a KVStore and allows triggers to be set up for when elements
    of a certain type meet a certain threshold. Triggers don't need to
    necessarily only delete elements, but the current API is set up in a way
    that makes that use case easier.

    The new store also has a trigger for the "close" call, which makes it
    easier for listeners to register code for cleaning things up and flushing
    partial state to the store.

    The old configurations for cleaning up the stored elements from the core
    and SQL UIs are now active again, and the old unit tests are re-enabled.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-20653

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19751.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19751

commit 8c346a148d7be78b0f53aadb9c8ca78098b0ea6c
Author: Marcelo Vanzin
Date: 2017-04-18T20:38:10Z

    [SPARK-20653][core] Add cleaning of old elements from the status store.

    This change restores the functionality that keeps a limited number of
    different types (jobs, stages, etc) depending on configuration, to avoid
    the store growing indefinitely over time.

    The feature is implemented by creating a new type (ElementTrackingStore)
    that wraps a KVStore and allows triggers to be set up for when elements
    of a certain type meet a certain threshold. Triggers don't need to
    necessarily only delete elements, but the current API is set up in a way
    that makes that use case easier.

    The new store also has a trigger for the "close" call, which makes it
    easier for listeners to register code for cleaning things up and flushing
    partial state to the store.

    The old configurations for cleaning up the stored elements from the core
    and SQL UIs are now active again, and the old unit tests are re-enabled.
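
For readers who want to see the trigger idea from the description in isolation, here is a minimal, self-contained sketch. It is not the code from this pull request: `ToyTrackingStore`, `removeOldest`, `count`, and the `Job` case class are hypothetical names invented for illustration. The method names `addTrigger` and `onFlush` mirror the ones in the diff, but the sketch keeps everything in memory, runs triggers synchronously on the writing thread, and assumes a trigger fires when the element count exceeds its threshold.

```scala
import scala.collection.mutable

// Hypothetical toy model, NOT Spark's ElementTrackingStore. Elements live in
// memory, grouped by class, and registered actions run right after a write
// pushes the count for that class past the threshold (an assumption here).
final class ToyTrackingStore {
  private val elements =
    mutable.HashMap.empty[Class[_], mutable.ArrayBuffer[Any]]
  private val triggers =
    mutable.HashMap.empty[Class[_], Seq[(Long, Long => Unit)]]
  private val flushTriggers = mutable.ListBuffer.empty[() => Unit]

  // Register an action that receives the current element count for `klass`.
  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
    val existing = triggers.getOrElse(klass, Seq.empty)
    triggers(klass) = existing :+ ((threshold, action))
  }

  // Register an action to run when the store is closed.
  def onFlush(action: => Unit): Unit = flushTriggers += (() => action)

  def count(klass: Class[_]): Long =
    elements.get(klass).map(_.size.toLong).getOrElse(0L)

  // Store the value, then check the triggers registered for its class.
  def write(value: Any): Unit = {
    val klass = value.getClass
    elements.getOrElseUpdate(klass, mutable.ArrayBuffer.empty[Any]) += value
    val current = count(klass)
    triggers.getOrElse(klass, Seq.empty).foreach { case (threshold, action) =>
      if (current > threshold) action(current)
    }
  }

  // Hypothetical helper: drop the `n` oldest elements of the given class.
  def removeOldest(klass: Class[_], n: Int): Unit =
    elements.get(klass).foreach(buf => buf.remove(0, math.min(n, buf.size)))

  // Run flush actions synchronously on the closing thread.
  def close(): Unit = flushTriggers.foreach(_.apply())
}

final case class Job(id: Int)

object Demo {
  def main(args: Array[String]): Unit = {
    val store = new ToyTrackingStore
    val maxJobs = 3L

    // Keep at most `maxJobs` jobs by deleting the oldest ones.
    store.addTrigger(classOf[Job], maxJobs) { count =>
      store.removeOldest(classOf[Job], (count - maxJobs).toInt)
    }

    var flushed = false
    store.onFlush { flushed = true }

    (1 to 10).foreach(i => store.write(Job(i)))
    store.close()

    println(s"jobs kept: ${store.count(classOf[Job])}, flushed: $flushed")
  }
}
```

In this toy version the cleanup runs inline with every write. The diff above instead offers `doAsync`, so that expensive cleanup can be moved to a worker thread when asynchronous tracking is enabled.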