Repository: spark
Updated Branches:
  refs/heads/master 7aac755ba -> e9faae135
[SPARK-21409][SS] Follow up PR to allow different types of custom metrics to be exposed

## What changes were proposed in this pull request?

Implementations may expose both timing as well as size metrics. This PR enables that.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #18661 from tdas/SPARK-21409-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9faae13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9faae13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9faae13

Branch: refs/heads/master
Commit: e9faae135cf47c4592d4caa36f3f57e439d4fe88
Parents: 7aac755
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Jul 17 19:28:55 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Jul 17 19:28:55 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/state/StateStore.scala  | 11 ++++++++++-
 .../sql/execution/streaming/statefulOperators.scala        |  9 ++++++---
 2 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e9faae13/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9da610e..182fc27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -92,6 +92,10 @@ trait StateStore {
    */
   def abort(): Unit
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
   def iterator(): Iterator[UnsafeRowPair]
 
   /** Current metrics of the state store */
@@ -120,7 +124,12 @@ case class StateStoreMetrics(
  * Name and description of custom implementation-specific metrics that a
  * state store may wish to expose.
  */
-case class StateStoreCustomMetric(name: String, desc: String)
+trait StateStoreCustomMetric {
+  def name: String
+  def desc: String
+}
+case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
+case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
 
 /**
  * Trait representing a provider that provide [[StateStore]] instances representing
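As an aside, here is a minimal provider-side sketch of how the new metric subtypes could be used. The MyProviderCustomMetrics object and the metric names in it are hypothetical and not part of this commit; the only assumptions are the StateStoreCustomSizeMetric/StateStoreCustomTimingMetric case classes added above and the provider.supportedCustomMetrics hook referenced in the statefulOperators.scala hunk that follows, which registers size declarations via SQLMetrics.createSizeMetric and timing declarations via SQLMetrics.createTimingMetric.

// Hypothetical example, not part of this commit: metric declarations that a
// custom state store implementation could return from its
// supportedCustomMetrics hook. Assumes it lives alongside the definitions in
// StateStore.scala, so no extra imports are needed.
package org.apache.spark.sql.execution.streaming.state

object MyProviderCustomMetrics {

  // The reported value is a byte count, so declare it as a size metric; the
  // stateful operator maps it to SQLMetrics.createSizeMetric.
  val stateFileBytes: StateStoreCustomMetric = StateStoreCustomSizeMetric(
    name = "stateFileBytes",
    desc = "estimated size of the files backing the state store")

  // The reported value is a duration, so declare it as a timing metric; the
  // stateful operator maps it to SQLMetrics.createTimingMetric.
  val commitTimeMs: StateStoreCustomMetric = StateStoreCustomTimingMetric(
    name = "commitTimeMs",
    desc = "time taken to commit state updates")

  // What a provider would advertise as its supported custom metrics.
  val all: Seq[StateStoreCustomMetric] = Seq(stateFileBytes, commitTimeMs)
}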
http://git-wip-us.apache.org/repos/asf/spark/blob/e9faae13/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 77b1160..3ca7f4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -104,7 +104,6 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
    * This should be called in that task after the store has been updated.
    */
   protected def setStoreMetrics(store: StateStore): Unit = {
-
     val storeMetrics = store.metrics
     longMetric("numTotalStateRows") += storeMetrics.numKeys
     longMetric("stateMemory") += storeMetrics.memoryUsedBytes
@@ -115,8 +114,12 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
-    provider.supportedCustomMetrics.map { m =>
-      m.name -> SQLMetrics.createTimingMetric(sparkContext, m.desc) }.toMap
+    provider.supportedCustomMetrics.map {
+      case StateStoreCustomSizeMetric(name, desc) =>
+        name -> SQLMetrics.createSizeMetric(sparkContext, desc)
+      case StateStoreCustomTimingMetric(name, desc) =>
+        name -> SQLMetrics.createTimingMetric(sparkContext, desc)
+    }.toMap
   }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org