Repository: spark
Updated Branches:
  refs/heads/master 7aac755ba -> e9faae135


[SPARK-21409][SS] Follow up PR to allow different types of custom metrics to be exposed

## What changes were proposed in this pull request?

State store implementations may expose timing as well as size metrics. This PR enables that by making `StateStoreCustomMetric` a trait and adding `StateStoreCustomSizeMetric` and `StateStoreCustomTimingMetric` subtypes, so the stateful operators can create a `SQLMetric` of the matching type for each. A minimal provider-side sketch follows.
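
For illustration, a provider could advertise both kinds of metrics as sketched below. This is a minimal sketch assuming the trait and case classes added in this patch; `MyStateStoreProvider` and the metric names are hypothetical, and the other `StateStoreProvider` members are left abstract for brevity.

    import org.apache.spark.sql.execution.streaming.state._

    // Hypothetical provider; only the metric declaration is shown.
    abstract class MyStateStoreProvider extends StateStoreProvider {
      // Advertise one size metric and one timing metric, so the operator can
      // create a SQLMetric of the matching type for each.
      override def supportedCustomMetrics: Seq[StateStoreCustomMetric] = Seq(
        StateStoreCustomSizeMetric("loadedMapSize", "estimated size of the loaded state map"),
        StateStoreCustomTimingMetric("commitTime", "time taken to commit state updates"))
    }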

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #18661 from tdas/SPARK-21409-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9faae13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9faae13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9faae13

Branch: refs/heads/master
Commit: e9faae135cf47c4592d4caa36f3f57e439d4fe88
Parents: 7aac755
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Jul 17 19:28:55 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Jul 17 19:28:55 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/state/StateStore.scala | 11 ++++++++++-
 .../sql/execution/streaming/statefulOperators.scala      |  9 ++++++---
 2 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e9faae13/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9da610e..182fc27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -92,6 +92,10 @@ trait StateStore {
    */
   def abort(): Unit
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
   def iterator(): Iterator[UnsafeRowPair]
 
   /** Current metrics of the state store */
@@ -120,7 +124,12 @@ case class StateStoreMetrics(
  * Name and description of custom implementation-specific metrics that a
  * state store may wish to expose.
  */
-case class StateStoreCustomMetric(name: String, desc: String)
+trait StateStoreCustomMetric {
+  def name: String
+  def desc: String
+}
+case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
+case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
 
 /**
  * Trait representing a provider that provides [[StateStore]] instances representing

http://git-wip-us.apache.org/repos/asf/spark/blob/e9faae13/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 77b1160..3ca7f4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -104,7 +104,6 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
    * This should be called in that task after the store has been updated.
    */
   protected def setStoreMetrics(store: StateStore): Unit = {
-
     val storeMetrics = store.metrics
     longMetric("numTotalStateRows") += storeMetrics.numKeys
     longMetric("stateMemory") += storeMetrics.memoryUsedBytes
@@ -115,8 +114,12 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
-    provider.supportedCustomMetrics.map { m =>
-      m.name -> SQLMetrics.createTimingMetric(sparkContext, m.desc) }.toMap
+    provider.supportedCustomMetrics.map {
+      case StateStoreCustomSizeMetric(name, desc) =>
+        name -> SQLMetrics.createSizeMetric(sparkContext, desc)
+      case StateStoreCustomTimingMetric(name, desc) =>
+        name -> SQLMetrics.createTimingMetric(sparkContext, desc)
+    }.toMap
   }
 }
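
For completeness, here is a minimal sketch of the reporting side, assuming `StateStoreMetrics` carries a `customMetrics: Map[StateStoreCustomMetric, Long]` field alongside the `numKeys` and `memoryUsedBytes` fields read above. `MyStateStore`, the timing helper, and the placeholder values are hypothetical, and the remaining `StateStore` members are left abstract.

    import org.apache.spark.sql.execution.streaming.state._

    abstract class MyStateStore extends StateStore {
      private val commitTimeMetric =
        StateStoreCustomTimingMetric("commitTime", "time taken to commit state updates")
      private var lastCommitTimeMs: Long = 0L

      // Hypothetical helper: time an operation and remember the duration so it
      // can be surfaced through metrics().
      protected def timed[T](body: => T): T = {
        val start = System.nanoTime()
        val result = body
        lastCommitTimeMs = (System.nanoTime() - start) / 1000000
        result
      }

      override def metrics: StateStoreMetrics =
        StateStoreMetrics(
          numKeys = 0L,          // replace with the store's real key count
          memoryUsedBytes = 0L,  // replace with the store's real memory estimate
          customMetrics = Map(commitTimeMetric -> lastCommitTimeMs))
    }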
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
