Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156808475

--- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
+
+/**
+ * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
+ * actions once they reach a threshold. This allows writers, for example, to control how much data
+ * is stored by potentially deleting old data as new data is added.
+ *
+ * This store is used when populating data either from a live UI or an event log. On top of firing
+ * triggers when elements reach a certain threshold, it provides two extra bits of functionality:
+ *
+ * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
+ *   be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
+ * - a generic flush mechanism so that listeners can be notified about when they should flush
+ *   internal state to the store (e.g. after the SHS finishes parsing an event log).
+ *
+ * The configured triggers are run on the same thread that triggered the write, after the write
+ * has completed.
+ */
+private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
+
+  import config._
+
+  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
+  private val flushTriggers = new ListBuffer[() => Unit]()
+  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
+    Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
+  } else {
+    None
+  }
+
+  @volatile private var stopped = false
+
+  /**
+   * Register a trigger that will be fired once the number of elements of a given type reaches
+   * the given threshold.
+   *
+   * Triggers are fired in a separate thread, so that they can do more expensive operations
+   * than would be allowed on the main threads populating the store.
+   *
+   * @param klass The type to monitor.
+   * @param threshold The number of elements that should trigger the action.
+   * @param action Action to run when the threshold is reached; takes as a parameter the number
+   *               of elements of the registered type currently known to be in the store.
+   */
+  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
+    val existing = triggers.getOrElse(klass, Seq())
+    triggers(klass) = existing :+ Trigger(threshold, action)
+  }
+
+  /**
+   * Adds a trigger to be executed before the store is flushed. This normally happens before
+   * closing, and is useful for flushing intermediate state to the store, e.g. when replaying
+   * in-progress applications through the SHS.
+   *
+   * Flush triggers are called synchronously in the same thread that is closing the store.
+   */
+  def onFlush(action: => Unit): Unit = {
+    flushTriggers += { () => action }
+  }
+
+  /**
+   * Enqueues an action to be executed asynchronously.
+   */
+  def doAsync(fn: => Unit): Unit = {
+    executor match {
+      case Some(exec) =>
+        exec.submit(new Runnable() {
+          override def run(): Unit = Utils.tryLog { fn }
+        })
+
+      case _ =>
+        fn
+    }
+  }
+
+  override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey)
+
+  override def write(value: Any): Unit = store.write(value)
+
+  /** Write an element to the store, optionally checking for whether to fire triggers. */
+  def write(value: Any, checkTriggers: Boolean): Unit = {
+    write(value)
+
+    if (checkTriggers && !stopped) {
+      triggers.get(value.getClass()).foreach { list =>
+        doAsync {
+          val count = store.count(value.getClass())
--- End diff --

`kvstore.count` is pretty cheap in-memory (it's basically a hash table lookup), and cheap enough on a disk store (its cost is dwarfed by the writes that actually trigger the call). So while I do agree that the interface is not optimal, of the 4 call sites it handles 3 without needing any special code, and the 4th (`cleanupExecutors`) would still need to call `kvstore.count()` or keep that count separately, so this sounds simpler.
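To make the trigger mechanism concrete, here is a condensed, standalone sketch of the pattern: each write of a tracked type runs one `count()`, and every trigger whose threshold is exceeded fires with the current count. `SimpleStore`, `TrackingStore`, and `TaskData` are simplified stand-ins for illustration only, not the real Spark classes (those live in `org.apache.spark.util.kvstore` and in the diff above, where the check is also handed off to `doAsync`):

```scala
import scala.collection.mutable

// Stand-in for the KVStore backing store; the real interface is in
// org.apache.spark.util.kvstore.
class SimpleStore {
  private val data = mutable.HashMap[Class[_], mutable.Buffer[Any]]()

  def write(value: Any): Unit =
    data.getOrElseUpdate(value.getClass, mutable.Buffer()) += value

  // count() is a hash-table lookup plus a size read, which is why running it
  // on every tracked write is cheap, as the review comment argues.
  def count(klass: Class[_]): Long =
    data.get(klass).map(_.size.toLong).getOrElse(0L)
}

case class Trigger(threshold: Long, action: Long => Unit)

// Stand-in for ElementTrackingStore's trigger bookkeeping.
class TrackingStore(store: SimpleStore) {
  private val triggers = mutable.HashMap[Class[_], Seq[Trigger]]()

  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit =
    triggers(klass) = triggers.getOrElse(klass, Seq()) :+ Trigger(threshold, action)

  def write(value: Any, checkTriggers: Boolean): Unit = {
    store.write(value)
    if (checkTriggers) {
      // One count() per write of a tracked type; each registered trigger
      // compares the current count against its own threshold.
      triggers.get(value.getClass).foreach { list =>
        val count = store.count(value.getClass)
        list.filter(_.threshold < count).foreach(_.action(count))
      }
    }
  }
}

case class TaskData(id: Int)

object Demo extends App {
  val store = new TrackingStore(new SimpleStore)
  // Eviction-style action: fires once more than 100 TaskData elements exist.
  store.addTrigger(classOf[TaskData], 100) { count =>
    println(s"would clean up old data; $count tasks in store")
  }
  (1 to 101).foreach(i => store.write(TaskData(i), checkTriggers = true))
}
```

Run as written, this prints the cleanup message exactly once, on the 101st write, which is the behavior the threshold check is meant to provide.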
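The sync/async toggle behind `doAsync` can be sketched in isolation as well. `ASYNC_TRACKING_ENABLED` is the real config flag from the diff; the `AsyncRunner` class below is a hypothetical stand-in showing the same pattern: with an executor configured, the action runs on a worker thread, and without one it runs inline on the calling thread, which is what makes unit tests deterministic:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Stand-in for the doAsync() toggle; the real code gates the executor on
// conf.get(ASYNC_TRACKING_ENABLED) and uses a named daemon thread.
class AsyncRunner(useAsync: Boolean) {
  private val executor =
    if (useAsync) Some(Executors.newSingleThreadExecutor()) else None

  def doAsync(fn: => Unit): Unit = executor match {
    case Some(exec) =>
      exec.submit(new Runnable {
        // Stand-in for Utils.tryLog: swallow and report failures so one bad
        // task does not kill the worker thread.
        override def run(): Unit =
          try fn catch { case e: Exception => e.printStackTrace() }
      })
    case None => fn // run synchronously on the calling thread
  }

  def shutdown(): Unit = executor.foreach { e =>
    e.shutdown()
    e.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```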