chenwang-databricks commented on code in PR #55487:
URL: https://github.com/apache/spark/pull/55487#discussion_r3162953087
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##########
@@ -57,6 +57,21 @@ private[v2] trait V2ViewPreparation extends
LeafV2CommandExec {
protected lazy val fullNameParts: Seq[String] =
(catalog.name() +: identifier.asMultipartIdentifier).toSeq
+ /**
+ * Optional structured dependency list to stamp on the built `ViewInfo`.
Plain views leave
Review Comment:
Please just keep "Optional structured dependency list to stamp on the built
`ViewInfo`." - Don't have to mention the difference between plain views and
metric views.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##########
@@ -57,6 +57,21 @@ private[v2] trait V2ViewPreparation extends
LeafV2CommandExec {
protected lazy val fullNameParts: Seq[String] =
(catalog.name() +: identifier.asMultipartIdentifier).toSeq
+ /**
+ * Optional structured dependency list to stamp on the built `ViewInfo`.
Plain views leave
+ * this `None`; metric views populate it with the source-table lineage
produced by
+ * `MetricViewHelper.collectTableDependencies`.
+ */
+ protected def viewDependencies: Option[DependencyList] = None
+
+ /**
+ * Optional view sub-kind to stamp on `PROP_TABLE_TYPE`. Plain views leave
this `None` so
Review Comment:
Same, keep the description short. Don't have to mention plain views or
metric views.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala:
##########
@@ -1233,7 +1236,11 @@ case class ShowCreateTableCommand(
val builder = new StringBuilder
- val stmt = if (tableMetadata.tableType == VIEW) {
+ // SHOW CREATE TABLE on a metric view falls through to the VIEW branch,
which emits
+ // `CREATE VIEW ...` without the `WITH METRICS` qualifier. The output is
not yet
+ // round-trippable for metric views; tracked as follow-up.
+ val stmt = if (tableMetadata.tableType == VIEW ||
+ tableMetadata.tableType == METRIC_VIEW) {
Review Comment:
For SHOW CREATE TABLE - let's make it explicitly not supported for Metric
Views for now
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier,
SchemaUnsupported, ViewAlreadyExistsException, ViewSchemaMode}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.{DependencyList, Identifier,
TableCatalog, TableSummary, ViewCatalog}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.CommandUtils
+
+/**
+ * Physical plan node for `CREATE VIEW ... WITH METRICS` on a v2
[[ViewCatalog]]. Mirrors
+ * [[CreateV2ViewExec]]'s flag handling and cross-type collision decoding via
the shared
+ * [[V2ViewPreparation]] trait, and adds metric-view-specific bits (typed
+ * [[DependencyList]] view-dependencies and `PROP_TABLE_TYPE = METRIC_VIEW`)
through the
+ * trait's optional hooks.
+ *
+ * Routed by [[DataSourceV2Strategy]] from
+ * [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] when the
resolved
+ * catalog is a non-session v2 catalog.
+ */
+case class CreateV2MetricViewExec(
+ catalog: ViewCatalog,
+ identifier: Identifier,
+ userSpecifiedColumns: Seq[(String, Option[String])],
+ comment: Option[String],
+ userProperties: Map[String, String],
+ originalText: String,
+ query: LogicalPlan,
+ allowExisting: Boolean,
+ replace: Boolean,
+ deps: Option[DependencyList]) extends V2ViewPreparation {
+
+ // Metric views don't carry a default-collation override.
+ override def collation: Option[String] = None
+
+ // CREATE stamps the current user, matching the v1 metric-view path (which
goes through
+ // ViewHelper.prepareTable -> CatalogTable.owner default) and
CreateV2ViewExec.
+ override def owner: Option[String] = Some(CurrentUserContext.getCurrentUser)
+
+ // Metric views always have schema-mode UNSUPPORTED (mirroring the v1 path
which passes
+ // SchemaUnsupported into ViewHelper.prepareTable).
+ override def viewSchemaMode: ViewSchemaMode = SchemaUnsupported
+
+ override protected def viewDependencies: Option[DependencyList] = deps
+
+ override protected def tableType: Option[String] =
+ Some(TableSummary.METRIC_VIEW_TABLE_TYPE)
+
+ override protected def run(): Seq[InternalRow] = {
Review Comment:
If this is just mirroring CreateV2ViewExec, why would you need to define it
again?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException,
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryTableCatalog, MetadataOnlyTable, RelationCatalog, Table, TableCatalog,
TableDependency, TableSummary, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants,
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory,
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording
catalog used here is a
+ * minimal [[RelationCatalog]] so the same instance can also host the source
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
Review Comment:
We are still lacking the Read Back unit tests. Please create the metric view
and use SELECT to query the result back.
Also, I think we should create test cases for show tables and show views as
well.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException,
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryTableCatalog, MetadataOnlyTable, RelationCatalog, Table, TableCatalog,
TableDependency, TableSummary, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants,
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory,
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording
catalog used here is a
+ * minimal [[RelationCatalog]] so the same instance can also host the source
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
+
+ import testImplicits._
+
+ private val testCatalogName = "testcat"
+ private val testNamespace = "ns"
+ private val sourceTableName = "events"
+ private val fullSourceTableName =
+ s"$testCatalogName.$testNamespace.$sourceTableName"
+ private val metricViewName = "mv"
+ private val fullMetricViewName =
+ s"$testCatalogName.$testNamespace.$metricViewName"
+
+ private val metricViewColumns = Seq(
+ Column("region", DimensionExpression("region"), 0),
+ Column("count_sum", MeasureExpression("sum(count)"), 1))
+
+ private val testTableData = Seq(
+ ("region_1", 1, 5.0),
+ ("region_2", 2, 10.0))
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.conf.set(
+ s"spark.sql.catalog.$testCatalogName",
+ classOf[MetricViewRecordingCatalog].getName)
+ // A catalog that does not implement ViewCatalog - used for the negative
gate test.
+ spark.conf.set(
+ s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}",
+ classOf[InMemoryTableCatalog].getName)
+ }
+
+ override protected def afterAll(): Unit = {
+ spark.conf.unset(s"spark.sql.catalog.$testCatalogName")
+ spark.conf.unset(
+ s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}")
+ super.afterAll()
+ }
+
+ private def withTestCatalogTables(body: => Unit): Unit = {
+ MetricViewRecordingCatalog.reset()
+ testTableData.toDF("region", "count", "price")
+ .createOrReplaceTempView("metric_view_v2_source")
+ try {
+ sql(
+ s"""CREATE TABLE $fullSourceTableName
+ |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin)
+ body
+ } finally {
+ sql(s"DROP VIEW IF EXISTS $fullMetricViewName")
+ sql(s"DROP TABLE IF EXISTS $fullSourceTableName")
+ spark.catalog.dropTempView("metric_view_v2_source")
+ MetricViewRecordingCatalog.reset()
+ }
+ }
+
+ private def createMetricView(
+ name: String,
+ metricView: MetricView,
+ comment: Option[String] = None): String = {
+ val yaml = MetricViewFactory.toYAML(metricView)
+ val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("")
+ sql(
+ s"""CREATE VIEW $name
+ |WITH METRICS$commentClause
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+ yaml
+ }
+
+ private def capturedViewInfo(): ViewInfo = {
+ val ident = Identifier.of(Array(testNamespace), metricViewName)
+ val info = MetricViewRecordingCatalog.capturedViews.get(ident)
+ assert(info != null,
+ s"Expected ViewInfo for $ident to be captured by the V2 catalog")
+ info
+ }
+
+ test("V2 catalog receives METRIC_VIEW table type and view text via
ViewInfo") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = createMetricView(fullMetricViewName, metricView)
+
+ val info = capturedViewInfo()
+ // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s
constructor stamps it
+ // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the
round-tripped row
+ // back to `CatalogTableType.METRIC_VIEW`.
+ assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+ === TableSummary.METRIC_VIEW_TABLE_TYPE)
+ assert(info.queryText() === yaml)
+
+ val deps = info.viewDependencies()
+ assert(deps != null)
+ assert(deps.dependencies().length === 1)
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("V2 catalog path populates metric_view.* + view context + sql configs
on ViewInfo") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 0"),
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val info = capturedViewInfo()
+ val props = info.properties()
+
+ // metric_view.* descriptive properties (mirrors DBR
SingleSourceMetricView).
+ assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET")
+ assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName)
+ assert(props.get(MetricView.PROP_FROM_SQL) === null)
+ assert(props.get(MetricView.PROP_WHERE) === "count > 0")
+
+ // SQL configs and current catalog/namespace are first-class typed
fields on ViewInfo, no
+ // longer encoded into properties for V2 catalogs.
+ assert(info.sqlConfigs().size > 0,
+ s"Expected at least one captured SQL config; got ${info.sqlConfigs()}")
+ assert(info.currentCatalog() ===
+ spark.sessionState.catalogManager.currentCatalog.name())
+ assert(info.currentNamespace().toSeq ===
+ spark.sessionState.catalogManager.currentNamespace.toSeq)
+ }
+ }
+
+ test("DROP VIEW succeeds on a V2 metric view") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+ val ident = Identifier.of(Array(testNamespace), metricViewName)
+
+ assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+
+ sql(s"DROP VIEW $fullMetricViewName")
+ assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+ }
+ }
+
+ test("DROP VIEW IF EXISTS on a non-existent V2 metric view is a no-op") {
+ withTestCatalogTables {
+ sql(s"DROP VIEW IF EXISTS
$testCatalogName.$testNamespace.does_not_exist")
+ }
+ }
+
+ test("V2 catalog path captures SQL source and comment") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(s"SELECT * FROM $fullSourceTableName"),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView, comment = Some("my mv"))
+
+ val info = capturedViewInfo()
+ val props = info.properties()
+ assert(props.get(TableCatalog.PROP_TABLE_TYPE)
+ === TableSummary.METRIC_VIEW_TABLE_TYPE)
+ assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL")
+ assert(props.get(MetricView.PROP_FROM_NAME) === null)
+ assert(props.get(MetricView.PROP_FROM_SQL) ===
+ s"SELECT * FROM $fullSourceTableName")
+ assert(props.get(TableCatalog.PROP_COMMENT) === "my mv")
+
+ val deps = info.viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1)
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("metric view columns carry metric_view.type / metric_view.expr in
column metadata") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val cols = capturedViewInfo().columns()
+ assert(cols.length === metricViewColumns.length)
+
+ val byName = cols.map(c => c.name() -> c).toMap
+ def metadataOf(name: String): Metadata =
+
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+ val regionMeta = metadataOf("region")
+ assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"dimension")
+ assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"region")
+
+ val countMeta = metadataOf("count_sum")
+ assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"measure")
+ assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"sum(count)")
+ }
+ }
+
+ test("user-specified column names with comments preserve metric_view.*
metadata") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = MetricViewFactory.toYAML(metricView)
+ // Give both columns new names, and a comment on each. Without the
`retainMetadata`
+ // fix to `ViewHelper.aliasPlan`, the metric_view.* keys disappear here.
+ sql(
+ s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n
COMMENT 'count')
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+
+ val cols = capturedViewInfo().columns()
+ val byName = cols.map(c => c.name() -> c).toMap
+ assert(byName.keySet === Set("reg", "n"))
+
+ def metadataOf(name: String): Metadata =
+
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+ val regMeta = metadataOf("reg")
+ assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"dimension")
+ assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"region")
+ // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into
`Column.comment()`
+ // rather than leaving it inside `metadataInJSON`; assert via the V2
column accessor.
+ assert(byName("reg").comment() === "region alias")
+
+ val nMeta = metadataOf("n")
+ assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure")
+ assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"sum(count)")
+ assert(byName("n").comment() === "count")
+ }
+ }
+
+ test("dependency extraction: SQL source JOIN captures both tables") {
+ withTestCatalogTables {
+ val secondSource = s"$testCatalogName.$testNamespace.customers"
+ sql(
+ s"""CREATE TABLE $secondSource (id INT, name STRING)
+ |USING foo""".stripMargin)
+ try {
+ val joinSql =
+ s"SELECT c.name, t.count FROM $fullSourceTableName t " +
+ s"JOIN $secondSource c ON t.count = c.id"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(joinSql),
+ where = None,
+ select = Seq(
+ Column("name", DimensionExpression("name"), 0),
+ Column("count_sum", MeasureExpression("sum(count)"), 1)))
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null)
+ val depParts =
+
deps.dependencies().map(_.asInstanceOf[TableDependency].nameParts().toSeq).toSet
+ assert(depParts === Set(
+ Seq(testCatalogName, testNamespace, sourceTableName),
+ Seq(testCatalogName, testNamespace, "customers")),
+ s"Expected dependencies on both source tables, got $depParts")
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $secondSource")
+ }
+ }
+ }
+
+ test("dependency extraction: SQL source subquery deduplicates same-table
references") {
+ withTestCatalogTables {
+ val subquerySql =
+ s"SELECT * FROM $fullSourceTableName " +
+ s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(subquerySql),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1,
+ s"Expected 1 deduplicated dependency, got " +
+ s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("dependency extraction: SQL source self-join deduplicates same-table
references") {
+ withTestCatalogTables {
+ val selfJoinSql =
+ s"SELECT a.region AS a_region, a.count AS a_count " +
+ s"FROM $fullSourceTableName a JOIN $fullSourceTableName b " +
+ s"ON a.region = b.region"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(selfJoinSql),
+ where = None,
+ select = Seq(
+ Column("region", DimensionExpression("a_region"), 0),
+ Column("count_sum", MeasureExpression("sum(a_count)"), 1)))
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1,
+ s"Expected 1 deduplicated dependency for self-join, got " +
+ s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("CREATE OR REPLACE VIEW ... WITH METRICS replaces an existing v2 metric
view") {
+ withTestCatalogTables {
+ val first = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 0"),
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, first)
+ val firstYaml = capturedViewInfo().queryText()
+
+ // Replace with a new body (different WHERE clause).
+ val replacement = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 100"),
+ select = metricViewColumns)
+ val replacementYaml = MetricViewFactory.toYAML(replacement)
+ sql(
+ s"""CREATE OR REPLACE VIEW $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$replacementYaml
+ |$$$$""".stripMargin)
+
+ val finalInfo = capturedViewInfo()
+ assert(finalInfo.queryText() === replacementYaml,
+ "OR REPLACE should swap the captured ViewInfo's queryText.")
+ assert(finalInfo.queryText() !== firstYaml,
+ "OR REPLACE should not leave the original captured queryText in
place.")
+ }
+ }
+
+ test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when the view
exists") {
+ withTestCatalogTables {
+ val original = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, original)
+ val originalYaml = capturedViewInfo().queryText()
+
+ // Now CREATE VIEW IF NOT EXISTS with a different YAML body. The catalog
should not see
+ // the second create at all (V2ViewPreparation's `viewExists`
short-circuit fires before
+ // `buildViewInfo`), so the captured ViewInfo retains the original body.
+ val replacement = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 999"),
+ select = metricViewColumns)
+ val replacementYaml = MetricViewFactory.toYAML(replacement)
+ sql(
+ s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$replacementYaml
+ |$$$$""".stripMargin)
+
+ assert(capturedViewInfo().queryText() === originalYaml,
+ "IF NOT EXISTS over an existing metric view should be a no-op.")
+ }
+ }
+
+ test("CREATE VIEW ... WITH METRICS over a v2 table at the ident throws " +
+ "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE") {
+ withTestCatalogTables {
+ // Pre-create a regular v2 table at the same ident the metric view will
target. The
+ // catalog's `createView` call below should raise
`ViewAlreadyExistsException`, which
+ // `CreateV2MetricViewExec` then decodes (via `tableExists`) into the
precise cross-type
+ // collision error that `CreateV2ViewExec` emits.
+ sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+
+ val mv = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = MetricViewFactory.toYAML(mv)
+ val ex = intercept[AnalysisException] {
+ sql(
+ s"""CREATE VIEW $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+ }
+ // CreateV2ViewExec / CreateV2MetricViewExec route this through
+ // `unsupportedCreateOrReplaceViewOnTableError` which maps to
+ // `EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE`.
+ assert(ex.getCondition === "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE",
+ s"Expected EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE, got
${ex.getCondition}: ${ex.getMessage}")
+ }
+ }
+
+ test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when a v2 table
sits at the " +
+ "ident") {
+ withTestCatalogTables {
+ sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+ val mv = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = MetricViewFactory.toYAML(mv)
+ // IF NOT EXISTS over a table is a no-op (v1 parity), not an error.
+ sql(
+ s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+ val ident = Identifier.of(Array(testNamespace), metricViewName)
+ assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident),
+ "IF NOT EXISTS over a v2 table should not register a view in the
catalog.")
+ }
+ }
+
+ test("dependency extraction: V1 session-catalog source emits 3-part
nameParts") {
Review Comment:
Please re-arrange the order of all the test cases:
1. Create related tests
2. Dependency extraction
3. Select cases
4. Described cases
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala:
##########
@@ -39,15 +43,22 @@ case class CreateMetricViewCommand(
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
override def run(sparkSession: SparkSession): Seq[Row] = {
- val catalog = sparkSession.sessionState.catalog
- val name = child match {
- case v: ResolvedIdentifier =>
- v.identifier.asTableIdentifier
+ child match {
+ case v: ResolvedIdentifier if CatalogV2Util.isSessionCatalog(v.catalog)
=>
+ createMetricViewInSessionCatalog(sparkSession, v)
+ case _: ResolvedIdentifier =>
+ // Non-session v2 catalogs are intercepted by `DataSourceV2Strategy`
before the
+ // `ExecutedCommandExec(RunnableCommand)` fallback, so this branch is
unreachable in
+ // practice. Keep the assertion for defensive clarity if a future
strategy reorder
+ // accidentally exposes this code path.
+ throw SparkException.internalError(
+ "V2 metric-view CREATE should be handled by DataSourceV2Strategy,
not run here")
Review Comment:
Is this necessary? If not, please remove it
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException,
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryTableCatalog, MetadataOnlyTable, RelationCatalog, Table, TableCatalog,
TableDependency, TableSummary, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants,
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory,
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording
catalog used here is a
+ * minimal [[RelationCatalog]] so the same instance can also host the source
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
+
+ import testImplicits._
+
+ private val testCatalogName = "testcat"
+ private val testNamespace = "ns"
+ private val sourceTableName = "events"
+ private val fullSourceTableName =
+ s"$testCatalogName.$testNamespace.$sourceTableName"
+ private val metricViewName = "mv"
+ private val fullMetricViewName =
+ s"$testCatalogName.$testNamespace.$metricViewName"
+
+ private val metricViewColumns = Seq(
+ Column("region", DimensionExpression("region"), 0),
+ Column("count_sum", MeasureExpression("sum(count)"), 1))
+
+ private val testTableData = Seq(
+ ("region_1", 1, 5.0),
+ ("region_2", 2, 10.0))
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.conf.set(
+ s"spark.sql.catalog.$testCatalogName",
+ classOf[MetricViewRecordingCatalog].getName)
+ // A catalog that does not implement ViewCatalog - used for the negative
gate test.
+ spark.conf.set(
+ s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}",
+ classOf[InMemoryTableCatalog].getName)
+ }
+
+ override protected def afterAll(): Unit = {
+ spark.conf.unset(s"spark.sql.catalog.$testCatalogName")
+ spark.conf.unset(
+ s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}")
+ super.afterAll()
+ }
+
+ private def withTestCatalogTables(body: => Unit): Unit = {
+ MetricViewRecordingCatalog.reset()
+ testTableData.toDF("region", "count", "price")
+ .createOrReplaceTempView("metric_view_v2_source")
+ try {
+ sql(
+ s"""CREATE TABLE $fullSourceTableName
+ |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin)
+ body
+ } finally {
+ sql(s"DROP VIEW IF EXISTS $fullMetricViewName")
+ sql(s"DROP TABLE IF EXISTS $fullSourceTableName")
+ spark.catalog.dropTempView("metric_view_v2_source")
+ MetricViewRecordingCatalog.reset()
+ }
+ }
+
+ private def createMetricView(
+ name: String,
+ metricView: MetricView,
+ comment: Option[String] = None): String = {
+ val yaml = MetricViewFactory.toYAML(metricView)
+ val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("")
+ sql(
+ s"""CREATE VIEW $name
+ |WITH METRICS$commentClause
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+ yaml
+ }
+
+ private def capturedViewInfo(): ViewInfo = {
+ val ident = Identifier.of(Array(testNamespace), metricViewName)
+ val info = MetricViewRecordingCatalog.capturedViews.get(ident)
+ assert(info != null,
+ s"Expected ViewInfo for $ident to be captured by the V2 catalog")
+ info
+ }
+
+ test("V2 catalog receives METRIC_VIEW table type and view text via
ViewInfo") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = createMetricView(fullMetricViewName, metricView)
+
+ val info = capturedViewInfo()
+ // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s
constructor stamps it
+ // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the
round-tripped row
+ // back to `CatalogTableType.METRIC_VIEW`.
+ assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+ === TableSummary.METRIC_VIEW_TABLE_TYPE)
+ assert(info.queryText() === yaml)
+
+ val deps = info.viewDependencies()
+ assert(deps != null)
+ assert(deps.dependencies().length === 1)
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("V2 catalog path populates metric_view.* + view context + sql configs
on ViewInfo") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 0"),
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val info = capturedViewInfo()
+ val props = info.properties()
+
+ // metric_view.* descriptive properties (mirrors DBR
SingleSourceMetricView).
+ assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET")
+ assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName)
+ assert(props.get(MetricView.PROP_FROM_SQL) === null)
+ assert(props.get(MetricView.PROP_WHERE) === "count > 0")
+
+ // SQL configs and current catalog/namespace are first-class typed
fields on ViewInfo, no
+ // longer encoded into properties for V2 catalogs.
+ assert(info.sqlConfigs().size > 0,
+ s"Expected at least one captured SQL config; got ${info.sqlConfigs()}")
+ assert(info.currentCatalog() ===
+ spark.sessionState.catalogManager.currentCatalog.name())
+ assert(info.currentNamespace().toSeq ===
+ spark.sessionState.catalogManager.currentNamespace.toSeq)
+ }
+ }
+
+ test("DROP VIEW succeeds on a V2 metric view") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+ val ident = Identifier.of(Array(testNamespace), metricViewName)
+
+ assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+
+ sql(s"DROP VIEW $fullMetricViewName")
+ assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+ }
+ }
+
+ test("DROP VIEW IF EXISTS on a non-existent V2 metric view is a no-op") {
+ withTestCatalogTables {
+ sql(s"DROP VIEW IF EXISTS
$testCatalogName.$testNamespace.does_not_exist")
+ }
+ }
+
+ test("V2 catalog path captures SQL source and comment") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(s"SELECT * FROM $fullSourceTableName"),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView, comment = Some("my mv"))
+
+ val info = capturedViewInfo()
+ val props = info.properties()
+ assert(props.get(TableCatalog.PROP_TABLE_TYPE)
+ === TableSummary.METRIC_VIEW_TABLE_TYPE)
+ assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL")
+ assert(props.get(MetricView.PROP_FROM_NAME) === null)
+ assert(props.get(MetricView.PROP_FROM_SQL) ===
+ s"SELECT * FROM $fullSourceTableName")
+ assert(props.get(TableCatalog.PROP_COMMENT) === "my mv")
+
+ val deps = info.viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1)
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("metric view columns carry metric_view.type / metric_view.expr in
column metadata") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val cols = capturedViewInfo().columns()
+ assert(cols.length === metricViewColumns.length)
+
+ val byName = cols.map(c => c.name() -> c).toMap
+ def metadataOf(name: String): Metadata =
+
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+ val regionMeta = metadataOf("region")
+ assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"dimension")
+ assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"region")
+
+ val countMeta = metadataOf("count_sum")
+ assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"measure")
+ assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"sum(count)")
+ }
+ }
+
+ test("user-specified column names with comments preserve metric_view.*
metadata") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = MetricViewFactory.toYAML(metricView)
+ // Give both columns new names, and a comment on each. Without the
`retainMetadata`
+ // fix to `ViewHelper.aliasPlan`, the metric_view.* keys disappear here.
+ sql(
+ s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n
COMMENT 'count')
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+
+ val cols = capturedViewInfo().columns()
+ val byName = cols.map(c => c.name() -> c).toMap
+ assert(byName.keySet === Set("reg", "n"))
+
+ def metadataOf(name: String): Metadata =
+
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+ val regMeta = metadataOf("reg")
+ assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"dimension")
+ assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"region")
+ // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into
`Column.comment()`
+ // rather than leaving it inside `metadataInJSON`; assert via the V2
column accessor.
+ assert(byName("reg").comment() === "region alias")
+
+ val nMeta = metadataOf("n")
+ assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure")
+ assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"sum(count)")
+ assert(byName("n").comment() === "count")
+ }
+ }
+
+ test("dependency extraction: SQL source JOIN captures both tables") {
+ withTestCatalogTables {
+ val secondSource = s"$testCatalogName.$testNamespace.customers"
+ sql(
+ s"""CREATE TABLE $secondSource (id INT, name STRING)
+ |USING foo""".stripMargin)
+ try {
+ val joinSql =
+ s"SELECT c.name, t.count FROM $fullSourceTableName t " +
+ s"JOIN $secondSource c ON t.count = c.id"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(joinSql),
+ where = None,
+ select = Seq(
+ Column("name", DimensionExpression("name"), 0),
+ Column("count_sum", MeasureExpression("sum(count)"), 1)))
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null)
+ val depParts =
+
deps.dependencies().map(_.asInstanceOf[TableDependency].nameParts().toSeq).toSet
+ assert(depParts === Set(
+ Seq(testCatalogName, testNamespace, sourceTableName),
+ Seq(testCatalogName, testNamespace, "customers")),
+ s"Expected dependencies on both source tables, got $depParts")
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $secondSource")
+ }
+ }
+ }
+
+ test("dependency extraction: SQL source subquery deduplicates same-table
references") {
+ withTestCatalogTables {
+ val subquerySql =
+ s"SELECT * FROM $fullSourceTableName " +
+ s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(subquerySql),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1,
+ s"Expected 1 deduplicated dependency, got " +
+ s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("dependency extraction: SQL source self-join deduplicates same-table
references") {
+ withTestCatalogTables {
+ val selfJoinSql =
+ s"SELECT a.region AS a_region, a.count AS a_count " +
+ s"FROM $fullSourceTableName a JOIN $fullSourceTableName b " +
+ s"ON a.region = b.region"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(selfJoinSql),
+ where = None,
+ select = Seq(
+ Column("region", DimensionExpression("a_region"), 0),
+ Column("count_sum", MeasureExpression("sum(a_count)"), 1)))
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1,
+ s"Expected 1 deduplicated dependency for self-join, got " +
+ s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("CREATE OR REPLACE VIEW ... WITH METRICS replaces an existing v2 metric
view") {
+ withTestCatalogTables {
+ val first = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 0"),
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, first)
+ val firstYaml = capturedViewInfo().queryText()
+
+ // Replace with a new body (different WHERE clause).
+ val replacement = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 100"),
+ select = metricViewColumns)
+ val replacementYaml = MetricViewFactory.toYAML(replacement)
+ sql(
+ s"""CREATE OR REPLACE VIEW $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$replacementYaml
+ |$$$$""".stripMargin)
+
+ val finalInfo = capturedViewInfo()
+ assert(finalInfo.queryText() === replacementYaml,
+ "OR REPLACE should swap the captured ViewInfo's queryText.")
+ assert(finalInfo.queryText() !== firstYaml,
+ "OR REPLACE should not leave the original captured queryText in
place.")
+ }
+ }
+
+ test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when the view
exists") {
+ withTestCatalogTables {
+ val original = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, original)
+ val originalYaml = capturedViewInfo().queryText()
+
+ // Now CREATE VIEW IF NOT EXISTS with a different YAML body. The catalog
should not see
+ // the second create at all (V2ViewPreparation's `viewExists`
short-circuit fires before
+ // `buildViewInfo`), so the captured ViewInfo retains the original body.
+ val replacement = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 999"),
+ select = metricViewColumns)
+ val replacementYaml = MetricViewFactory.toYAML(replacement)
+ sql(
+ s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$replacementYaml
+ |$$$$""".stripMargin)
+
+ assert(capturedViewInfo().queryText() === originalYaml,
+ "IF NOT EXISTS over an existing metric view should be a no-op.")
+ }
+ }
+
+ test("CREATE VIEW ... WITH METRICS over a v2 table at the ident throws " +
+ "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE") {
+ withTestCatalogTables {
+ // Pre-create a regular v2 table at the same ident the metric view will
target. The
+ // catalog's `createView` call below should raise
`ViewAlreadyExistsException`, which
+ // `CreateV2MetricViewExec` then decodes (via `tableExists`) into the
precise cross-type
+ // collision error that `CreateV2ViewExec` emits.
+ sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+
+ val mv = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = MetricViewFactory.toYAML(mv)
+ val ex = intercept[AnalysisException] {
+ sql(
+ s"""CREATE VIEW $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+ }
+ // CreateV2ViewExec / CreateV2MetricViewExec route this through
+ // `unsupportedCreateOrReplaceViewOnTableError` which maps to
+ // `EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE`.
+ assert(ex.getCondition === "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE",
+ s"Expected EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE, got
${ex.getCondition}: ${ex.getMessage}")
+ }
+ }
+
+ test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when a v2 table
sits at the " +
+ "ident") {
+ withTestCatalogTables {
+ sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+ val mv = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = MetricViewFactory.toYAML(mv)
+ // IF NOT EXISTS over a table is a no-op (v1 parity), not an error.
+ sql(
+ s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+ val ident = Identifier.of(Array(testNamespace), metricViewName)
+ assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident),
+ "IF NOT EXISTS over a v2 table should not register a view in the
catalog.")
+ }
+ }
+
+ test("dependency extraction: V1 session-catalog source emits 3-part
nameParts") {
+ val v1Source = "metric_view_v2_v1source"
+ spark.range(0, 5).toDF("v")
+ .write.mode("overwrite").saveAsTable(v1Source)
+ try {
+ withTestCatalogTables {
+ val mv = MetricView(
+ "0.1",
+ // SQL source resolves through the current (session) catalog; the
resolved
+ // `LogicalRelation` carries a session-catalog `CatalogTable`.
+ SQLSource(s"SELECT v AS region, v AS count FROM $v1Source"),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, mv)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1)
+ val parts =
deps.dependencies()(0).asInstanceOf[TableDependency].nameParts().toSeq
+ // For a session-catalog source, `TableIdentifier.nameParts` includes
catalog + db +
+ // table when the catalog is set; here we expect at least 2 parts
(`db.table`) and
+ // up to 3 (`spark_catalog.db.table`) -- both are valid producer
outputs depending
+ // on whether the analyzer captured the session-catalog component.
+ assert(parts.last === v1Source, s"Last part should be the table name,
got $parts")
+ assert(parts.length >= 2 && parts.length <= 3,
+ s"V1 nameParts arity should be 2 or 3, got ${parts.length}: $parts")
+ }
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $v1Source")
+ }
+ }
+
+ test("dependency extraction: multi-level V2 namespace source emits N+2
nameParts") {
+ val multiNamespace = Array("ns_a", "ns_b")
+ val multiTable = "events_deep"
+ val multiFull =
s"$testCatalogName.${multiNamespace.mkString(".")}.$multiTable"
+ withTestCatalogTables {
+ // The InMemoryTableCatalog (RelationCatalog mixin) supports multi-level
namespaces.
+ sql(s"CREATE NAMESPACE IF NOT EXISTS
$testCatalogName.${multiNamespace.head}")
+ sql(s"CREATE NAMESPACE IF NOT EXISTS " +
+ s"$testCatalogName.${multiNamespace.mkString(".")}")
+ sql(s"CREATE TABLE $multiFull (region STRING, count INT) USING foo")
+ try {
+ val mv = MetricView(
+ "0.1",
+ SQLSource(s"SELECT region, count FROM $multiFull"),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, mv)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1)
+ val parts =
deps.dependencies()(0).asInstanceOf[TableDependency].nameParts().toSeq
+ assert(parts === Seq(testCatalogName, multiNamespace(0),
multiNamespace(1), multiTable),
+ s"Multi-level nameParts should preserve every namespace component,
got $parts")
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $multiFull")
+ sql(s"DROP NAMESPACE IF EXISTS " +
+ s"$testCatalogName.${multiNamespace.mkString(".")} CASCADE")
+ sql(s"DROP NAMESPACE IF EXISTS $testCatalogName.${multiNamespace.head}
CASCADE")
+ }
+ }
+ }
+
+ test("DESCRIBE TABLE EXTENDED on a v2 metric view round-trips through
loadRelation") {
Review Comment:
Let's also test that the normal describe can return correct result for
metric views
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException,
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryTableCatalog, MetadataOnlyTable, RelationCatalog, Table, TableCatalog,
TableDependency, TableSummary, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants,
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory,
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording
catalog used here is a
+ * minimal [[RelationCatalog]] so the same instance can also host the source
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
+
+ import testImplicits._
+
+ private val testCatalogName = "testcat"
+ private val testNamespace = "ns"
+ private val sourceTableName = "events"
+ private val fullSourceTableName =
+ s"$testCatalogName.$testNamespace.$sourceTableName"
+ private val metricViewName = "mv"
+ private val fullMetricViewName =
+ s"$testCatalogName.$testNamespace.$metricViewName"
+
+ private val metricViewColumns = Seq(
+ Column("region", DimensionExpression("region"), 0),
+ Column("count_sum", MeasureExpression("sum(count)"), 1))
+
+ private val testTableData = Seq(
+ ("region_1", 1, 5.0),
+ ("region_2", 2, 10.0))
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.conf.set(
+ s"spark.sql.catalog.$testCatalogName",
+ classOf[MetricViewRecordingCatalog].getName)
+ // A catalog that does not implement ViewCatalog - used for the negative
gate test.
+ spark.conf.set(
+ s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}",
+ classOf[InMemoryTableCatalog].getName)
+ }
+
+ override protected def afterAll(): Unit = {
+ spark.conf.unset(s"spark.sql.catalog.$testCatalogName")
+ spark.conf.unset(
+ s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}")
+ super.afterAll()
+ }
+
+ private def withTestCatalogTables(body: => Unit): Unit = {
+ MetricViewRecordingCatalog.reset()
+ testTableData.toDF("region", "count", "price")
+ .createOrReplaceTempView("metric_view_v2_source")
+ try {
+ sql(
+ s"""CREATE TABLE $fullSourceTableName
+ |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin)
+ body
+ } finally {
+ sql(s"DROP VIEW IF EXISTS $fullMetricViewName")
+ sql(s"DROP TABLE IF EXISTS $fullSourceTableName")
+ spark.catalog.dropTempView("metric_view_v2_source")
+ MetricViewRecordingCatalog.reset()
+ }
+ }
+
+ private def createMetricView(
+ name: String,
+ metricView: MetricView,
+ comment: Option[String] = None): String = {
+ val yaml = MetricViewFactory.toYAML(metricView)
+ val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("")
+ sql(
+ s"""CREATE VIEW $name
+ |WITH METRICS$commentClause
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+ yaml
+ }
+
+ private def capturedViewInfo(): ViewInfo = {
+ val ident = Identifier.of(Array(testNamespace), metricViewName)
+ val info = MetricViewRecordingCatalog.capturedViews.get(ident)
+ assert(info != null,
+ s"Expected ViewInfo for $ident to be captured by the V2 catalog")
+ info
+ }
+
+ test("V2 catalog receives METRIC_VIEW table type and view text via
ViewInfo") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = createMetricView(fullMetricViewName, metricView)
+
+ val info = capturedViewInfo()
+ // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s
constructor stamps it
+ // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the
round-tripped row
+ // back to `CatalogTableType.METRIC_VIEW`.
+ assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+ === TableSummary.METRIC_VIEW_TABLE_TYPE)
+ assert(info.queryText() === yaml)
+
+ val deps = info.viewDependencies()
+ assert(deps != null)
+ assert(deps.dependencies().length === 1)
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("V2 catalog path populates metric_view.* + view context + sql configs
on ViewInfo") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 0"),
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val info = capturedViewInfo()
+ val props = info.properties()
+
+ // metric_view.* descriptive properties (mirrors DBR
SingleSourceMetricView).
+ assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET")
+ assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName)
+ assert(props.get(MetricView.PROP_FROM_SQL) === null)
+ assert(props.get(MetricView.PROP_WHERE) === "count > 0")
+
+ // SQL configs and current catalog/namespace are first-class typed
fields on ViewInfo, no
+ // longer encoded into properties for V2 catalogs.
+ assert(info.sqlConfigs().size > 0,
+ s"Expected at least one captured SQL config; got ${info.sqlConfigs()}")
+ assert(info.currentCatalog() ===
+ spark.sessionState.catalogManager.currentCatalog.name())
+ assert(info.currentNamespace().toSeq ===
+ spark.sessionState.catalogManager.currentNamespace.toSeq)
+ }
+ }
+
+ test("DROP VIEW succeeds on a V2 metric view") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+ val ident = Identifier.of(Array(testNamespace), metricViewName)
+
+ assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+
+ sql(s"DROP VIEW $fullMetricViewName")
+ assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+ }
+ }
+
+ test("DROP VIEW IF EXISTS on a non-existent V2 metric view is a no-op") {
+ withTestCatalogTables {
+ sql(s"DROP VIEW IF EXISTS
$testCatalogName.$testNamespace.does_not_exist")
+ }
+ }
+
+ test("V2 catalog path captures SQL source and comment") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(s"SELECT * FROM $fullSourceTableName"),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView, comment = Some("my mv"))
+
+ val info = capturedViewInfo()
+ val props = info.properties()
+ assert(props.get(TableCatalog.PROP_TABLE_TYPE)
+ === TableSummary.METRIC_VIEW_TABLE_TYPE)
+ assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL")
+ assert(props.get(MetricView.PROP_FROM_NAME) === null)
+ assert(props.get(MetricView.PROP_FROM_SQL) ===
+ s"SELECT * FROM $fullSourceTableName")
+ assert(props.get(TableCatalog.PROP_COMMENT) === "my mv")
+
+ val deps = info.viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1)
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("metric view columns carry metric_view.type / metric_view.expr in
column metadata") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val cols = capturedViewInfo().columns()
+ assert(cols.length === metricViewColumns.length)
+
+ val byName = cols.map(c => c.name() -> c).toMap
+ def metadataOf(name: String): Metadata =
+
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+ val regionMeta = metadataOf("region")
+ assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"dimension")
+ assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"region")
+
+ val countMeta = metadataOf("count_sum")
+ assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"measure")
+ assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"sum(count)")
+ }
+ }
+
+ test("user-specified column names with comments preserve metric_view.*
metadata") {
+ withTestCatalogTables {
+ val metricView = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = None,
+ select = metricViewColumns)
+ val yaml = MetricViewFactory.toYAML(metricView)
+ // Give both columns new names, and a comment on each. Without the
`retainMetadata`
+ // fix to `ViewHelper.aliasPlan`, the metric_view.* keys disappear here.
+ sql(
+ s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n
COMMENT 'count')
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$yaml
+ |$$$$""".stripMargin)
+
+ val cols = capturedViewInfo().columns()
+ val byName = cols.map(c => c.name() -> c).toMap
+ assert(byName.keySet === Set("reg", "n"))
+
+ def metadataOf(name: String): Metadata =
+
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+ val regMeta = metadataOf("reg")
+ assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) ===
"dimension")
+ assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"region")
+ // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into
`Column.comment()`
+ // rather than leaving it inside `metadataInJSON`; assert via the V2
column accessor.
+ assert(byName("reg").comment() === "region alias")
+
+ val nMeta = metadataOf("n")
+ assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure")
+ assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) ===
"sum(count)")
+ assert(byName("n").comment() === "count")
+ }
+ }
+
+ test("dependency extraction: SQL source JOIN captures both tables") {
+ withTestCatalogTables {
+ val secondSource = s"$testCatalogName.$testNamespace.customers"
+ sql(
+ s"""CREATE TABLE $secondSource (id INT, name STRING)
+ |USING foo""".stripMargin)
+ try {
+ val joinSql =
+ s"SELECT c.name, t.count FROM $fullSourceTableName t " +
+ s"JOIN $secondSource c ON t.count = c.id"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(joinSql),
+ where = None,
+ select = Seq(
+ Column("name", DimensionExpression("name"), 0),
+ Column("count_sum", MeasureExpression("sum(count)"), 1)))
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null)
+ val depParts =
+
deps.dependencies().map(_.asInstanceOf[TableDependency].nameParts().toSeq).toSet
+ assert(depParts === Set(
+ Seq(testCatalogName, testNamespace, sourceTableName),
+ Seq(testCatalogName, testNamespace, "customers")),
+ s"Expected dependencies on both source tables, got $depParts")
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $secondSource")
+ }
+ }
+ }
+
+ test("dependency extraction: SQL source subquery deduplicates same-table
references") {
+ withTestCatalogTables {
+ val subquerySql =
+ s"SELECT * FROM $fullSourceTableName " +
+ s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(subquerySql),
+ where = None,
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1,
+ s"Expected 1 deduplicated dependency, got " +
+ s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("dependency extraction: SQL source self-join deduplicates same-table
references") {
+ withTestCatalogTables {
+ val selfJoinSql =
+ s"SELECT a.region AS a_region, a.count AS a_count " +
+ s"FROM $fullSourceTableName a JOIN $fullSourceTableName b " +
+ s"ON a.region = b.region"
+ val metricView = MetricView(
+ "0.1",
+ SQLSource(selfJoinSql),
+ where = None,
+ select = Seq(
+ Column("region", DimensionExpression("a_region"), 0),
+ Column("count_sum", MeasureExpression("sum(a_count)"), 1)))
+ createMetricView(fullMetricViewName, metricView)
+
+ val deps = capturedViewInfo().viewDependencies()
+ assert(deps != null && deps.dependencies().length === 1,
+ s"Expected 1 deduplicated dependency for self-join, got " +
+ s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+ val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+ assert(tableDep.nameParts().toSeq ===
+ Seq(testCatalogName, testNamespace, sourceTableName))
+ }
+ }
+
+ test("CREATE OR REPLACE VIEW ... WITH METRICS replaces an existing v2 metric
view") {
+ withTestCatalogTables {
+ val first = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 0"),
+ select = metricViewColumns)
+ createMetricView(fullMetricViewName, first)
+ val firstYaml = capturedViewInfo().queryText()
+
+ // Replace with a new body (different WHERE clause).
+ val replacement = MetricView(
+ "0.1",
+ AssetSource(fullSourceTableName),
+ where = Some("count > 100"),
+ select = metricViewColumns)
+ val replacementYaml = MetricViewFactory.toYAML(replacement)
+ sql(
+ s"""CREATE OR REPLACE VIEW $fullMetricViewName
+ |WITH METRICS
+ |LANGUAGE YAML
+ |AS
+ |$$$$
+ |$replacementYaml
+ |$$$$""".stripMargin)
+
+ val finalInfo = capturedViewInfo()
+ assert(finalInfo.queryText() === replacementYaml,
+ "OR REPLACE should swap the captured ViewInfo's queryText.")
+ assert(finalInfo.queryText() !== firstYaml,
+ "OR REPLACE should not leave the original captured queryText in
place.")
Review Comment:
This is kind of unnecessary. I would rather you assert on the other fields
of the final info to match with the metric view we want to replace with
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]