This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8e594f0 [SPARK-35763][SS] Remove the StateStoreCustomMetric subclass enumeration dependency 8e594f0 is described below commit 8e594f084a9f307cefcad13ccf439b0b608228c9 Author: Venki Korukanti <venki.koruka...@gmail.com> AuthorDate: Thu Jun 17 07:48:24 2021 +0900 [SPARK-35763][SS] Remove the StateStoreCustomMetric subclass enumeration dependency ### What changes were proposed in this pull request? Remove the dependency on enumerating the subclasses of `StateStoreCustomMetric`. To achieve this, add a couple of utility methods to `StateStoreCustomMetric`: * `withNewDesc(desc: String)` for cloning the instance with a new `desc` (currently used in `SymmetricHashJoinStateManager`) * `createSQLMetric(sparkContext: SparkContext): SQLMetric` for creating a corresponding `SQLMetric` to show the metric in the UI and accumulate it at the query level (currently used in `StateStoreWriter.stateStoreCustomMetrics` in `statefulOperators.scala`) ### Why are the changes needed? Code in [SymmetricHashJoinStateManager](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L321) and [StateStoreWriter](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L129) relies on the subclass implementations of [StateStoreCustomMetric](https://github.com/apache/spark/blob/master/sql/core/src/main/sca [...] If a new subclass of `StateStoreCustomMetric` is added, it requires code changes to `SymmetricHashJoinStateManager` and `StateStoreWriter`, and we may miss the update if there is no existing test coverage. To prevent these issues, add a couple of utility methods to `StateStoreCustomMetric` as mentioned above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested?
Existing UT and a new UT Closes #32914 from vkorukanti/SPARK-35763. Authored-by: Venki Korukanti <venki.koruka...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/state/StateStore.scala | 29 +++++++++++++++++++--- .../state/SymmetricHashJoinStateManager.scala | 10 +------- .../execution/streaming/statefulOperators.scala | 7 +----- .../streaming/state/StateStoreSuite.scala | 14 +++++++++++ 4 files changed, 41 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 889477b..60ad318 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -32,6 +32,7 @@ import org.apache.spark.{SparkContext, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.UnsafeRowUtils +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo import org.apache.spark.sql.types.StructType import org.apache.spark.util.{ThreadUtils, Utils} @@ -182,16 +183,36 @@ object StateStoreMetrics { /** * Name and description of custom implementation-specific metrics that a - * state store may wish to expose. + * state store may wish to expose. Also provides [[SQLMetric]] instance to + * show the metric in UI and accumulate it at the query level. 
*/ trait StateStoreCustomMetric { def name: String def desc: String + def withNewDesc(desc: String): StateStoreCustomMetric + def createSQLMetric(sparkContext: SparkContext): SQLMetric } -case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric -case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric -case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric +case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric { + override def withNewDesc(newDesc: String): StateStoreCustomSumMetric = copy(desc = newDesc) + + override def createSQLMetric(sparkContext: SparkContext): SQLMetric = + SQLMetrics.createMetric(sparkContext, desc) +} + +case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric { + override def withNewDesc(desc: String): StateStoreCustomSizeMetric = copy(desc = desc) + + override def createSQLMetric(sparkContext: SparkContext): SQLMetric = + SQLMetrics.createSizeMetric(sparkContext, desc) +} + +case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric { + override def withNewDesc(desc: String): StateStoreCustomTimingMetric = copy(desc = desc) + + override def createSQLMetric(sparkContext: SparkContext): SQLMetric = + SQLMetrics.createTimingMetric(sparkContext, desc) +} /** * An exception thrown when an invalid UnsafeRow is detected in state store.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index 5c74811..d342c83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -319,15 +319,7 @@ class SymmetricHashJoinStateManager( keyWithIndexToValueMetrics.numKeys, // represent each buffered row only once keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes, keyWithIndexToValueMetrics.customMetrics.map { - case (s @ StateStoreCustomSumMetric(_, desc), value) => - s.copy(desc = newDesc(desc)) -> value - case (s @ StateStoreCustomSizeMetric(_, desc), value) => - s.copy(desc = newDesc(desc)) -> value - case (s @ StateStoreCustomTimingMetric(_, desc), value) => - s.copy(desc = newDesc(desc)) -> value - case (s, _) => - throw new IllegalArgumentException( - s"Unknown state store custom metric is found at metrics: $s") + case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value) } ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index cef0c01..f9b639b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -126,12 +126,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => private def stateStoreCustomMetrics: Map[String, SQLMetric] = { val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass) provider.supportedCustomMetrics.map { - case StateStoreCustomSumMetric(name, desc) => - name
-> SQLMetrics.createMetric(sparkContext, desc) - case StateStoreCustomSizeMetric(name, desc) => - name -> SQLMetrics.createSizeMetric(sparkContext, desc) - case StateStoreCustomTimingMetric(name, desc) => - name -> SQLMetrics.createTimingMetric(sparkContext, desc) + metric => (metric.name, metric.createSQLMetric(sparkContext)) }.toMap } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index af5e9bb..4323725 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -1013,6 +1013,20 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] assert(err.getMessage.contains("Cannot put a null value")) } + test("SPARK-35763: StateStoreCustomMetric withNewDesc and createSQLMetric") { + val metrics = Seq(StateStoreCustomSumMetric("m1", "desc1"), + StateStoreCustomSizeMetric("m1", "desc1"), StateStoreCustomTimingMetric("m1", "desc1")) + // withNewDesc must keep the concrete type and the name, replacing only the description + assert(metrics.map(_.withNewDesc("new desc")) === Seq( + StateStoreCustomSumMetric("m1", "new desc"), StateStoreCustomSizeMetric("m1", "new desc"), + StateStoreCustomTimingMetric("m1", "new desc"))) + + val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763").set(RPC_NUM_RETRIES, 1) + withSpark(new SparkContext(conf)) { sc => + metrics.foreach(m => assert(m.createSQLMetric(sc).name === Some("desc1"), m.toString)) + } + } + /** Return a new provider with a random id */ def newStoreProvider(): ProviderClass --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org