[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-05-25 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/9571



[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-04-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111942037
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -310,77 +338,87 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* applications that haven't been updated since last time the logs were checked.
*/
   private[history] def checkForLogs(): Unit = {
-try {
-  val newLastScanTime = getNewLastScanTime()
-  logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-.getOrElse(Seq[FileStatus]())
-  // scan for modified applications, replay and merge them
-  val logInfos: Seq[FileStatus] = statusList
-.filter { entry =>
-  try {
-val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-!entry.isDirectory() &&
-  // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
-  // reading a garbage file is safe, but we would log an error which can be scary to
-  // the end-user.
-  !entry.getPath().getName().startsWith(".") &&
-  prevFileSize < entry.getLen()
-  } catch {
-case e: AccessControlException =>
-  // Do not use "logInfo" since these messages can get pretty noisy if printed on
-  // every poll.
-  logDebug(s"No permission to read $entry, ignoring.")
-  false
+metrics.updateCount.inc()
+metrics.updateLastAttempted.touch()
+time(metrics.updateTimer) {
+  try {
+val newLastScanTime = getNewLastScanTime()
+logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+  .getOrElse(Seq[FileStatus]())
+// scan for modified applications, replay and merge them
+val logInfos: Seq[FileStatus] = statusList
+  .filter { entry =>
+try {
+  val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+  !entry.isDirectory() &&
+// FsHistoryProvider generates a hidden file which can't be read.  Accidentally
+// reading a garbage file is safe, but we would log an error which can be scary to
+// the end-user.
+!entry.getPath().getName().startsWith(".") &&
+prevFileSize < entry.getLen()
+} catch {
+  case e: AccessControlException =>
+// Do not use "logInfo" since these messages can get pretty noisy if printed on
+// every poll.
+logDebug(s"No permission to read $entry, ignoring.")
+false
+}
+  }
+  .flatMap { entry => Some(entry) }
+  .sortWith { case (entry1, entry2) =>
+entry1.getModificationTime() >= entry2.getModificationTime()
   }
-}
-.flatMap { entry => Some(entry) }
-.sortWith { case (entry1, entry2) =>
-  entry1.getModificationTime() >= entry2.getModificationTime()
-  }

   if (logInfos.nonEmpty) {
 logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
   }

-  var tasks = mutable.ListBuffer[Future[_]]()
-
-  try {
-for (file <- logInfos) {
-  tasks += replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
-  })
-}
-  } catch {
-// let the iteration over logInfos break, since an exception on
-// replayExecutor.submit (..) indicates the ExecutorService is unable
-// to take any more submissions at this time
-
-case e: Exception =>
-  logError(s"Exception while submitting event log for replay", e)
-  }
-
-  pendingReplayTasksCount.addAndGet(tasks.size)
+var tasks = mutable.ListBuffer[Future[_]]()

-  tasks.foreach { task =>
 try {
-  // Wait for all tasks to finish. This makes sure that checkForLogs
-  // is not scheduled again while some tasks are already running in
-  // the replayExecutor.
-  task.get()
+  for (file <- logInfos) {
+tasks += replayExecutor.submit(new Runnable {
+  override def run(): Unit =
+time(metrics.historyMergeTimer, S

[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-04-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111934303
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -729,6 +778,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to process an event in the history merge operation. */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  private val clock = new SystemClock
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new TimestampGauge(clock)
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new TimestampGauge(clock)
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  register(Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
+("history.merge.duration", historyTotalMergeTime),
+("update.count", updateCount),
+("update.failure.count", updateFailureCount),
+("update.last.attempted", updateLastAttempted),
+("update.last.succeeded", updateLastSucceeded),
+("appui.load.count", appUILoadCount),
+("appui.load.duration", appUITotalLoadTime),
+("appui.load.failure.count", appUILoadFailureCount),
+("appui.load.not-found.count", appUILoadNotFoundCount),
+("appui.event.count", appUIEventCount),
+("appui.event.replay.time", appUIEventReplayTime),
+("update.timer", updateTimer),
+("history.merge.timer", historyMergeTimer)))
--- End diff --

cut



[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-04-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111914872
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala ---
@@ -99,6 +104,19 @@ private[history] abstract class ApplicationHistoryProvider {
   }
 
   /**
+   * Bind to the History Server: threads should be started here; exceptions may be raised
+   * Start the provider: threads should be started here; exceptions may be raised
+   * if the history provider cannot be started.
+   * The base implementation contains a re-entrancy check and should
--- End diff --

All the work on the YARN service model, and that of SmartFrog before it, has given me a fear of startup logic. I'll see about doing it there, though.
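
For illustration, here is a minimal sketch of the kind of re-entrancy guard being discussed, assuming an AtomicBoolean in the base class; the names are illustrative rather than the PR's actual code:

    import java.util.concurrent.atomic.AtomicBoolean

    abstract class GuardedHistoryProvider {
      private val started = new AtomicBoolean(false)

      /** Fails fast if start() is invoked twice; subclasses do real work in startup(). */
      final def start(): Unit = {
        require(started.compareAndSet(false, true), s"$this is already started")
        startup()
      }

      /** One-off work (thread launch, registry binding) goes here. */
      protected def startup(): Unit = ()
    }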




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-04-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111913717
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -410,34 +409,25 @@ private[history] class CacheMetrics(prefix: String) extends Source {
 ("update.triggered.count", updateTriggeredCount))
 
   /** all metrics, including timers */
-  private val allMetrics = counters ++ Seq(
+  private val allMetrics: Seq[(String, Metric with Counting)] = counters ++ Seq(
--- End diff --

Either I was just being explicit about what came in, or the IDE decided to get involved. Removed.




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-04-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111831406
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -310,77 +338,87 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* applications that haven't been updated since last time the logs were checked.
*/
   private[history] def checkForLogs(): Unit = {
-try {
-  val newLastScanTime = getNewLastScanTime()
-  logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-.getOrElse(Seq[FileStatus]())
-  // scan for modified applications, replay and merge them
-  val logInfos: Seq[FileStatus] = statusList
-.filter { entry =>
-  try {
-val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-!entry.isDirectory() &&
-  // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
-  // reading a garbage file is safe, but we would log an error which can be scary to
-  // the end-user.
-  !entry.getPath().getName().startsWith(".") &&
-  prevFileSize < entry.getLen()
-  } catch {
-case e: AccessControlException =>
-  // Do not use "logInfo" since these messages can get pretty noisy if printed on
-  // every poll.
-  logDebug(s"No permission to read $entry, ignoring.")
-  false
+metrics.updateCount.inc()
+metrics.updateLastAttempted.touch()
+time(metrics.updateTimer) {
+  try {
+val newLastScanTime = getNewLastScanTime()
+logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+  .getOrElse(Seq[FileStatus]())
+// scan for modified applications, replay and merge them
+val logInfos: Seq[FileStatus] = statusList
+  .filter { entry =>
+try {
+  val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+  !entry.isDirectory() &&
+// FsHistoryProvider generates a hidden file which can't be read.  Accidentally
+// reading a garbage file is safe, but we would log an error which can be scary to
+// the end-user.
+!entry.getPath().getName().startsWith(".") &&
+prevFileSize < entry.getLen()
+} catch {
+  case e: AccessControlException =>
+// Do not use "logInfo" since these messages can get pretty noisy if printed on
+// every poll.
+logDebug(s"No permission to read $entry, ignoring.")
+false
+}
+  }
+  .flatMap { entry => Some(entry) }
+  .sortWith { case (entry1, entry2) =>
+entry1.getModificationTime() >= entry2.getModificationTime()
   }
-}
-.flatMap { entry => Some(entry) }
-.sortWith { case (entry1, entry2) =>
-  entry1.getModificationTime() >= entry2.getModificationTime()
-  }

   if (logInfos.nonEmpty) {
 logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
   }

-  var tasks = mutable.ListBuffer[Future[_]]()
-
-  try {
-for (file <- logInfos) {
-  tasks += replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
-  })
-}
-  } catch {
-// let the iteration over logInfos break, since an exception on
-// replayExecutor.submit (..) indicates the ExecutorService is unable
-// to take any more submissions at this time
-
-case e: Exception =>
-  logError(s"Exception while submitting event log for replay", e)
-  }
-
-  pendingReplayTasksCount.addAndGet(tasks.size)
+var tasks = mutable.ListBuffer[Future[_]]()

-  tasks.foreach { task =>
 try {
-  // Wait for all tasks to finish. This makes sure that checkForLogs
-  // is not scheduled again while some tasks are already running in
-  // the replayExecutor.
-  task.get()
+  for (file <- logInfos) {
+tasks += replayExecutor.submit(new Runnable {
+  override def run(): Unit =
+time(metrics.historyMergeTimer, Some(met

[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-04-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111831786
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -729,6 +778,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to process an event in the history merge operation. */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  private val clock = new SystemClock
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new TimestampGauge(clock)
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new TimestampGauge(clock)
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  register(Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
+("history.merge.duration", historyTotalMergeTime),
+("update.count", updateCount),
+("update.failure.count", updateFailureCount),
+("update.last.attempted", updateLastAttempted),
+("update.last.succeeded", updateLastSucceeded),
+("appui.load.count", appUILoadCount),
+("appui.load.duration", appUITotalLoadTime),
+("appui.load.failure.count", appUILoadFailureCount),
+("appui.load.not-found.count", appUILoadNotFoundCount),
+("appui.event.count", appUIEventCount),
+("appui.event.replay.time", appUIEventReplayTime),
+("update.timer", updateTimer),
+("history.merge.timer", historyMergeTimer)))
--- End diff --

This timer is still in this list, why? It's not useful. (This is the timer I was talking about in my previous comment.)



[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-04-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111830465
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala ---
@@ -410,34 +409,25 @@ private[history] class CacheMetrics(prefix: String) extends Source {
 ("update.triggered.count", updateTriggeredCount))
 
   /** all metrics, including timers */
-  private val allMetrics = counters ++ Seq(
+  private val allMetrics: Seq[(String, Metric with Counting)] = counters ++ Seq(
--- End diff --

Is declaring the type useful here in some way?




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2017-04-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111830700
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala ---
@@ -99,6 +104,19 @@ private[history] abstract class ApplicationHistoryProvider {
   }
 
   /**
+   * Bind to the History Server: threads should be started here; exceptions may be raised
+   * Start the provider: threads should be started here; exceptions may be raised
+   * if the history provider cannot be started.
+   * The base implementation contains a re-entrancy check and should
--- End diff --

This makes this interface awkward. Why can't the `HistoryServer` keep track of that?




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-09-01 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r77193230
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -664,6 +707,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to process an event in the history merge operation. */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  private val clock = new SystemClock
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new TimestampGauge(clock)
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new TimestampGauge(clock)
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  register(Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
--- End diff --

UI would be good, though it should be some separate work: it'd need to change the history provider API to add some progress check.

FWIW the ATS history provider simply adds this as a string in the list of key-value pairs a provider can publish. Ugly and not cross-provider.




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-31 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r77030612
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -664,6 +707,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to process an event in the history merge operation. */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  private val clock = new SystemClock
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new TimestampGauge(clock)
+
+  /** Time the last update succeded. */
+  val updateLastSucceeded = new TimestampGauge(clock)
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  register(Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
--- End diff --

> I think the total load time does need to be measured

That one is fine. I'm talking about the one that measures each app separately (`history.merge.timer`); that one is not useful to the user since every app is different.

> a remotely accessible metric is a 0/1 boolean indicating whether or not replay has completed

That's good to expose, not just as a metric but also in the UI itself, if it's not currently.




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-31 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r76972524
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -664,6 +707,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to process an event in the history merge operation. */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  private val clock = new SystemClock
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new TimestampGauge(clock)
+
+  /** Time the last update succeded. */
+  val updateLastSucceeded = new TimestampGauge(clock)
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  register(Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
--- End diff --

What's actually being displayed is the average time/event, so it is independent of history size; it more measures system performance during replay.

Even so, it may not be that useful, so I can cut it.

What may actually be more interesting as a remotely accessible metric is a 0/1 boolean indicating whether or not replay has completed. There currently doesn't seem to be any way to ask the history server if all histories have loaded: the UI comes up while replay is in progress, meaning that after startup/restart, a query of a history may fail not because the app doesn't have history, but because the replay hasn't completed yet. Exposing the state to management tools allows functional tests & other scripts to block until the history is fully loaded.
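
A rough sketch of what that could look like, assuming a codahale Gauge and a flag the provider flips once the initial replay finishes (the flag is hypothetical, not something this patch defines):

    import java.util.concurrent.atomic.AtomicBoolean
    import com.codahale.metrics.Gauge

    object ReplayStatus {
      // Hypothetical flag: the provider would set this after the first
      // full checkForLogs() pass completes.
      val initialReplayDone = new AtomicBoolean(false)

      // 0/1 gauge that scripts and functional tests can poll to block
      // until all histories are loaded.
      val replayComplete: Gauge[Int] = new Gauge[Int] {
        override def getValue: Int = if (initialReplayDone.get()) 1 else 0
      }
    }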



[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-31 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r76952864
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -664,6 +707,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to process an event in the history merge operation. */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  private val clock = new SystemClock
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new TimestampGauge(clock)
+
+  /** Time the last update succeded. */
+  val updateLastSucceeded = new TimestampGauge(clock)
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  register(Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
--- End diff --

No problem, though I think the total load time does need to be measured and logged somewhere. I'll make sure that happens.




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r76851430
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -664,6 +707,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to process an event in the history merge operation. */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  private val clock = new SystemClock
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new TimestampGauge(clock)
+
+  /** Time the last update succeded. */
+  val updateLastSucceeded = new TimestampGauge(clock)
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  register(Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
--- End diff --

Actually, sorry, not this one but `historyMergeTimer` below.




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r76851220
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -664,6 +707,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to process an event in the history merge operation. */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  private val clock = new SystemClock
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new TimestampGauge(clock)
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new TimestampGauge(clock)
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  register(Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
--- End diff --

Similarly, this metric suffers from the same issue as `appUiLoadTimer` and probably shouldn't be exposed. Sorry, I should have pointed this out earlier.




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-30 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r76773046
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +700,90 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Metrics integration: the various counters of activity
+   * Time a closure, returning its output.
+   * @param timer timer
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  timeCtx.close()
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider) extends Source {
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Metrics registry. */
+  override val metricRegistry = new MetricRegistry()
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
--- End diff --

I've cut the metric




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-27 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r76515026
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -225,14 +274,26 @@ class HistoryServer(
   }
 }
 
+
--- End diff --

got it




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-27 Thread steveloughran
GitHub user steveloughran reopened a pull request:

https://github.com/apache/spark/pull/9571

[SPARK-11373] [CORE] Add metrics to the History Server and FsHistoryProvider

This adds metrics to the history server, with the `FsHistoryProvider` metering its load, performance and reliability.

see [SPARK-11373](https://issues.apache.org/jira/browse/SPARK-11373)

The `HistoryServer` sets up the codahale metrics for the Web UI under `metrics/`, with `metrics/metrics` serving the metrics themselves, `metrics/health` any health probes, and `metrics/threads` a thread dump. There's currently no attempt to hook up JMX, etc. The Web servlets are the ones tests can easily hit and don't need infrastructure, so they are the good initial first step.
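
For illustration, a minimal sketch of wiring those three codahale servlets into a Jetty context handler; this assumes the stock com.codahale.metrics.servlets classes and is not the patch's exact wiring into the Spark web UI:

    import com.codahale.metrics.MetricRegistry
    import com.codahale.metrics.health.HealthCheckRegistry
    import com.codahale.metrics.servlets.{HealthCheckServlet, MetricsServlet, ThreadDumpServlet}
    import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

    object MetricsServletSetup {
      def attach(handler: ServletContextHandler,
          metrics: MetricRegistry,
          health: HealthCheckRegistry): Unit = {
        // JSON dump of all registered metrics.
        handler.addServlet(new ServletHolder(new MetricsServlet(metrics)), "/metrics/metrics")
        // Runs every registered health check and reports pass/fail.
        handler.addServlet(new ServletHolder(new HealthCheckServlet(health)), "/metrics/health")
        // Plain-text thread dump of the server JVM.
        handler.addServlet(new ServletHolder(new ThreadDumpServlet()), "/metrics/threads")
      }
    }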

It then passes the metrics and health registries down to the providers in an `ApplicationHistoryBinding` case class, via a new method

def start(binding: ApplicationHistoryBinding): Unit

The base class has an implementation, so all existing providers will still link properly; the base implementation currently checks for and fails re-entrant starts. The use of a binding case class also ensures that if new binding information were added in future, existing implementations would still link.
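
A rough sketch of that shape, with illustrative field names rather than the PR's exact definition:

    import com.codahale.metrics.MetricRegistry
    import com.codahale.metrics.health.HealthCheckRegistry

    // Extra binding information can be added as new fields without
    // breaking providers that already implement start().
    case class ApplicationHistoryBinding(
        metrics: MetricRegistry,
        health: HealthCheckRegistry)

    abstract class HistoryProviderBase {
      // Base implementation: providers that ignore metrics can inherit this;
      // the PR's version also guards against re-entrant starts.
      def start(binding: ApplicationHistoryBinding): Unit = {}
    }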

The `FsHistoryProvider` implements the `start()` method, registering two counters and two timers:

1. Number of update attempts and number of failed updates, and the same for app UI loads.
2. Time for updates and app UI loads.

Points of note

* Why not use Spark's `MetricsSystem`? I did start off with that, but it needs a `SparkContext` to run off, which the server doesn't have. Ideally that would be the way to go, as it would support all the spark conf-based metrics setup. Someone who understands the `MetricsSystem` would need to get involved here, as it would make for a more complex patch. In `FsHistoryProvider` the registry information is all kept in a `Source` subclass for ease of future migration to `MetricsSystem`.
* Why the extra `HealthRegistry`? It's a nice way of allowing providers to indicate (possibly transient) health problems for monitoring tools/clients to hit; a sketch follows this list. For the FS provider it could maybe flag when there hadn't been any successful update for a specified time period (that could also be indicated by having a counter of "seconds since last update" and letting monitoring tools monitor the counter value and act on it). Access control problems on the directory are something else which may be considered a liveness problem: they won't get better without human intervention.
* The `FsHistoryProvider.start()` method should really take the thread start code from the class constructor's `initialize()` method. This would ensure that incomplete classes don't get called by spawned threads, and makes it possible for test-time subclasses to skip thread startup. I've not attempted to do that in this patch.
* No tests for this yet. Hitting the three metrics servlets in the HistoryServer is the obvious route; the JSON payload of the metrics can be parsed and scanned for relevant counters too.
* Part of the patch for `HistoryServerSuite` removes the call to `HistoryServer.initialize()` in the `before` clause. That was a duplicate call, one which hit the re-entrancy tests on the provider & registry. As well as cutting it, `HistoryServer.initialize()` has been made idempotent. That should not be needed, but it will eliminate the problem arising again.
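
Referring back to the `HealthRegistry` point above, a rough sketch of the staleness probe described there, assuming a supplier of the last successful update time; the class name and threshold handling are illustrative, not part of the patch:

    import com.codahale.metrics.health.HealthCheck

    class UpdateStalenessCheck(
        lastSuccessMillis: () => Long,
        maxStalenessMillis: Long) extends HealthCheck {

      // Healthy while an update has succeeded within the window;
      // unhealthy (needing human attention) once it goes stale.
      override def check(): HealthCheck.Result = {
        val age = System.currentTimeMillis() - lastSuccessMillis()
        if (age <= maxStalenessMillis) {
          HealthCheck.Result.healthy()
        } else {
          HealthCheck.Result.unhealthy(s"No successful update for $age ms")
        }
      }
    }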

Once the SPARK-1537 YARN timeline server history provider is committed, I'll add metrics support there too. The YARN timeline provider would:

1. Add timers of REST operations as well as playback load times, which can count network delays as well as JSON deserialization overhead.
2. Add a health check for connectivity too: the timeline server would be unhealthy if connections to the timeline server were either blocking or failing. And again, if there were security/auth problems, they'd be considered non-recoverable.
3. Move thread launch under the `start()` method, with some test subclasses disabling thread launch.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark feature/SPARK-11373

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9571.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9571


commit 2ae734fd358c5d33e75f92837c2e414dc620454a
Author: Steve Loughran 
Date:   2015-11-09T16:32:33Z

[SPARK-11373] First pass at adding metrics to the history server, with the FsHistoryProvider counting
1. Number of update attempts and number of failed updates
2. Time for updates and app UI loads

The HistoryServer sets up the codahale metrics for the Web under metrics/ with me

[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-27 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/9571




[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r76098002
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -225,14 +274,26 @@ class HistoryServer(
   }
 }
 
+
--- End diff --

nit: too many blank lines.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r76097567
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +700,90 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Metrics integration: the various counters of activity
+   * Time a closure, returning its output.
+   * @param timer timer
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  timeCtx.close()
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider) 
extends Source {
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Metrics registry. */
+  override val metricRegistry = new MetricRegistry()
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a 
load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. 
*/
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
--- End diff --

Ping. I see you use this in `time()` to update the total time, but do you 
need to register this metric below?





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74981677
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
--- End diff --

We can move the classes to a different module when it becomes necessary.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74963037
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
+  override val metricRegistry = new MetricRegistry()
+
+  /**
+   * Register a sequence of metrics
+   * @param metrics sequence of metrics to register
+   */
+  def register(metrics: Seq[(String, Metric)]): Unit = {
+metrics.foreach { case (name, metric) =>
+  metricRegistry.register(fullname(name), metric)
+}
+  }
+
+  /**
+   * Create the full name of a metric by prepending the prefix to the name
+   * @param name short name
+   * @return the full name to use in registration
+   */
+  def fullname(name: String): String = {
+MetricRegistry.name(prefix, name)
+  }
+
+  /**
+   * Dump the counters and gauges.
+   * @return a string for logging and diagnostics -not for parsing by 
machines.
+   */
+  override def toString: String = {
+val sb = new StringBuilder(s"Metrics for $sourceName:\n")
+sb.append("  Counters\n")
+metricRegistry.getCounters.asScala.foreach { entry =>
+sb.append("").append(entry._1).append(" = ").append(entry._2.getCount).append('\n')
+}
+sb.append("  Gauges\n")
+metricRegistry.getGauges.asScala.foreach { entry =>
+  sb.append("").append(entry._1).append(" = ").append(entry._2.getValue).append('\n')
+}
+sb.toString()
+  }
+
+  /**
+   * Get a named counter.
+   * @param counterName name of the counter
+   * @return the counter, if found
+   */
+  def getCounter(counterName: String): Option[Counter] = {
+Option(metricRegistry.getCounters(new MetricFilter {
+  def matches(name: String, metric: Metric): Boolean = name == 
counterName
+}).get(counterName))
+  }
+
+  /**
+   * Get a gauge of an unknown numeric type.
+   * @param gaugeName name of the gauge
+   * @return gauge, if found
+   */
+  def getGauge(gaugeName: String): Option[Gauge[_]] = {
+Option(metricRegistry.getGauges(new MetricFilter {
+  def matches(name: String, metric: Metric): Boolean = name == 
gaugeName
+}).get(gaugeName))
+  }
+
+  /**
+   * Get a Long gauge.
+   * @param gaugeName name of the gauge
+   * @return gauge, if found
+   * @throws ClassCastException if the gauge is found but of the wrong type
+   */
+  def getLongGauge(gaugeName: String): Option[Gauge[Long]] = {
+Option(metricRegistry.getGauges(new MetricFilter {
--- End diff --

done, &  cleaned up all the filtering to have a single `MetricByName` 
filter.
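
For reference, the single name-based filter would presumably look something 
like this; only the `MetricByName` name comes from the comment above, the 
body is a guess:

    import com.codahale.metrics.{Metric, MetricFilter}

    // Matches exactly one metric by its full registered name.
    private class MetricByName(target: String) extends MetricFilter {
      override def matches(name: String, metric: Metric): Boolean =
        name == target
    }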





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74962913
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
+  override val metricRegistry = new MetricRegistry()
+
+  /**
+   * Register a sequence of metrics
+   * @param metrics sequence of metrics to register
+   */
+  def register(metrics: Seq[(String, Metric)]): Unit = {
+metrics.foreach { case (name, metric) =>
+  metricRegistry.register(fullname(name), metric)
+}
+  }
+
+  /**
+   * Create the full name of a metric by prepending the prefix to the name
+   * @param name short name
+   * @return the full name to use in registration
+   */
+  def fullname(name: String): String = {
+MetricRegistry.name(prefix, name)
+  }
+
+  /**
+   * Dump the counters and gauges.
+   * @return a string for logging and diagnostics -not for parsing by 
machines.
+   */
+  override def toString: String = {
+val sb = new StringBuilder(s"Metrics for $sourceName:\n")
+sb.append("  Counters\n")
+metricRegistry.getCounters.asScala.foreach { entry =>
+sb.append("").append(entry._1).append(" = ").append(entry._2.getCount).append('\n')
+}
+sb.append("  Gauges\n")
+metricRegistry.getGauges.asScala.foreach { entry =>
--- End diff --

done.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74962863
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
+  override val metricRegistry = new MetricRegistry()
+
+  /**
+   * Register a sequence of metrics
+   * @param metrics sequence of metrics to register
+   */
+  def register(metrics: Seq[(String, Metric)]): Unit = {
+metrics.foreach { case (name, metric) =>
+  metricRegistry.register(fullname(name), metric)
+}
+  }
+
+  /**
+   * Create the full name of a metric by prepending the prefix to the name
+   * @param name short name
+   * @return the full name to use in registration
+   */
+  def fullname(name: String): String = {
+MetricRegistry.name(prefix, name)
+  }
+
+  /**
+   * Dump the counters and gauges.
+   * @return a string for logging and diagnostics -not for parsing by 
machines.
+   */
+  override def toString: String = {
+val sb = new StringBuilder(s"Metrics for $sourceName:\n")
+sb.append("  Counters\n")
+metricRegistry.getCounters.asScala.foreach { entry =>
--- End diff --

done





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74960995
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
--- End diff --

done; also made the `Timestamp` gauge take a Spark `Clock` for easier 
use/testing, plus some other standalone tests. Separate source file -> isolated 
tests.

These gauges are actually generic to any metrics, and the `LambdaLongGauge` 
gauge is quite a nice one to use in a lot of places; if accompanied with a 
`LambdaIntGauge` they could be used as a more functional equivalent to the 
various gauge subclasses throughout the code. I could put them somewhere more 
generic if you think it's useful (and even, in a different patch, move the 
existing gauges over to them).
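
For reference, such a lambda-backed gauge can be as small as this; the 
`LambdaLongGauge` name is from the patch, but this body is a reconstruction 
rather than the exact code:

    import com.codahale.metrics.Gauge

    // Evaluates the supplied function on every poll of the registry.
    private[history] class LambdaLongGauge(fn: () => Long) extends Gauge[Long] {
      override def getValue: Long = fn()
    }

    // e.g. an average derived from two counters, computed on demand:
    // new LambdaLongGauge(() =>
    //   average(historyTotalMergeTime.getCount, historyEventCount.getCount))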





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74911975
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -114,28 +123,45 @@ class HistoryServer(
* this UI with the event logs in the provided base directory.
*/
   def initialize() {
-attachPage(new HistoryPage(this))
+if (!initialized.getAndSet(true)) {
--- End diff --

...Pulled it from the test suite, so the check is superfluous, and cut it...

Looking at this more, I think the metrics setup should be moved to 
`bind()`, rather than `initialize()`, precisely because the metrics setup may 
be doing things in terms of thread startup, and you don't want that to happen 
so early in class construction. I'll do that, along with the testing/review 
needed to make sure that the later metrics init won't break assumptions 
elsewhere.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74910950
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +710,123 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, 
its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: 
=> T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, 
prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a 
load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. 
*/
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  private val countersAndGauges = Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
+("history.merge.duration", historyTotalMergeTime),
+("update.count", updateCount),
+("update.failure.count", updateFailureCount),
+("update.last.attempted", updateLastAttempted),
+("update.last.succeeded", updateLastSucceeded),
+("appui.load.count", appUILoadCount),
+("appui.load.duration", appUITotalLoadTime),
+("appui.load.failure.count", appUILoadFailureCount),
+("appui.load.not-found.count", appUILoadNotFoundCount),
+("appui.event.count", appUIEventCount),
+("appui.event.replay.time", appUIEventReplayTime)
+  )
+
+  private val timers = Seq (
+("update.timer", updateTimer),
+("history.merge.timer", historyMergeTimer),
+("appui.load.timer", appUiLoadTimer))
+
+  val allMetrics = countersAndGauges ++ timers
--- End diff --

ok. merged them all in. I'd split them by type, but like you say: not needed



[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74910734
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +710,123 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, 
its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: 
=> T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, 
prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
--- End diff --

replaced with " Average time to process an event in the history merge 
operation."





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r74910796
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +710,123 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, 
its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: 
=> T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, 
prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a 
load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. 
*/
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  private val countersAndGauges = Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
+("history.merge.duration", historyTotalMergeTime),
+("update.count", updateCount),
+("update.failure.count", updateFailureCount),
+("update.last.attempted", updateLastAttempted),
+("update.last.succeeded", updateLastSucceeded),
+("appui.load.count", appUILoadCount),
+("appui.load.duration", appUITotalLoadTime),
+("appui.load.failure.count", appUILoadFailureCount),
+("appui.load.not-found.count", appUILoadNotFoundCount),
+("appui.event.count", appUIEventCount),
+("appui.event.replay.time", appUIEventReplayTime)
+  )
+
+  private val timers = Seq (
--- End diff --

fixed



[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73624479
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
---
@@ -204,6 +204,8 @@ private[spark] class MetricsSystem private (
   }
 }
   }
+
+  private[spark] def getMetricRegistry(): MetricRegistry = { registry }
--- End diff --

nit: braces are not necessary





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73624457
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
--- End diff --

Can you put this class (and the new gauges you're adding) in a new source 
file? They're not specific to `HistoryServer` so it's better to have them in a 
separate place.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73624345
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +710,123 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, 
its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: 
=> T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, 
prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a 
load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. 
*/
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  private val countersAndGauges = Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
+("history.merge.duration", historyTotalMergeTime),
+("update.count", updateCount),
+("update.failure.count", updateFailureCount),
+("update.last.attempted", updateLastAttempted),
+("update.last.succeeded", updateLastSucceeded),
+("appui.load.count", appUILoadCount),
+("appui.load.duration", appUITotalLoadTime),
+("appui.load.failure.count", appUILoadFailureCount),
+("appui.load.not-found.count", appUILoadNotFoundCount),
+("appui.event.count", appUIEventCount),
+("appui.event.replay.time", appUIEventReplayTime)
+  )
+
+  private val timers = Seq (
--- End diff --

nit: no space before `(`



[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73624337
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +710,123 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, 
its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: 
=> T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, 
prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val historyEventMergeTime = new LambdaLongGauge(() =>
+average(historyTotalMergeTime.getCount, historyEventCount.getCount))
+
+  /** Number of events replayed. */
+  val appUIEventCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a 
load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. 
*/
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
+
+  /** Total load time of all App UIs. */
+  val appUITotalLoadTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
+  val appUIEventReplayTime = new LambdaLongGauge(() =>
+average(appUITotalLoadTime.getCount, appUIEventCount.getCount))
+
+  private val countersAndGauges = Seq(
+("history.merge.event.count", historyEventCount),
+("history.merge.event.time", historyEventMergeTime),
+("history.merge.duration", historyTotalMergeTime),
+("update.count", updateCount),
+("update.failure.count", updateFailureCount),
+("update.last.attempted", updateLastAttempted),
+("update.last.succeeded", updateLastSucceeded),
+("appui.load.count", appUILoadCount),
+("appui.load.duration", appUITotalLoadTime),
+("appui.load.failure.count", appUILoadFailureCount),
+("appui.load.not-found.count", appUILoadNotFoundCount),
+("appui.event.count", appUIEventCount),
+("appui.event.replay.time", appUIEventReplayTime)
+  )
+
+  private val timers = Seq (
+("update.timer", updateTimer),
+("history.merge.timer", historyMergeTimer),
+("appui.load.timer", appUiLoadTimer))
+
+  val allMetrics = countersAndGauges ++ timers
--- End diff --

You don't seem to need these lists outside of the context of calling 
`register`, so can you instead inline the list into the call to avoid having 
these fields?
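
For instance (an illustrative subset of the names above, not the final code):

    register(Seq(
      ("update.count", updateCount),
      ("update.failure.count", updateFailureCount),
      ("update.timer", updateTimer),
      ("history.merge.timer", historyMergeTimer),
      ("appui.load.timer", appUiLoadTimer)))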



[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73624122
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +700,90 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Metrics integration: the various counters of activity
+   * Time a closure, returning its output.
+   * @param timer timer
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  timeCtx.close()
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider) 
extends Source {
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Metrics registry. */
+  override val metricRegistry = new MetricRegistry()
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a 
load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. 
*/
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
--- End diff --

I see you added the more useful `appUIEventReplayTime`, so is this metric 
needed? I don't think a lot of useful information, if any, can be derived from 
it.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73623900
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
+  override val metricRegistry = new MetricRegistry()
+
+  /**
+   * Register a sequence of metrics
+   * @param metrics sequence of metrics to register
+   */
+  def register(metrics: Seq[(String, Metric)]): Unit = {
+metrics.foreach { case (name, metric) =>
+  metricRegistry.register(fullname(name), metric)
+}
+  }
+
+  /**
+   * Create the full name of a metric by prepending the prefix to the name
+   * @param name short name
+   * @return the full name to use in registration
+   */
+  def fullname(name: String): String = {
+MetricRegistry.name(prefix, name)
+  }
+
+  /**
+   * Dump the counters and gauges.
+   * @return a string for logging and diagnostics -not for parsing by 
machines.
+   */
+  override def toString: String = {
+val sb = new StringBuilder(s"Metrics for $sourceName:\n")
+sb.append("  Counters\n")
+metricRegistry.getCounters.asScala.foreach { entry =>
+sb.append("").append(entry._1).append(" = ").append(entry._2.getCount).append('\n')
+}
+sb.append("  Gauges\n")
+metricRegistry.getGauges.asScala.foreach { entry =>
+  sb.append("").append(entry._1).append(" = ").append(entry._2.getValue).append('\n')
+}
+sb.toString()
+  }
+
+  /**
+   * Get a named counter.
+   * @param counterName name of the counter
+   * @return the counter, if found
+   */
+  def getCounter(counterName: String): Option[Counter] = {
+Option(metricRegistry.getCounters(new MetricFilter {
+  def matches(name: String, metric: Metric): Boolean = name == 
counterName
+}).get(counterName))
+  }
+
+  /**
+   * Get a gauge of an unknown numeric type.
+   * @param gaugeName name of the gauge
+   * @return gauge, if found
+   */
+  def getGauge(gaugeName: String): Option[Gauge[_]] = {
+Option(metricRegistry.getGauges(new MetricFilter {
+  def matches(name: String, metric: Metric): Boolean = name == 
gaugeName
+}).get(gaugeName))
+  }
+
+  /**
+   * Get a Long gauge.
+   * @param gaugeName name of the gauge
+   * @return gauge, if found
+   * @throws ClassCastException if the gauge is found but of the wrong type
+   */
+  def getLongGauge(gaugeName: String): Option[Gauge[Long]] = {
+Option(metricRegistry.getGauges(new MetricFilter {
--- End diff --

Can you call `getGauge` and cast instead of copy & pasting the code?
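
i.e. reuse plus an unchecked cast, roughly (a sketch, assuming the `getGauge` 
method above):

    def getLongGauge(gaugeName: String): Option[Gauge[Long]] = {
      // The cast is unchecked due to erasure; a wrong type only surfaces
      // as a ClassCastException when getValue is eventually called.
      getGauge(gaugeName).map(_.asInstanceOf[Gauge[Long]])
    }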





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73623757
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
+  override val metricRegistry = new MetricRegistry()
+
+  /**
+   * Register a sequence of metrics
+   * @param metrics sequence of metrics to register
+   */
+  def register(metrics: Seq[(String, Metric)]): Unit = {
+metrics.foreach { case (name, metric) =>
+  metricRegistry.register(fullname(name), metric)
+}
+  }
+
+  /**
+   * Create the full name of a metric by prepending the prefix to the name
+   * @param name short name
+   * @return the full name to use in registration
+   */
+  def fullname(name: String): String = {
+MetricRegistry.name(prefix, name)
+  }
+
+  /**
+   * Dump the counters and gauges.
+   * @return a string for logging and diagnostics -not for parsing by 
machines.
+   */
+  override def toString: String = {
+val sb = new StringBuilder(s"Metrics for $sourceName:\n")
+sb.append("  Counters\n")
+metricRegistry.getCounters.asScala.foreach { entry =>
--- End diff --

Maybe use `case (name, counter) =>`?





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73623780
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -226,6 +259,135 @@ class HistoryServer(
 }
 
 /**
+ * An abstract implementation of the metrics [[Source]] trait with some 
common operations.
+ */
+private[history] abstract class HistoryMetricSource(val prefix: String) 
extends Source {
+  override val metricRegistry = new MetricRegistry()
+
+  /**
+   * Register a sequence of metrics
+   * @param metrics sequence of metrics to register
+   */
+  def register(metrics: Seq[(String, Metric)]): Unit = {
+metrics.foreach { case (name, metric) =>
+  metricRegistry.register(fullname(name), metric)
+}
+  }
+
+  /**
+   * Create the full name of a metric by prepending the prefix to the name
+   * @param name short name
+   * @return the full name to use in registration
+   */
+  def fullname(name: String): String = {
+MetricRegistry.name(prefix, name)
+  }
+
+  /**
+   * Dump the counters and gauges.
+   * @return a string for logging and diagnostics -not for parsing by 
machines.
+   */
+  override def toString: String = {
+val sb = new StringBuilder(s"Metrics for $sourceName:\n")
+sb.append("  Counters\n")
+metricRegistry.getCounters.asScala.foreach { entry =>
+sb.append("").append(entry._1).append(" = ").append(entry._2.getCount).append('\n')
+}
+sb.append("  Gauges\n")
+metricRegistry.getGauges.asScala.foreach { entry =>
--- End diff --

Same.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73623629
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -114,28 +123,45 @@ class HistoryServer(
* this UI with the event logs in the provided base directory.
*/
   def initialize() {
-attachPage(new HistoryPage(this))
+if (!initialized.getAndSet(true)) {
--- End diff --

If that's the case, `initialize()` should be made private and its calls 
removed from the tests.

(I've seen this pattern of calling an `init()` method from constructors in 
Scala when the initialization code requires local variables, which otherwise 
would have to be fields in the class...)





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-08-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r73623442
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +710,123 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * The timer is updated with the duration, and if a counter is supplied, 
its count
+   * is incremented by the duration.
+   * @param timer timer
+   * @param counter counter: an optional counter of the duration
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: 
=> T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  val duration = timeCtx.stop()
+  counter.foreach(_.inc(duration))
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, 
prefix: String)
+extends HistoryMetricSource(prefix) {
+
+  /**
+   * Function to return an average; if the count is 0, so is the average.
+   * @param value value to average
+   * @param count event count to divide by
+   * @return the average, or 0 if the counter is itself 0
+   */
+  private def average(value: Long, count: Long): Long = {
+if (count > 0) value / count else 0
+  }
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Number of events replayed as listing merge. */
+  val historyEventCount = new Counter()
+
+  /** Timer of listing merges. */
+  val historyMergeTimer = new Timer()
+
+  /** Total time to merge all histories. */
+  val historyTotalMergeTime = new Counter()
+
+  /** Average time to load a single event in the App UI */
--- End diff --

Comment looks wrong.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r67326183
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -395,7 +429,8 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
 val newAttempts = try {
 val bus = new ReplayListenerBus()
-val res = replay(fileStatus, bus)
+val (res, count) = replay(fileStatus, bus)
+metrics.eventReplayedCount.inc(count)
--- End diff --

I will measure them both then, and add some time-to-load metric.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r67325987
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 ---
@@ -110,3 +127,87 @@ private[history] abstract class 
ApplicationHistoryProvider {
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: 
ZipOutputStream): Unit
 
 }
+
+/**
+ * A simple counter of events.
+ * There is no concurrency support here: all events must come in 
sequentially.
+ */
+private[history] class EventCountListener extends SparkListener {
--- End diff --

I was sure there was something to do this; I just couldn't find it when I 
looked. Will do.
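
(The existing utility is presumably `SparkFirehoseListener`, which receives 
every event regardless of type; below is a sketch of the counter rebuilt on 
it, a guess rather than the final code.)

    import org.apache.spark.SparkFirehoseListener
    import org.apache.spark.scheduler.SparkListenerEvent

    // Counts every event replayed through the bus. No concurrency support:
    // ReplayListenerBus delivers events sequentially.
    private[history] class EventCountListener extends SparkFirehoseListener {
      var eventCount = 0L

      override def onEvent(event: SparkListenerEvent): Unit = {
        eventCount += 1
      }
    }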





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r67242259
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -278,55 +304,63 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* applications that haven't been updated since last time the logs were checked.
*/
   private[history] def checkForLogs(): Unit = {
-try {
-  val newLastScanTime = getNewLastScanTime()
-  logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-.getOrElse(Seq[FileStatus]())
-  // scan for modified applications, replay and merge them
-  val logInfos: Seq[FileStatus] = statusList
-.filter { entry =>
-  try {
-val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-!entry.isDirectory() && prevFileSize < entry.getLen()
-  } catch {
-case e: AccessControlException =>
-  // Do not use "logInfo" since these messages can get pretty noisy if printed on
-  // every poll.
-  logDebug(s"No permission to read $entry, ignoring.")
-  false
+metrics.updateCount.inc()
+metrics.updateLastAttempted.touch()
+time(metrics.updateTimer) {
+  try {
+val newLastScanTime = getNewLastScanTime()
+logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+  .getOrElse(Seq[FileStatus]())
+// scan for modified applications, replay and merge them
+val logInfos: Seq[FileStatus] = statusList
+  .filter { entry =>
+try {
+  val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+  !entry.isDirectory() && prevFileSize < entry.getLen()
+} catch {
+  case e: AccessControlException =>
+// Do not use "logInfo" since these messages can get pretty noisy if printed on
+// every poll.
+logDebug(s"No permission to read $entry, ignoring.")
+false
+}
   }
+  .flatMap { entry => Some(entry) }
+  .sortWith { case (entry1, entry2) =>
+entry1.getModificationTime() >= entry2.getModificationTime()
 }
-.flatMap { entry => Some(entry) }
-.sortWith { case (entry1, entry2) =>
-  entry1.getModificationTime() >= entry2.getModificationTime()
-  }

-  if (logInfos.nonEmpty) {
-logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
-  }
-  logInfos.map { file =>
-  replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
-  })
+if (logInfos.nonEmpty) {
+  logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
 }
-.foreach { task =>
-  try {
-// Wait for all tasks to finish. This makes sure that checkForLogs
-// is not scheduled again while some tasks are already running in
-// the replayExecutor.
-task.get()
-  } catch {
-case e: InterruptedException =>
-  throw e
-case e: Exception =>
-  logError("Exception while merging application listings", e)
+logInfos.map { file =>
+replayExecutor.submit(new Runnable {
+  override def run(): Unit = time(metrics.mergeApplicationListingTimer) {
--- End diff --

I commented later on the event count metric update, but this would be more useful if you knew the event count related to it, and if it were more targeted (i.e. timing just the replay, not the time taken to merge the internal lists).
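
For illustration, a minimal sketch of the narrower timing being asked for, using the PR's own `time` helper; `metrics.replayTimer` is a hypothetical name, and `replay` is assumed to return the event count as in the diff above:

```scala
// Hypothetical sketch: time only the replay itself, not the merging of the
// internal lists, and count its events under the same context as the timer.
val (res, count) = time(metrics.replayTimer) {  // replayTimer: assumed name
  replay(fileStatus, bus)
}
metrics.eventReplayedCount.inc(count)  // pairs the count with the timed replay
```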





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r67242040
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -395,7 +429,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
 val newAttempts = try {
 val bus = new ReplayListenerBus()
-val res = replay(fileStatus, bus)
+val (res, count) = replay(fileStatus, bus)
+metrics.eventReplayedCount.inc(count)
--- End diff --

You're using this metric in two different contexts: while creating the 
listing, and while building the UI.

That means there's no good way to get a useful metric out of it, since you 
don't know how many events correspond to each and you're measuring the times 
for each separately.
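
One way to make the metric usable under that constraint, sketched here with hypothetical counter names, is a separate event counter per context, so each timer has a matching count:

```scala
import com.codahale.metrics.Counter

// Hypothetical sketch: split the single shared counter into per-context ones.
val listingEventsReplayed = new Counter()  // events replayed while merging listings
val uiEventsReplayed = new Counter()       // events replayed while building an app UI
```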





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r67240845
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -278,55 +304,63 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* applications that haven't been updated since last time the logs were checked.
*/
   private[history] def checkForLogs(): Unit = {
-try {
-  val newLastScanTime = getNewLastScanTime()
-  logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-.getOrElse(Seq[FileStatus]())
-  // scan for modified applications, replay and merge them
-  val logInfos: Seq[FileStatus] = statusList
-.filter { entry =>
-  try {
-val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-!entry.isDirectory() && prevFileSize < entry.getLen()
-  } catch {
-case e: AccessControlException =>
-  // Do not use "logInfo" since these messages can get pretty noisy if printed on
-  // every poll.
-  logDebug(s"No permission to read $entry, ignoring.")
-  false
+metrics.updateCount.inc()
+metrics.updateLastAttempted.touch()
+time(metrics.updateTimer) {
+  try {
+val newLastScanTime = getNewLastScanTime()
+logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+  .getOrElse(Seq[FileStatus]())
+// scan for modified applications, replay and merge them
+val logInfos: Seq[FileStatus] = statusList
+  .filter { entry =>
+try {
+  val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+  !entry.isDirectory() && prevFileSize < entry.getLen()
+} catch {
+  case e: AccessControlException =>
+// Do not use "logInfo" since these messages can get pretty noisy if printed on
+// every poll.
+logDebug(s"No permission to read $entry, ignoring.")
+false
+}
   }
+  .flatMap { entry => Some(entry) }
+  .sortWith { case (entry1, entry2) =>
+entry1.getModificationTime() >= entry2.getModificationTime()
 }
-.flatMap { entry => Some(entry) }
-.sortWith { case (entry1, entry2) =>
-  entry1.getModificationTime() >= entry2.getModificationTime()
-  }

-  if (logInfos.nonEmpty) {
-logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
-  }
-  logInfos.map { file =>
-  replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
-  })
+if (logInfos.nonEmpty) {
+  logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
 }
-.foreach { task =>
-  try {
-// Wait for all tasks to finish. This makes sure that checkForLogs
-// is not scheduled again while some tasks are already running in
-// the replayExecutor.
-task.get()
-  } catch {
-case e: InterruptedException =>
-  throw e
-case e: Exception =>
-  logError("Exception while merging application listings", e)
+logInfos.map { file =>
+replayExecutor.submit(new Runnable {
+  override def run(): Unit = time(metrics.mergeApplicationListingTimer) {
--- End diff --

This metric contains the time to replay an app's event log, which can vary a lot per application; it's the same issue as the UI load time I pointed out before. That makes this metric not very useful.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r67240208
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala ---
@@ -110,3 +127,87 @@ private[history] abstract class ApplicationHistoryProvider {
   def writeEventLogs(appId: String, attemptId: Option[String], zipStream: ZipOutputStream): Unit
 
 }
+
+/**
+ * A simple counter of events.
+ * There is no concurrency support here: all events must come in sequentially.
+ */
+private[history] class EventCountListener extends SparkListener {
--- End diff --

Isn't it easier to extend `SparkFirehoseListener` if all you're doing is 
counting?
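
A sketch of the rewrite being suggested; `SparkFirehoseListener` routes every event type through a single `onEvent()` callback, so counting needs only one override:

```scala
import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.SparkListenerEvent

// As with the original class, no concurrency support: events must arrive
// sequentially on the replay thread.
private[history] class EventCountListener extends SparkFirehoseListener {
  var eventCount = 0L

  override def onEvent(event: SparkListenerEvent): Unit = {
    eventCount += 1
  }
}
```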





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r67239990
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala ---
@@ -17,9 +17,12 @@
 
 package org.apache.spark.deploy.history
 
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.zip.ZipOutputStream
 
 import org.apache.spark.SparkException
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved, SparkListenerBlockUpdated, SparkListenerEnvironmentUpdate, SparkListenerEvent, SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskGettingResult, SparkListenerTaskStart, SparkListenerUnpersistRDD}
--- End diff --

At this point I'd just import `org.apache.spark.scheduler._`.
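
i.e. replacing the whole list above with the single wildcard import:

```scala
import org.apache.spark.scheduler._
```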





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-10 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r66614402
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +700,90 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * @param timer timer
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  timeCtx.close()
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider) extends Source {
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Metrics registry. */
+  override val metricRegistry = new MetricRegistry()
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
--- End diff --

Good point, that'd be a nice throughput figure. All that's needed is to count the number of events loaded and divide for a playback rate: either events/second or nanos/event. I'd prefer the latter for any microbenchmarking; the events/second metric is probably better for a monitoring display.
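
A sketch of the events/second variant using a Dropwizard `Meter` (the metric and method names below are illustrative, not the PR's); a nanos/event figure would instead divide total replay time by the event count:

```scala
import com.codahale.metrics.{Meter, MetricRegistry}

// Hypothetical sketch: a Meter gives a rolling events/second rate for free,
// which suits a monitoring display.
val registry = new MetricRegistry()
val eventReplayRate: Meter = registry.meter("history.fs.event.replay.rate")

// After replaying an event log, record how many events it contained.
def recordReplay(eventCount: Long): Unit = eventReplayRate.mark(eventCount)

// Events/second averaged over the last minute, e.g. for a dashboard.
def currentRate: Double = eventReplayRate.getOneMinuteRate
```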





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-10 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r66613829
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -278,6 +303,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* applications that haven't been updated since last time the logs were checked.
*/
   private[history] def checkForLogs(): Unit = {
+metrics.updateCount.inc()
+metrics.updateLastAttempted.touch()
+time(metrics.updateTimer) {
 try {
--- End diff --

done





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-10 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r66612954
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -114,28 +123,45 @@ class HistoryServer(
* this UI with the event logs in the provided base directory.
*/
   def initialize() {
-attachPage(new HistoryPage(this))
+if (!initialized.getAndSet(true)) {
--- End diff --

It's in `HistoryServerSuite`

```scala
server = new HistoryServer(conf, provider, securityManager, 18080)
server.initialize()
server.bind()
```

The issue here is that the HS constructor already calls `initialize()`; some of the tests then call it a second time.

One option would be to move that `initialize()` call out of the HS constructor; that'd mean changing `main()`, but it would produce something better for subclassing and testing anyway. Even so, making `initialize()` idempotent can only be a good thing, given that threads are spawned, metrics registered, etc.
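
For reference, a condensed sketch of the guard in question, matching the `AtomicBoolean` shown in the diff:

```scala
import java.util.concurrent.atomic.AtomicBoolean

private val initialized = new AtomicBoolean(false)

def initialize(): Unit = {
  // Second and later calls are no-ops, so a test calling initialize()
  // after the constructor already has is harmless.
  if (!initialized.getAndSet(true)) {
    // attach pages, register handlers, set up metrics -- runs at most once
  }
}
```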





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-08 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r66204561
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -114,28 +123,45 @@ class HistoryServer(
* this UI with the event logs in the provided base directory.
*/
   def initialize() {
-attachPage(new HistoryPage(this))
+if (!initialized.getAndSet(true)) {
--- End diff --

I don't remember now, but it did happen.





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r65786593
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -667,6 +700,90 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 prevFileSize < latest.fileSize
 }
   }
+
+  /**
+   * Time a closure, returning its output.
+   * @param timer timer
+   * @param fn function
+   * @tparam T type of return value of time
+   * @return the result of the function.
+   */
+  private def time[T](timer: Timer)(fn: => T): T = {
+val timeCtx = timer.time()
+try {
+  fn
+} finally {
+  timeCtx.close()
+}
+  }
+}
+
+/**
+ * Metrics integration: the various counters of activity.
+ */
+private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider) extends Source {
+
+  override val sourceName = "history.fs"
+
+  private val name = MetricRegistry.name(sourceName)
+
+  /** Metrics registry. */
+  override val metricRegistry = new MetricRegistry()
+
+  /** Number of updates. */
+  val updateCount = new Counter()
+
+  /** Number of update failures. */
+  val updateFailureCount = new Counter()
+
+  /** Update duration timer. */
+  val updateTimer = new Timer()
+
+  /** Time the last update was attempted. */
+  val updateLastAttempted = new Timestamp()
+
+  /** Time the last update succeeded. */
+  val updateLastSucceeded = new Timestamp()
+
+  /** Number of App UI load operations. */
+  val appUILoadCount = new Counter()
+
+  /** Number of App UI load operations that failed due to a load/parse/replay problem. */
+  val appUILoadFailureCount = new Counter()
+
+  /** Number of App UI load operations that failed due to an unknown file. */
+  val appUILoadNotFoundCount = new Counter()
+
+  /** Statistics on time to load app UIs. */
+  val appUiLoadTimer = new Timer()
--- End diff --

Given that different apps can have very different event logs, I wonder how useful this metric really is. Perhaps some extra processing is needed (e.g. events processed per second or something)?





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r65786474
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -278,6 +303,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* applications that haven't been updated since last time the logs were checked.
*/
   private[history] def checkForLogs(): Unit = {
+metrics.updateCount.inc()
+metrics.updateLastAttempted.touch()
+time(metrics.updateTimer) {
 try {
--- End diff --

Don't you need to indent this block of code now?





[GitHub] spark pull request #9571: [SPARK-11373] [CORE] Add metrics to the History Se...

2016-06-03 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r65786382
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala ---
@@ -114,28 +123,45 @@ class HistoryServer(
* this UI with the event logs in the provided base directory.
*/
   def initialize() {
-attachPage(new HistoryPage(this))
+if (!initialized.getAndSet(true)) {
--- End diff --

In which cases is this method called twice?

