Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r151321285
  
    --- Diff: 
core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{HashMap, ListBuffer}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +import org.apache.spark.util.kvstore._
    +
    +/**
    + * A KVStore wrapper that allows tracking the number of elements of 
specific types, and triggering
    + * actions once they reach a threshold. This allows writers, for example, 
to control how much data
    + * is stored by potentially deleting old data as new data is added.
    + *
    + * This store is used when populating data either from a live UI or an 
event log. On top of firing
    + * triggers when elements reach a certain threshold, it provides two extra 
bits of functionality:
    + *
    + * - a generic worker thread that can be used to run expensive tasks 
asynchronously; the tasks can
    + *   be configured to run on the calling thread when more determinism is 
desired (e.g. unit tests).
    + * - a generic flush mechanism so that listeners can be notified about 
when they should flush
    + *   internal state to the store (e.g. after the SHS finishes parsing an 
event log).
    + *
    + * The configured triggers are run on the same thread that triggered the 
write, after the write
    + * has completed.
    + */
    +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) 
extends KVStore {
    +
    +  import config._
    +
    +  private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
    +  private val flushTriggers = new ListBuffer[() => Unit]()
    +  private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
    +    
Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker"))
    +  } else {
    +    None
    +  }
    +
    +  @volatile private var stopped = false
    +
    +  /**
    +   * Register a trigger that will be fired once the number of elements of 
a given type reaches
    +   * the given threshold.
    +   *
    +   * Triggers are fired in a separate thread, so that they can do more 
expensive operations
    +   * than would be allowed on the main threads populating the store.
    +   *
    +   * @param klass The type to monitor.
    +   * @param threshold The number of elements that should trigger the 
action.
    +   * @param action Action to run when the threshold is reached; takes as a 
parameter the number
    +   *               of elements of the registered type currently known to 
be in the store.
    +   */
    +  def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): 
Unit = {
    +    val existing = triggers.getOrElse(klass, Seq())
    +    triggers(klass) = existing :+ Trigger(threshold, action)
    +  }
    +
    +  /**
    +   * Adds a trigger to be executed before the store is flushed. This 
normally happens before
    +   * closing, and is useful for flushing intermediate state to the store, 
e.g. when replaying
    +   * in-progress applications through the SHS.
    +   *
    +   * Flush triggers are called synchronously in the same thread that is 
closing the store.
    +   */
    +  def onFlush(action: => Unit): Unit = {
    +    flushTriggers += { () => action }
    +  }
    +
    +  /**
    +   * Enqueues an action to be executed asynchronously.
    +   */
    +  def doAsync(fn: => Unit): Unit = {
    +    executor match {
    +      case Some(exec) =>
    +        exec.submit(new Runnable() {
    +          override def run(): Unit = Utils.tryLog { fn }
    +        })
    +
    +      case _ =>
    +        fn
    +    }
    +  }
    +
    +  override def read[T](klass: Class[T], naturalKey: Any): T = 
store.read(klass, naturalKey)
    +
    +  override def write(value: Any): Unit = store.write(value)
    +
    +  /** Write an element to the store, optionally checking for whether to 
fire triggers. */
    +  def write(value: Any, checkTriggers: Boolean): Unit = {
    +    write(value)
    +
    +    if (checkTriggers && !stopped) {
    +      triggers.get(value.getClass()).foreach { list =>
    --- End diff --
    
    we should remove the empty parens after `getClass`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to