chenwang-databricks commented on code in PR #55487:
URL: https://github.com/apache/spark/pull/55487#discussion_r3162953087


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##########
@@ -57,6 +57,21 @@ private[v2] trait V2ViewPreparation extends 
LeafV2CommandExec {
   protected lazy val fullNameParts: Seq[String] =
     (catalog.name() +: identifier.asMultipartIdentifier).toSeq
 
+  /**
+   * Optional structured dependency list to stamp on the built `ViewInfo`. 
Plain views leave

Review Comment:
   Please just keep "Optional structured dependency list to stamp on the built 
`ViewInfo`." - Don't have to mention the difference between plain views and 
metric views. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala:
##########
@@ -57,6 +57,21 @@ private[v2] trait V2ViewPreparation extends 
LeafV2CommandExec {
   protected lazy val fullNameParts: Seq[String] =
     (catalog.name() +: identifier.asMultipartIdentifier).toSeq
 
+  /**
+   * Optional structured dependency list to stamp on the built `ViewInfo`. 
Plain views leave
+   * this `None`; metric views populate it with the source-table lineage 
produced by
+   * `MetricViewHelper.collectTableDependencies`.
+   */
+  protected def viewDependencies: Option[DependencyList] = None
+
+  /**
+   * Optional view sub-kind to stamp on `PROP_TABLE_TYPE`. Plain views leave 
this `None` so

Review Comment:
   Same, keep the description short. Don't have to mention plain views or 
metric views. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala:
##########
@@ -1233,7 +1236,11 @@ case class ShowCreateTableCommand(
 
       val builder = new StringBuilder
 
-      val stmt = if (tableMetadata.tableType == VIEW) {
+      // SHOW CREATE TABLE on a metric view falls through to the VIEW branch, 
which emits
+      // `CREATE VIEW ...` without the `WITH METRICS` qualifier. The output is 
not yet
+      // round-trippable for metric views; tracked as follow-up.
+      val stmt = if (tableMetadata.tableType == VIEW ||
+          tableMetadata.tableType == METRIC_VIEW) {

Review Comment:
   For SHOW CREATE TABLE - let's make it explicitly not supported for Metric 
Views for now



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, 
SchemaUnsupported, ViewAlreadyExistsException, ViewSchemaMode}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.{DependencyList, Identifier, 
TableCatalog, TableSummary, ViewCatalog}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.CommandUtils
+
+/**
+ * Physical plan node for `CREATE VIEW ... WITH METRICS` on a v2 
[[ViewCatalog]]. Mirrors
+ * [[CreateV2ViewExec]]'s flag handling and cross-type collision decoding via 
the shared
+ * [[V2ViewPreparation]] trait, and adds metric-view-specific bits (typed
+ * [[DependencyList]] view-dependencies and `PROP_TABLE_TYPE = METRIC_VIEW`) 
through the
+ * trait's optional hooks.
+ *
+ * Routed by [[DataSourceV2Strategy]] from
+ * [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] when the 
resolved
+ * catalog is a non-session v2 catalog.
+ */
+case class CreateV2MetricViewExec(
+    catalog: ViewCatalog,
+    identifier: Identifier,
+    userSpecifiedColumns: Seq[(String, Option[String])],
+    comment: Option[String],
+    userProperties: Map[String, String],
+    originalText: String,
+    query: LogicalPlan,
+    allowExisting: Boolean,
+    replace: Boolean,
+    deps: Option[DependencyList]) extends V2ViewPreparation {
+
+  // Metric views don't carry a default-collation override.
+  override def collation: Option[String] = None
+
+  // CREATE stamps the current user, matching the v1 metric-view path (which 
goes through
+  // ViewHelper.prepareTable -> CatalogTable.owner default) and 
CreateV2ViewExec.
+  override def owner: Option[String] = Some(CurrentUserContext.getCurrentUser)
+
+  // Metric views always have schema-mode UNSUPPORTED (mirroring the v1 path 
which passes
+  // SchemaUnsupported into ViewHelper.prepareTable).
+  override def viewSchemaMode: ViewSchemaMode = SchemaUnsupported
+
+  override protected def viewDependencies: Option[DependencyList] = deps
+
+  override protected def tableType: Option[String] =
+    Some(TableSummary.METRIC_VIEW_TABLE_TYPE)
+
+  override protected def run(): Seq[InternalRow] = {

Review Comment:
   If this is just mirroring CreateV2ViewExec, why would you need to define it 
again? 



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, 
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, MetadataOnlyTable, RelationCatalog, Table, TableCatalog, 
TableDependency, TableSummary, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, 
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, 
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise 
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same 
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE 
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording 
catalog used here is a
+ * minimal [[RelationCatalog]] so the same instance can also host the source 
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {

Review Comment:
   We are still lacking the Read Back unit tests. Please create the metric view 
and use SELECT to query the result back. 
   
   Also, I think we should create test cases for show tables and show views as 
well. 



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, 
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, MetadataOnlyTable, RelationCatalog, Table, TableCatalog, 
TableDependency, TableSummary, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, 
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, 
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise 
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same 
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE 
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording 
catalog used here is a
+ * minimal [[RelationCatalog]] so the same instance can also host the source 
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  private val testCatalogName = "testcat"
+  private val testNamespace = "ns"
+  private val sourceTableName = "events"
+  private val fullSourceTableName =
+    s"$testCatalogName.$testNamespace.$sourceTableName"
+  private val metricViewName = "mv"
+  private val fullMetricViewName =
+    s"$testCatalogName.$testNamespace.$metricViewName"
+
+  private val metricViewColumns = Seq(
+    Column("region", DimensionExpression("region"), 0),
+    Column("count_sum", MeasureExpression("sum(count)"), 1))
+
+  private val testTableData = Seq(
+    ("region_1", 1, 5.0),
+    ("region_2", 2, 10.0))
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(
+      s"spark.sql.catalog.$testCatalogName",
+      classOf[MetricViewRecordingCatalog].getName)
+    // A catalog that does not implement ViewCatalog - used for the negative 
gate test.
+    spark.conf.set(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}",
+      classOf[InMemoryTableCatalog].getName)
+  }
+
+  override protected def afterAll(): Unit = {
+    spark.conf.unset(s"spark.sql.catalog.$testCatalogName")
+    spark.conf.unset(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}")
+    super.afterAll()
+  }
+
+  private def withTestCatalogTables(body: => Unit): Unit = {
+    MetricViewRecordingCatalog.reset()
+    testTableData.toDF("region", "count", "price")
+      .createOrReplaceTempView("metric_view_v2_source")
+    try {
+      sql(
+        s"""CREATE TABLE $fullSourceTableName
+           |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin)
+      body
+    } finally {
+      sql(s"DROP VIEW IF EXISTS $fullMetricViewName")
+      sql(s"DROP TABLE IF EXISTS $fullSourceTableName")
+      spark.catalog.dropTempView("metric_view_v2_source")
+      MetricViewRecordingCatalog.reset()
+    }
+  }
+
+  private def createMetricView(
+      name: String,
+      metricView: MetricView,
+      comment: Option[String] = None): String = {
+    val yaml = MetricViewFactory.toYAML(metricView)
+    val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("")
+    sql(
+      s"""CREATE VIEW $name
+         |WITH METRICS$commentClause
+         |LANGUAGE YAML
+         |AS
+         |$$$$
+         |$yaml
+         |$$$$""".stripMargin)
+    yaml
+  }
+
+  private def capturedViewInfo(): ViewInfo = {
+    val ident = Identifier.of(Array(testNamespace), metricViewName)
+    val info = MetricViewRecordingCatalog.capturedViews.get(ident)
+    assert(info != null,
+      s"Expected ViewInfo for $ident to be captured by the V2 catalog")
+    info
+  }
+
+  test("V2 catalog receives METRIC_VIEW table type and view text via 
ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s 
constructor stamps it
+      // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the 
round-tripped row
+      // back to `CatalogTableType.METRIC_VIEW`.
+      assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      assert(info.queryText() === yaml)
+
+      val deps = info.viewDependencies()
+      assert(deps != null)
+      assert(deps.dependencies().length === 1)
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("V2 catalog path populates metric_view.* + view context + sql configs 
on ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+
+      // metric_view.* descriptive properties (mirrors DBR 
SingleSourceMetricView).
+      assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET")
+      assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName)
+      assert(props.get(MetricView.PROP_FROM_SQL) === null)
+      assert(props.get(MetricView.PROP_WHERE) === "count > 0")
+
+      // SQL configs and current catalog/namespace are first-class typed 
fields on ViewInfo, no
+      // longer encoded into properties for V2 catalogs.
+      assert(info.sqlConfigs().size > 0,
+        s"Expected at least one captured SQL config; got ${info.sqlConfigs()}")
+      assert(info.currentCatalog() ===
+        spark.sessionState.catalogManager.currentCatalog.name())
+      assert(info.currentNamespace().toSeq ===
+        spark.sessionState.catalogManager.currentNamespace.toSeq)
+    }
+  }
+
+  test("DROP VIEW succeeds on a V2 metric view") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+      val ident = Identifier.of(Array(testNamespace), metricViewName)
+
+      assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+
+      sql(s"DROP VIEW $fullMetricViewName")
+      assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+    }
+  }
+
+  test("DROP VIEW IF EXISTS on a non-existent V2 metric view is a no-op") {
+    withTestCatalogTables {
+      sql(s"DROP VIEW IF EXISTS 
$testCatalogName.$testNamespace.does_not_exist")
+    }
+  }
+
+  test("V2 catalog path captures SQL source and comment") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(s"SELECT * FROM $fullSourceTableName"),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView, comment = Some("my mv"))
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+      assert(props.get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL")
+      assert(props.get(MetricView.PROP_FROM_NAME) === null)
+      assert(props.get(MetricView.PROP_FROM_SQL) ===
+        s"SELECT * FROM $fullSourceTableName")
+      assert(props.get(TableCatalog.PROP_COMMENT) === "my mv")
+
+      val deps = info.viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1)
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("metric view columns carry metric_view.type / metric_view.expr in 
column metadata") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val cols = capturedViewInfo().columns()
+      assert(cols.length === metricViewColumns.length)
+
+      val byName = cols.map(c => c.name() -> c).toMap
+      def metadataOf(name: String): Metadata =
+        
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+      val regionMeta = metadataOf("region")
+      assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension")
+      assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"region")
+
+      val countMeta = metadataOf("count_sum")
+      assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"measure")
+      assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"sum(count)")
+    }
+  }
+
+  test("user-specified column names with comments preserve metric_view.* 
metadata") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(metricView)
+      // Give both columns new names, and a comment on each. Without the 
`retainMetadata`
+      // fix to `ViewHelper.aliasPlan`, the metric_view.* keys disappear here.
+      sql(
+        s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n 
COMMENT 'count')
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$yaml
+           |$$$$""".stripMargin)
+
+      val cols = capturedViewInfo().columns()
+      val byName = cols.map(c => c.name() -> c).toMap
+      assert(byName.keySet === Set("reg", "n"))
+
+      def metadataOf(name: String): Metadata =
+        
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+      val regMeta = metadataOf("reg")
+      assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension")
+      assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"region")
+      // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into 
`Column.comment()`
+      // rather than leaving it inside `metadataInJSON`; assert via the V2 
column accessor.
+      assert(byName("reg").comment() === "region alias")
+
+      val nMeta = metadataOf("n")
+      assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure")
+      assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"sum(count)")
+      assert(byName("n").comment() === "count")
+    }
+  }
+
+  test("dependency extraction: SQL source JOIN captures both tables") {
+    withTestCatalogTables {
+      val secondSource = s"$testCatalogName.$testNamespace.customers"
+      sql(
+        s"""CREATE TABLE $secondSource (id INT, name STRING)
+           |USING foo""".stripMargin)
+      try {
+        val joinSql =
+          s"SELECT c.name, t.count FROM $fullSourceTableName t " +
+            s"JOIN $secondSource c ON t.count = c.id"
+        val metricView = MetricView(
+          "0.1",
+          SQLSource(joinSql),
+          where = None,
+          select = Seq(
+            Column("name", DimensionExpression("name"), 0),
+            Column("count_sum", MeasureExpression("sum(count)"), 1)))
+        createMetricView(fullMetricViewName, metricView)
+
+        val deps = capturedViewInfo().viewDependencies()
+        assert(deps != null)
+        val depParts =
+          
deps.dependencies().map(_.asInstanceOf[TableDependency].nameParts().toSeq).toSet
+        assert(depParts === Set(
+          Seq(testCatalogName, testNamespace, sourceTableName),
+          Seq(testCatalogName, testNamespace, "customers")),
+          s"Expected dependencies on both source tables, got $depParts")
+      } finally {
+        sql(s"DROP TABLE IF EXISTS $secondSource")
+      }
+    }
+  }
+
+  test("dependency extraction: SQL source subquery deduplicates same-table 
references") {
+    withTestCatalogTables {
+      val subquerySql =
+        s"SELECT * FROM $fullSourceTableName " +
+          s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)"
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(subquerySql),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val deps = capturedViewInfo().viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1,
+        s"Expected 1 deduplicated dependency, got " +
+          s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("dependency extraction: SQL source self-join deduplicates same-table 
references") {
+    withTestCatalogTables {
+      val selfJoinSql =
+        s"SELECT a.region AS a_region, a.count AS a_count " +
+          s"FROM $fullSourceTableName a JOIN $fullSourceTableName b " +
+          s"ON a.region = b.region"
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(selfJoinSql),
+        where = None,
+        select = Seq(
+          Column("region", DimensionExpression("a_region"), 0),
+          Column("count_sum", MeasureExpression("sum(a_count)"), 1)))
+      createMetricView(fullMetricViewName, metricView)
+
+      val deps = capturedViewInfo().viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1,
+        s"Expected 1 deduplicated dependency for self-join, got " +
+          s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("CREATE OR REPLACE VIEW ... WITH METRICS replaces an existing v2 metric 
view") {
+    withTestCatalogTables {
+      val first = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, first)
+      val firstYaml = capturedViewInfo().queryText()
+
+      // Replace with a new body (different WHERE clause).
+      val replacement = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 100"),
+        select = metricViewColumns)
+      val replacementYaml = MetricViewFactory.toYAML(replacement)
+      sql(
+        s"""CREATE OR REPLACE VIEW $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$replacementYaml
+           |$$$$""".stripMargin)
+
+      val finalInfo = capturedViewInfo()
+      assert(finalInfo.queryText() === replacementYaml,
+        "OR REPLACE should swap the captured ViewInfo's queryText.")
+      assert(finalInfo.queryText() !== firstYaml,
+        "OR REPLACE should not leave the original captured queryText in 
place.")
+    }
+  }
+
+  test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when the view 
exists") {
+    withTestCatalogTables {
+      val original = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, original)
+      val originalYaml = capturedViewInfo().queryText()
+
+      // Now CREATE VIEW IF NOT EXISTS with a different YAML body. The catalog 
should not see
+      // the second create at all (V2ViewPreparation's `viewExists` 
short-circuit fires before
+      // `buildViewInfo`), so the captured ViewInfo retains the original body.
+      val replacement = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 999"),
+        select = metricViewColumns)
+      val replacementYaml = MetricViewFactory.toYAML(replacement)
+      sql(
+        s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$replacementYaml
+           |$$$$""".stripMargin)
+
+      assert(capturedViewInfo().queryText() === originalYaml,
+        "IF NOT EXISTS over an existing metric view should be a no-op.")
+    }
+  }
+
+  test("CREATE VIEW ... WITH METRICS over a v2 table at the ident throws " +
+      "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE") {
+    withTestCatalogTables {
+      // Pre-create a regular v2 table at the same ident the metric view will 
target. The
+      // catalog's `createView` call below should raise 
`ViewAlreadyExistsException`, which
+      // `CreateV2MetricViewExec` then decodes (via `tableExists`) into the 
precise cross-type
+      // collision error that `CreateV2ViewExec` emits.
+      sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(mv)
+      val ex = intercept[AnalysisException] {
+        sql(
+          s"""CREATE VIEW $fullMetricViewName
+             |WITH METRICS
+             |LANGUAGE YAML
+             |AS
+             |$$$$
+             |$yaml
+             |$$$$""".stripMargin)
+      }
+      // CreateV2ViewExec / CreateV2MetricViewExec route this through
+      // `unsupportedCreateOrReplaceViewOnTableError` which maps to
+      // `EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE`.
+      assert(ex.getCondition === "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE",
+        s"Expected EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE, got 
${ex.getCondition}: ${ex.getMessage}")
+    }
+  }
+
+  test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when a v2 table 
sits at the " +
+      "ident") {
+    withTestCatalogTables {
+      sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(mv)
+      // IF NOT EXISTS over a table is a no-op (v1 parity), not an error.
+      sql(
+        s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$yaml
+           |$$$$""".stripMargin)
+      val ident = Identifier.of(Array(testNamespace), metricViewName)
+      assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident),
+        "IF NOT EXISTS over a v2 table should not register a view in the 
catalog.")
+    }
+  }
+
+  test("dependency extraction: V1 session-catalog source emits 3-part 
nameParts") {

Review Comment:
   Please re-arrange the order of all the test cases:
   1. Create related tests
   2. Dependency extraction
   3. Select cases
   4. Described cases



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala:
##########
@@ -39,15 +43,22 @@ case class CreateMetricViewCommand(
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
-    val name = child match {
-      case v: ResolvedIdentifier =>
-        v.identifier.asTableIdentifier
+    child match {
+      case v: ResolvedIdentifier if CatalogV2Util.isSessionCatalog(v.catalog) 
=>
+        createMetricViewInSessionCatalog(sparkSession, v)
+      case _: ResolvedIdentifier =>
+        // Non-session v2 catalogs are intercepted by `DataSourceV2Strategy` 
before the
+        // `ExecutedCommandExec(RunnableCommand)` fallback, so this branch is 
unreachable in
+        // practice. Keep the assertion for defensive clarity if a future 
strategy reorder
+        // accidentally exposes this code path.
+        throw SparkException.internalError(
+          "V2 metric-view CREATE should be handled by DataSourceV2Strategy, 
not run here")

Review Comment:
   Is this necessary? If not, please remove it



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, 
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, MetadataOnlyTable, RelationCatalog, Table, TableCatalog, 
TableDependency, TableSummary, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, 
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, 
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise 
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same 
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE 
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording 
catalog used here is a
+ * minimal [[RelationCatalog]] so the same instance can also host the source 
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  private val testCatalogName = "testcat"
+  private val testNamespace = "ns"
+  private val sourceTableName = "events"
+  private val fullSourceTableName =
+    s"$testCatalogName.$testNamespace.$sourceTableName"
+  private val metricViewName = "mv"
+  private val fullMetricViewName =
+    s"$testCatalogName.$testNamespace.$metricViewName"
+
+  private val metricViewColumns = Seq(
+    Column("region", DimensionExpression("region"), 0),
+    Column("count_sum", MeasureExpression("sum(count)"), 1))
+
+  private val testTableData = Seq(
+    ("region_1", 1, 5.0),
+    ("region_2", 2, 10.0))
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(
+      s"spark.sql.catalog.$testCatalogName",
+      classOf[MetricViewRecordingCatalog].getName)
+    // A catalog that does not implement ViewCatalog - used for the negative 
gate test.
+    spark.conf.set(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}",
+      classOf[InMemoryTableCatalog].getName)
+  }
+
+  override protected def afterAll(): Unit = {
+    spark.conf.unset(s"spark.sql.catalog.$testCatalogName")
+    spark.conf.unset(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}")
+    super.afterAll()
+  }
+
+  private def withTestCatalogTables(body: => Unit): Unit = {
+    MetricViewRecordingCatalog.reset()
+    testTableData.toDF("region", "count", "price")
+      .createOrReplaceTempView("metric_view_v2_source")
+    try {
+      sql(
+        s"""CREATE TABLE $fullSourceTableName
+           |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin)
+      body
+    } finally {
+      sql(s"DROP VIEW IF EXISTS $fullMetricViewName")
+      sql(s"DROP TABLE IF EXISTS $fullSourceTableName")
+      spark.catalog.dropTempView("metric_view_v2_source")
+      MetricViewRecordingCatalog.reset()
+    }
+  }
+
+  private def createMetricView(
+      name: String,
+      metricView: MetricView,
+      comment: Option[String] = None): String = {
+    val yaml = MetricViewFactory.toYAML(metricView)
+    val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("")
+    sql(
+      s"""CREATE VIEW $name
+         |WITH METRICS$commentClause
+         |LANGUAGE YAML
+         |AS
+         |$$$$
+         |$yaml
+         |$$$$""".stripMargin)
+    yaml
+  }
+
+  private def capturedViewInfo(): ViewInfo = {
+    val ident = Identifier.of(Array(testNamespace), metricViewName)
+    val info = MetricViewRecordingCatalog.capturedViews.get(ident)
+    assert(info != null,
+      s"Expected ViewInfo for $ident to be captured by the V2 catalog")
+    info
+  }
+
+  test("V2 catalog receives METRIC_VIEW table type and view text via 
ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s 
constructor stamps it
+      // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the 
round-tripped row
+      // back to `CatalogTableType.METRIC_VIEW`.
+      assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      assert(info.queryText() === yaml)
+
+      val deps = info.viewDependencies()
+      assert(deps != null)
+      assert(deps.dependencies().length === 1)
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("V2 catalog path populates metric_view.* + view context + sql configs 
on ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+
+      // metric_view.* descriptive properties (mirrors DBR 
SingleSourceMetricView).
+      assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET")
+      assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName)
+      assert(props.get(MetricView.PROP_FROM_SQL) === null)
+      assert(props.get(MetricView.PROP_WHERE) === "count > 0")
+
+      // SQL configs and current catalog/namespace are first-class typed 
fields on ViewInfo, no
+      // longer encoded into properties for V2 catalogs.
+      assert(info.sqlConfigs().size > 0,
+        s"Expected at least one captured SQL config; got ${info.sqlConfigs()}")
+      assert(info.currentCatalog() ===
+        spark.sessionState.catalogManager.currentCatalog.name())
+      assert(info.currentNamespace().toSeq ===
+        spark.sessionState.catalogManager.currentNamespace.toSeq)
+    }
+  }
+
+  test("DROP VIEW succeeds on a V2 metric view") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+      val ident = Identifier.of(Array(testNamespace), metricViewName)
+
+      assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+
+      sql(s"DROP VIEW $fullMetricViewName")
+      assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+    }
+  }
+
+  test("DROP VIEW IF EXISTS on a non-existent V2 metric view is a no-op") {
+    withTestCatalogTables {
+      sql(s"DROP VIEW IF EXISTS 
$testCatalogName.$testNamespace.does_not_exist")
+    }
+  }
+
+  test("V2 catalog path captures SQL source and comment") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(s"SELECT * FROM $fullSourceTableName"),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView, comment = Some("my mv"))
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+      assert(props.get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL")
+      assert(props.get(MetricView.PROP_FROM_NAME) === null)
+      assert(props.get(MetricView.PROP_FROM_SQL) ===
+        s"SELECT * FROM $fullSourceTableName")
+      assert(props.get(TableCatalog.PROP_COMMENT) === "my mv")
+
+      val deps = info.viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1)
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("metric view columns carry metric_view.type / metric_view.expr in 
column metadata") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val cols = capturedViewInfo().columns()
+      assert(cols.length === metricViewColumns.length)
+
+      val byName = cols.map(c => c.name() -> c).toMap
+      def metadataOf(name: String): Metadata =
+        
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+      val regionMeta = metadataOf("region")
+      assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension")
+      assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"region")
+
+      val countMeta = metadataOf("count_sum")
+      assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"measure")
+      assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"sum(count)")
+    }
+  }
+
+  test("user-specified column names with comments preserve metric_view.* 
metadata") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(metricView)
+      // Give both columns new names, and a comment on each. Without the 
`retainMetadata`
+      // fix to `ViewHelper.aliasPlan`, the metric_view.* keys disappear here.
+      sql(
+        s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n 
COMMENT 'count')
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$yaml
+           |$$$$""".stripMargin)
+
+      val cols = capturedViewInfo().columns()
+      val byName = cols.map(c => c.name() -> c).toMap
+      assert(byName.keySet === Set("reg", "n"))
+
+      def metadataOf(name: String): Metadata =
+        
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+      val regMeta = metadataOf("reg")
+      assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension")
+      assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"region")
+      // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into 
`Column.comment()`
+      // rather than leaving it inside `metadataInJSON`; assert via the V2 
column accessor.
+      assert(byName("reg").comment() === "region alias")
+
+      val nMeta = metadataOf("n")
+      assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure")
+      assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"sum(count)")
+      assert(byName("n").comment() === "count")
+    }
+  }
+
+  test("dependency extraction: SQL source JOIN captures both tables") {
+    withTestCatalogTables {
+      val secondSource = s"$testCatalogName.$testNamespace.customers"
+      sql(
+        s"""CREATE TABLE $secondSource (id INT, name STRING)
+           |USING foo""".stripMargin)
+      try {
+        val joinSql =
+          s"SELECT c.name, t.count FROM $fullSourceTableName t " +
+            s"JOIN $secondSource c ON t.count = c.id"
+        val metricView = MetricView(
+          "0.1",
+          SQLSource(joinSql),
+          where = None,
+          select = Seq(
+            Column("name", DimensionExpression("name"), 0),
+            Column("count_sum", MeasureExpression("sum(count)"), 1)))
+        createMetricView(fullMetricViewName, metricView)
+
+        val deps = capturedViewInfo().viewDependencies()
+        assert(deps != null)
+        val depParts =
+          
deps.dependencies().map(_.asInstanceOf[TableDependency].nameParts().toSeq).toSet
+        assert(depParts === Set(
+          Seq(testCatalogName, testNamespace, sourceTableName),
+          Seq(testCatalogName, testNamespace, "customers")),
+          s"Expected dependencies on both source tables, got $depParts")
+      } finally {
+        sql(s"DROP TABLE IF EXISTS $secondSource")
+      }
+    }
+  }
+
+  test("dependency extraction: SQL source subquery deduplicates same-table 
references") {
+    withTestCatalogTables {
+      val subquerySql =
+        s"SELECT * FROM $fullSourceTableName " +
+          s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)"
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(subquerySql),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val deps = capturedViewInfo().viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1,
+        s"Expected 1 deduplicated dependency, got " +
+          s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("dependency extraction: SQL source self-join deduplicates same-table 
references") {
+    withTestCatalogTables {
+      val selfJoinSql =
+        s"SELECT a.region AS a_region, a.count AS a_count " +
+          s"FROM $fullSourceTableName a JOIN $fullSourceTableName b " +
+          s"ON a.region = b.region"
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(selfJoinSql),
+        where = None,
+        select = Seq(
+          Column("region", DimensionExpression("a_region"), 0),
+          Column("count_sum", MeasureExpression("sum(a_count)"), 1)))
+      createMetricView(fullMetricViewName, metricView)
+
+      val deps = capturedViewInfo().viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1,
+        s"Expected 1 deduplicated dependency for self-join, got " +
+          s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("CREATE OR REPLACE VIEW ... WITH METRICS replaces an existing v2 metric 
view") {
+    withTestCatalogTables {
+      val first = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, first)
+      val firstYaml = capturedViewInfo().queryText()
+
+      // Replace with a new body (different WHERE clause).
+      val replacement = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 100"),
+        select = metricViewColumns)
+      val replacementYaml = MetricViewFactory.toYAML(replacement)
+      sql(
+        s"""CREATE OR REPLACE VIEW $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$replacementYaml
+           |$$$$""".stripMargin)
+
+      val finalInfo = capturedViewInfo()
+      assert(finalInfo.queryText() === replacementYaml,
+        "OR REPLACE should swap the captured ViewInfo's queryText.")
+      assert(finalInfo.queryText() !== firstYaml,
+        "OR REPLACE should not leave the original captured queryText in 
place.")
+    }
+  }
+
+  test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when the view 
exists") {
+    withTestCatalogTables {
+      val original = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, original)
+      val originalYaml = capturedViewInfo().queryText()
+
+      // Now CREATE VIEW IF NOT EXISTS with a different YAML body. The catalog 
should not see
+      // the second create at all (V2ViewPreparation's `viewExists` 
short-circuit fires before
+      // `buildViewInfo`), so the captured ViewInfo retains the original body.
+      val replacement = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 999"),
+        select = metricViewColumns)
+      val replacementYaml = MetricViewFactory.toYAML(replacement)
+      sql(
+        s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$replacementYaml
+           |$$$$""".stripMargin)
+
+      assert(capturedViewInfo().queryText() === originalYaml,
+        "IF NOT EXISTS over an existing metric view should be a no-op.")
+    }
+  }
+
+  test("CREATE VIEW ... WITH METRICS over a v2 table at the ident throws " +
+      "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE") {
+    withTestCatalogTables {
+      // Pre-create a regular v2 table at the same ident the metric view will 
target. The
+      // catalog's `createView` call below should raise 
`ViewAlreadyExistsException`, which
+      // `CreateV2MetricViewExec` then decodes (via `tableExists`) into the 
precise cross-type
+      // collision error that `CreateV2ViewExec` emits.
+      sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(mv)
+      val ex = intercept[AnalysisException] {
+        sql(
+          s"""CREATE VIEW $fullMetricViewName
+             |WITH METRICS
+             |LANGUAGE YAML
+             |AS
+             |$$$$
+             |$yaml
+             |$$$$""".stripMargin)
+      }
+      // CreateV2ViewExec / CreateV2MetricViewExec route this through
+      // `unsupportedCreateOrReplaceViewOnTableError` which maps to
+      // `EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE`.
+      assert(ex.getCondition === "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE",
+        s"Expected EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE, got 
${ex.getCondition}: ${ex.getMessage}")
+    }
+  }
+
+  test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when a v2 table 
sits at the " +
+      "ident") {
+    withTestCatalogTables {
+      sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(mv)
+      // IF NOT EXISTS over a table is a no-op (v1 parity), not an error.
+      sql(
+        s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$yaml
+           |$$$$""".stripMargin)
+      val ident = Identifier.of(Array(testNamespace), metricViewName)
+      assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident),
+        "IF NOT EXISTS over a v2 table should not register a view in the 
catalog.")
+    }
+  }
+
+  test("dependency extraction: V1 session-catalog source emits 3-part 
nameParts") {
+    val v1Source = "metric_view_v2_v1source"
+    spark.range(0, 5).toDF("v")
+      .write.mode("overwrite").saveAsTable(v1Source)
+    try {
+      withTestCatalogTables {
+        val mv = MetricView(
+          "0.1",
+          // SQL source resolves through the current (session) catalog; the 
resolved
+          // `LogicalRelation` carries a session-catalog `CatalogTable`.
+          SQLSource(s"SELECT v AS region, v AS count FROM $v1Source"),
+          where = None,
+          select = metricViewColumns)
+        createMetricView(fullMetricViewName, mv)
+
+        val deps = capturedViewInfo().viewDependencies()
+        assert(deps != null && deps.dependencies().length === 1)
+        val parts = 
deps.dependencies()(0).asInstanceOf[TableDependency].nameParts().toSeq
+        // For a session-catalog source, `TableIdentifier.nameParts` includes 
catalog + db +
+        // table when the catalog is set; here we expect at least 2 parts 
(`db.table`) and
+        // up to 3 (`spark_catalog.db.table`) -- both are valid producer 
outputs depending
+        // on whether the analyzer captured the session-catalog component.
+        assert(parts.last === v1Source, s"Last part should be the table name, 
got $parts")
+        assert(parts.length >= 2 && parts.length <= 3,
+          s"V1 nameParts arity should be 2 or 3, got ${parts.length}: $parts")
+      }
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $v1Source")
+    }
+  }
+
+  test("dependency extraction: multi-level V2 namespace source emits N+2 
nameParts") {
+    val multiNamespace = Array("ns_a", "ns_b")
+    val multiTable = "events_deep"
+    val multiFull = 
s"$testCatalogName.${multiNamespace.mkString(".")}.$multiTable"
+    withTestCatalogTables {
+      // The InMemoryTableCatalog (RelationCatalog mixin) supports multi-level 
namespaces.
+      sql(s"CREATE NAMESPACE IF NOT EXISTS 
$testCatalogName.${multiNamespace.head}")
+      sql(s"CREATE NAMESPACE IF NOT EXISTS " +
+        s"$testCatalogName.${multiNamespace.mkString(".")}")
+      sql(s"CREATE TABLE $multiFull (region STRING, count INT) USING foo")
+      try {
+        val mv = MetricView(
+          "0.1",
+          SQLSource(s"SELECT region, count FROM $multiFull"),
+          where = None,
+          select = metricViewColumns)
+        createMetricView(fullMetricViewName, mv)
+
+        val deps = capturedViewInfo().viewDependencies()
+        assert(deps != null && deps.dependencies().length === 1)
+        val parts = 
deps.dependencies()(0).asInstanceOf[TableDependency].nameParts().toSeq
+        assert(parts === Seq(testCatalogName, multiNamespace(0), 
multiNamespace(1), multiTable),
+          s"Multi-level nameParts should preserve every namespace component, 
got $parts")
+      } finally {
+        sql(s"DROP TABLE IF EXISTS $multiFull")
+        sql(s"DROP NAMESPACE IF EXISTS " +
+          s"$testCatalogName.${multiNamespace.mkString(".")} CASCADE")
+        sql(s"DROP NAMESPACE IF EXISTS $testCatalogName.${multiNamespace.head} 
CASCADE")
+      }
+    }
+  }
+
+  test("DESCRIBE TABLE EXTENDED on a v2 metric view round-trips through 
loadRelation") {

Review Comment:
   Let's also test that the normal describe can return correct result for 
metric views



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, 
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, MetadataOnlyTable, RelationCatalog, Table, TableCatalog, 
TableDependency, TableSummary, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, 
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, 
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise 
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same 
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE 
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording 
catalog used here is a
+ * minimal [[RelationCatalog]] so the same instance can also host the source 
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  private val testCatalogName = "testcat"
+  private val testNamespace = "ns"
+  private val sourceTableName = "events"
+  private val fullSourceTableName =
+    s"$testCatalogName.$testNamespace.$sourceTableName"
+  private val metricViewName = "mv"
+  private val fullMetricViewName =
+    s"$testCatalogName.$testNamespace.$metricViewName"
+
+  private val metricViewColumns = Seq(
+    Column("region", DimensionExpression("region"), 0),
+    Column("count_sum", MeasureExpression("sum(count)"), 1))
+
+  private val testTableData = Seq(
+    ("region_1", 1, 5.0),
+    ("region_2", 2, 10.0))
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(
+      s"spark.sql.catalog.$testCatalogName",
+      classOf[MetricViewRecordingCatalog].getName)
+    // A catalog that does not implement ViewCatalog - used for the negative 
gate test.
+    spark.conf.set(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}",
+      classOf[InMemoryTableCatalog].getName)
+  }
+
+  override protected def afterAll(): Unit = {
+    spark.conf.unset(s"spark.sql.catalog.$testCatalogName")
+    spark.conf.unset(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}")
+    super.afterAll()
+  }
+
+  private def withTestCatalogTables(body: => Unit): Unit = {
+    MetricViewRecordingCatalog.reset()
+    testTableData.toDF("region", "count", "price")
+      .createOrReplaceTempView("metric_view_v2_source")
+    try {
+      sql(
+        s"""CREATE TABLE $fullSourceTableName
+           |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin)
+      body
+    } finally {
+      sql(s"DROP VIEW IF EXISTS $fullMetricViewName")
+      sql(s"DROP TABLE IF EXISTS $fullSourceTableName")
+      spark.catalog.dropTempView("metric_view_v2_source")
+      MetricViewRecordingCatalog.reset()
+    }
+  }
+
+  private def createMetricView(
+      name: String,
+      metricView: MetricView,
+      comment: Option[String] = None): String = {
+    val yaml = MetricViewFactory.toYAML(metricView)
+    val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("")
+    sql(
+      s"""CREATE VIEW $name
+         |WITH METRICS$commentClause
+         |LANGUAGE YAML
+         |AS
+         |$$$$
+         |$yaml
+         |$$$$""".stripMargin)
+    yaml
+  }
+
+  private def capturedViewInfo(): ViewInfo = {
+    val ident = Identifier.of(Array(testNamespace), metricViewName)
+    val info = MetricViewRecordingCatalog.capturedViews.get(ident)
+    assert(info != null,
+      s"Expected ViewInfo for $ident to be captured by the V2 catalog")
+    info
+  }
+
+  test("V2 catalog receives METRIC_VIEW table type and view text via 
ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s 
constructor stamps it
+      // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the 
round-tripped row
+      // back to `CatalogTableType.METRIC_VIEW`.
+      assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      assert(info.queryText() === yaml)
+
+      val deps = info.viewDependencies()
+      assert(deps != null)
+      assert(deps.dependencies().length === 1)
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("V2 catalog path populates metric_view.* + view context + sql configs 
on ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+
+      // metric_view.* descriptive properties (mirrors DBR 
SingleSourceMetricView).
+      assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET")
+      assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName)
+      assert(props.get(MetricView.PROP_FROM_SQL) === null)
+      assert(props.get(MetricView.PROP_WHERE) === "count > 0")
+
+      // SQL configs and current catalog/namespace are first-class typed 
fields on ViewInfo, no
+      // longer encoded into properties for V2 catalogs.
+      assert(info.sqlConfigs().size > 0,
+        s"Expected at least one captured SQL config; got ${info.sqlConfigs()}")
+      assert(info.currentCatalog() ===
+        spark.sessionState.catalogManager.currentCatalog.name())
+      assert(info.currentNamespace().toSeq ===
+        spark.sessionState.catalogManager.currentNamespace.toSeq)
+    }
+  }
+
+  test("DROP VIEW succeeds on a V2 metric view") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+      val ident = Identifier.of(Array(testNamespace), metricViewName)
+
+      assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+
+      sql(s"DROP VIEW $fullMetricViewName")
+      assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident))
+    }
+  }
+
+  test("DROP VIEW IF EXISTS on a non-existent V2 metric view is a no-op") {
+    withTestCatalogTables {
+      sql(s"DROP VIEW IF EXISTS 
$testCatalogName.$testNamespace.does_not_exist")
+    }
+  }
+
+  test("V2 catalog path captures SQL source and comment") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(s"SELECT * FROM $fullSourceTableName"),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView, comment = Some("my mv"))
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+      assert(props.get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL")
+      assert(props.get(MetricView.PROP_FROM_NAME) === null)
+      assert(props.get(MetricView.PROP_FROM_SQL) ===
+        s"SELECT * FROM $fullSourceTableName")
+      assert(props.get(TableCatalog.PROP_COMMENT) === "my mv")
+
+      val deps = info.viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1)
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("metric view columns carry metric_view.type / metric_view.expr in 
column metadata") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val cols = capturedViewInfo().columns()
+      assert(cols.length === metricViewColumns.length)
+
+      val byName = cols.map(c => c.name() -> c).toMap
+      def metadataOf(name: String): Metadata =
+        
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+      val regionMeta = metadataOf("region")
+      assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension")
+      assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"region")
+
+      val countMeta = metadataOf("count_sum")
+      assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"measure")
+      assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"sum(count)")
+    }
+  }
+
+  test("user-specified column names with comments preserve metric_view.* 
metadata") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(metricView)
+      // Give both columns new names, and a comment on each. Without the 
`retainMetadata`
+      // fix to `ViewHelper.aliasPlan`, the metric_view.* keys disappear here.
+      sql(
+        s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n 
COMMENT 'count')
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$yaml
+           |$$$$""".stripMargin)
+
+      val cols = capturedViewInfo().columns()
+      val byName = cols.map(c => c.name() -> c).toMap
+      assert(byName.keySet === Set("reg", "n"))
+
+      def metadataOf(name: String): Metadata =
+        
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+      val regMeta = metadataOf("reg")
+      assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension")
+      assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"region")
+      // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into 
`Column.comment()`
+      // rather than leaving it inside `metadataInJSON`; assert via the V2 
column accessor.
+      assert(byName("reg").comment() === "region alias")
+
+      val nMeta = metadataOf("n")
+      assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure")
+      assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"sum(count)")
+      assert(byName("n").comment() === "count")
+    }
+  }
+
+  test("dependency extraction: SQL source JOIN captures both tables") {
+    withTestCatalogTables {
+      val secondSource = s"$testCatalogName.$testNamespace.customers"
+      sql(
+        s"""CREATE TABLE $secondSource (id INT, name STRING)
+           |USING foo""".stripMargin)
+      try {
+        val joinSql =
+          s"SELECT c.name, t.count FROM $fullSourceTableName t " +
+            s"JOIN $secondSource c ON t.count = c.id"
+        val metricView = MetricView(
+          "0.1",
+          SQLSource(joinSql),
+          where = None,
+          select = Seq(
+            Column("name", DimensionExpression("name"), 0),
+            Column("count_sum", MeasureExpression("sum(count)"), 1)))
+        createMetricView(fullMetricViewName, metricView)
+
+        val deps = capturedViewInfo().viewDependencies()
+        assert(deps != null)
+        val depParts =
+          
deps.dependencies().map(_.asInstanceOf[TableDependency].nameParts().toSeq).toSet
+        assert(depParts === Set(
+          Seq(testCatalogName, testNamespace, sourceTableName),
+          Seq(testCatalogName, testNamespace, "customers")),
+          s"Expected dependencies on both source tables, got $depParts")
+      } finally {
+        sql(s"DROP TABLE IF EXISTS $secondSource")
+      }
+    }
+  }
+
+  test("dependency extraction: SQL source subquery deduplicates same-table 
references") {
+    withTestCatalogTables {
+      val subquerySql =
+        s"SELECT * FROM $fullSourceTableName " +
+          s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)"
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(subquerySql),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val deps = capturedViewInfo().viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1,
+        s"Expected 1 deduplicated dependency, got " +
+          s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("dependency extraction: SQL source self-join deduplicates same-table 
references") {
+    withTestCatalogTables {
+      val selfJoinSql =
+        s"SELECT a.region AS a_region, a.count AS a_count " +
+          s"FROM $fullSourceTableName a JOIN $fullSourceTableName b " +
+          s"ON a.region = b.region"
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(selfJoinSql),
+        where = None,
+        select = Seq(
+          Column("region", DimensionExpression("a_region"), 0),
+          Column("count_sum", MeasureExpression("sum(a_count)"), 1)))
+      createMetricView(fullMetricViewName, metricView)
+
+      val deps = capturedViewInfo().viewDependencies()
+      assert(deps != null && deps.dependencies().length === 1,
+        s"Expected 1 deduplicated dependency for self-join, got " +
+          s"${Option(deps).map(_.dependencies().length).getOrElse(0)}")
+      val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("CREATE OR REPLACE VIEW ... WITH METRICS replaces an existing v2 metric 
view") {
+    withTestCatalogTables {
+      val first = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, first)
+      val firstYaml = capturedViewInfo().queryText()
+
+      // Replace with a new body (different WHERE clause).
+      val replacement = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 100"),
+        select = metricViewColumns)
+      val replacementYaml = MetricViewFactory.toYAML(replacement)
+      sql(
+        s"""CREATE OR REPLACE VIEW $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$replacementYaml
+           |$$$$""".stripMargin)
+
+      val finalInfo = capturedViewInfo()
+      assert(finalInfo.queryText() === replacementYaml,
+        "OR REPLACE should swap the captured ViewInfo's queryText.")
+      assert(finalInfo.queryText() !== firstYaml,
+        "OR REPLACE should not leave the original captured queryText in 
place.")

Review Comment:
   This is kind of unnecessary. I would rather you assert on the other fields 
of the final info to match with the metric view we want to replace with 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to