cloud-fan commented on code in PR #55487: URL: https://github.com/apache/spark/pull/55487#discussion_r3185852964
########## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Dependency.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.List; + +import org.apache.spark.annotation.Evolving; + +/** + * Represents a dependency of a SQL object such as a view or metric view. + * <p> + * A dependency is one of: {@link TableDependency} or {@link FunctionDependency}. The + * {@code sealed} declaration enforces this structurally. + * + * @since 4.2.0 + */ +@Evolving +public sealed interface Dependency permits TableDependency, FunctionDependency { Review Comment: `FunctionDependency` is in the sealed `permits` list and exposed via the `Dependency.function(...)` factory below, but no producer in this PR ever emits one -- `MetricViewHelper.collectTableDependencies` only emits `TableDependency`. 
Two options: (a) drop `FunctionDependency` until it has a producer (`@Evolving` marks the API as free to change before it stabilizes, so adding it later is cheap); (b) keep it as groundwork but mention it in the PR description so reviewers don't trip on the dead surface, and add a sentence to this class-level Javadoc noting that consumers may receive only `TableDependency` instances today. ########## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableDependency.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.List; +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; + +/** + * A table dependency of a SQL object. + * <p> + * The dependent table is identified by its structural multi-part name. {@code nameParts} + * arity matches the catalog's namespace depth plus one for the table name -- for a catalog + * with single-level namespaces the parts are {@code [catalog, schema, table]}; for a catalog + * with multi-level namespaces (e.g.
Iceberg with {@code db1.db2}) the parts are + * {@code [catalog, db1, db2, ..., table]}; for v1 sources resolved through the session + * catalog producers should normalize to {@code [spark_catalog, db, table]} so consumers see Review Comment: Missing comma -- as written, "session catalog producers" reads as a compound noun. ```suggestion * catalog, producers should normalize to {@code [spark_catalog, db, table]} so consumers see ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala: ########## @@ -323,6 +323,35 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat CreateV2ViewExec(catalog.asInstanceOf[ViewCatalog], ident, userSpecifiedColumns, comment, collation, properties, sqlText, child, allowExisting, replace, viewSchemaMode) :: Nil + // CREATE VIEW ... WITH METRICS on a non-session v2 catalog. Routes the metric-view path + // through `CreateV2MetricViewExec`, which extends `V2ViewPreparation` to share the + // `IF NOT EXISTS` short-circuit, `OR REPLACE`, and cross-type-collision decoding with + // `CreateV2ViewExec`. Session-catalog dispatch stays in `CreateMetricViewCommand.run`. + case CreateMetricViewCommand( + ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment, properties, + originalText, allowExisting, replace) if !CatalogV2Util.isSessionCatalog(catalog) => + val viewCatalog = catalog match { + case vc: ViewCatalog => vc + case _ => throw QueryCompilationErrors.missingCatalogViewsAbilityError(catalog) + } + // Parse + analyze the YAML body here (during planning). This mirrors the v1 path's + // late analysis in `CreateMetricViewCommand.run` -- the metric-view source plan is not + // a SQL string, so it can't ride along as a regular `query` `LogicalPlan` field on the + // logical command the way `CreateView` does. 
Pass the full multi-part name so v2 metric + // views with multi-level-namespace targets analyze correctly (`asTableIdentifier` would + // throw `requiresSinglePartNamespaceError` for namespace arity > 1). + val nameParts = (catalog.name() +: ident.namespace().toIndexedSeq) :+ ident.name() + val (analyzed, metricView) = MetricViewHelper.analyzeMetricViewText( + session, nameParts, originalText) + val mergedProps = properties ++ metricView.getProperties + val depParts = MetricViewHelper.collectTableDependencies(analyzed) + val deps = if (depParts.nonEmpty) { + Some(DependencyList.of( + depParts.map(parts => Dependency.table(parts: _*)): _*)) + } else None Review Comment: The empty-`depParts` case maps to `None` here, which becomes `null` on `ViewInfo.viewDependencies()`. Per the Javadoc on `ViewInfo#viewDependencies`, `null` means "no dependency list was supplied" while an empty list means "provided but the object has none." For a metric view we always *compute* deps, so when `collectTableDependencies` returns empty (e.g. `SQLSource("SELECT 1 AS x")`) the right value is `Some(DependencyList.of())`, not `None`. ```suggestion val deps = Some(DependencyList.of( depParts.map(parts => Dependency.table(parts: _*)): _*)) ``` ########## sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala: ########## @@ -0,0 +1,1031 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.concurrent.ConcurrentHashMap + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, ViewAlreadyExistsException} +import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, MetadataTable, Table, TableCatalog, TableDependency, TableSummary, TableViewCatalog, ViewInfo} +import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, SQLSource} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.Metadata + +/** + * Tests that exercise [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a + * non-session V2 catalog. Metric views are persisted through the same [[ViewCatalog]] interface + * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE = METRIC_VIEW` + * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording catalog used here is a + * minimal [[TableViewCatalog]] so the same instance can also host the source table referenced by + * the metric view's YAML. 
+ */ +class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + + private val testCatalogName = "testcat" + private val testNamespace = "ns" + private val sourceTableName = "events" + private val fullSourceTableName = + s"$testCatalogName.$testNamespace.$sourceTableName" + private val metricViewName = "mv" + private val fullMetricViewName = + s"$testCatalogName.$testNamespace.$metricViewName" + + private val metricViewColumns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("count_sum", MeasureExpression("sum(count)"), 1)) + + private val testTableData = Seq( + ("region_1", 1, 5.0), + ("region_2", 2, 10.0)) + + override protected def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set( + s"spark.sql.catalog.$testCatalogName", + classOf[MetricViewRecordingCatalog].getName) + // A catalog that does not implement ViewCatalog - used for the negative gate test. + spark.conf.set( + s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}", + classOf[InMemoryTableCatalog].getName) + } + + override protected def afterAll(): Unit = { + spark.conf.unset(s"spark.sql.catalog.$testCatalogName") + spark.conf.unset( + s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}") + super.afterAll() + } + + private def withTestCatalogTables(body: => Unit): Unit = { + MetricViewRecordingCatalog.reset() + testTableData.toDF("region", "count", "price") + .createOrReplaceTempView("metric_view_v2_source") + try { + sql( + s"""CREATE TABLE $fullSourceTableName + |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin) + body + } finally { + // The metric-view ident `mv` may have ended up as either a view (most tests) or as a + // pre-created table (a few negative tests pre-create a table at the same ident to + // exercise cross-type collisions). Sweep both kinds so subsequent tests in the suite + // start from a clean catalog state. 
Wrap each DROP in a Try because: + // - DROP VIEW IF EXISTS on a leftover *table* throws WRONG_COMMAND_FOR_OBJECT_TYPE + // under master's new DropViewExec active-rejection contract. + // - DROP TABLE IF EXISTS on a leftover *view* throws the symmetric error. + // - On a totally clean state both are silent no-ops. + scala.util.Try(sql(s"DROP VIEW IF EXISTS $fullMetricViewName")) + scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullMetricViewName")) + scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullSourceTableName")) + spark.catalog.dropTempView("metric_view_v2_source") + MetricViewRecordingCatalog.reset() + } + } + + private def createMetricView( + name: String, + metricView: MetricView, + comment: Option[String] = None): String = { + val yaml = MetricViewFactory.toYAML(metricView) + val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("") + sql( + s"""CREATE VIEW $name + |WITH METRICS$commentClause + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + yaml + } + + private def capturedViewInfo(): ViewInfo = { + val ident = Identifier.of(Array(testNamespace), metricViewName) + val info = MetricViewRecordingCatalog.capturedViews.get(ident) + assert(info != null, + s"Expected ViewInfo for $ident to be captured by the V2 catalog") + info + } + + // ============================================================ + // Section 1: CREATE-related tests + // ============================================================ + + + test("V2 catalog receives METRIC_VIEW table type and view text via ViewInfo") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = createMetricView(fullMetricViewName, metricView) + + val info = capturedViewInfo() + // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s constructor stamps it + // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the round-tripped row + // back to 
`CatalogTableType.METRIC_VIEW`. + assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE) + === TableSummary.METRIC_VIEW_TABLE_TYPE) + // The captured queryText is the raw text between `$$ ... $$` -- including the leading + // and trailing newline our SQL fixture inserts -- so trim before comparing to the + // pre-substitution YAML body. + assert(info.queryText().trim === yaml.trim) + + val deps = info.viewDependencies() + assert(deps != null) + assert(deps.dependencies().size() === 1) + val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().asScala.toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("V2 catalog path populates metric_view.* + view context + sql configs on ViewInfo") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 0"), + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView) + + val info = capturedViewInfo() + val props = info.properties() + + // metric_view.* descriptive properties (mirrors DBR SingleSourceMetricView). Review Comment: "DBR" is internal Databricks-Runtime terminology and shouldn't appear in OSS comments. ```suggestion // metric_view.* descriptive properties (mirrors the canonical metric-view property layout). ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala: ########## @@ -73,25 +94,110 @@ case class CreateMetricViewCommand( case class AlterMetricViewCommand(child: LogicalPlan, originalText: String) object MetricViewHelper { + + /** + * Walks the analyzed plan to collect direct table/view dependencies. 
Each dependency is + * returned as a structural multi-part name (`Seq[String]`); v1 sources (resolved through + * the session catalog) are normalized to a stable 3-part shape + * `[spark_catalog, db, table]` -- `TableIdentifier.nameParts` returns 1, 2, or 3 parts + * depending on whether the analyzer captured the catalog / database, so without + * normalization the same source can produce a different shape across runs. v2 sources + * already arrive fully qualified (catalog + namespace + table) and are returned as-is so + * multi-level namespaces survive. + * + * Stops recursion at relation leaf nodes and persistent `View` nodes so only direct + * (not transitive) dependencies are recorded. + */ + private[execution] def collectTableDependencies(plan: LogicalPlan): Seq[Seq[String]] = { + val tables = scala.collection.mutable.ArrayBuffer.empty[Seq[String]] + def traverse(p: LogicalPlan): Unit = p match { + case v: View if !v.isTempView => + tables += qualifyV1(v.desc.identifier.nameParts) + case r: DataSourceV2Relation if r.catalog.isDefined && r.identifier.isDefined => + val ident = r.identifier.get + // V2 catalogs may have multi-level namespaces; preserve the full arity rather than + // dot-joining the namespace into a single component. + tables += (r.catalog.get.name() +: ident.namespace().toIndexedSeq) :+ ident.name() + case r: HiveTableRelation => + tables += qualifyV1(r.tableMeta.identifier.nameParts) + case r: LogicalRelation if r.catalogTable.isDefined => + tables += qualifyV1(r.catalogTable.get.identifier.nameParts) + case other => + other.children.foreach(traverse) + other.expressions.foreach(_.foreach { + case s: SubqueryExpression => traverse(s.plan) + case _ => + }) + } + traverse(plan) + tables.distinct.toSeq + } + + /** + * Normalizes v1 source identifiers to a stable 3-part `[spark_catalog, db, table]` shape. 
+ * `TableIdentifier.nameParts` may return 1, 2, or 3 parts depending on whether the analyzer + * captured the catalog / database components, which would otherwise leak through to + * dependency consumers as nondeterministic arity. + */ + private def qualifyV1(parts: Seq[String]): Seq[String] = parts match { + case Seq(t) => Seq(SESSION_CATALOG_NAME, SessionCatalog.DEFAULT_DATABASE, t) + case Seq(db, t) => Seq(SESSION_CATALOG_NAME, db, t) + case Seq(_, _, _) => parts + case other => other // Unexpected arity; pass through unchanged. + } + + /** + * Analyzes a metric-view YAML body so the create / alter path can capture the source plan + * and its dependencies. Returns the analyzed plan together with the parsed [[MetricView]] + * descriptor (the latter is grabbed off the un-analyzed [[MetricViewPlaceholder]] before + * the analyzer rewrites it away, so callers needing the descriptor for property emission + * don't have to re-parse the YAML). + * + * `nameParts` is the multi-part target identifier (catalog + namespace + table). The synthetic + * [[CatalogTable]] used as analysis context still carries a [[TableIdentifier]] (capped at + * 3 parts: catalog + database + table); for multi-level v2 namespaces we collapse the + * intermediate namespace components into the synthetic `database` slot. The synthetic identifier + * is not used to resolve the view body itself, so this collapse is observationally invisible to + * the analyzed plan; `verifyTemporaryObjectsNotExists` continues to receive the full + * `nameParts` so error messages still render the multi-part form. 
+ */ def analyzeMetricViewText( session: SparkSession, - name: TableIdentifier, - viewText: String): LogicalPlan = { + nameParts: Seq[String], + viewText: String): (LogicalPlan, MetricView) = { val analyzer = session.sessionState.analyzer + val syntheticIdent = nameParts match { + case Seq(table) => + TableIdentifier(table) + case Seq(db, table) => + TableIdentifier(table, Some(db)) + case parts => + // 3+ parts: catalog is the head, table is the last, the middle (1..n-1) collapses + // into the synthetic `database` slot. We dot-join the intermediate components so a + // human inspecting the synthetic identifier can still see them. + TableIdentifier( + parts.last, + Some(parts.slice(1, parts.length - 1).mkString(".")), + Some(parts.head)) + } // this metadata is used for analysis check, it'll be replaced during create/update with // more accurate information val tableMeta = CatalogTable( - identifier = name, + identifier = syntheticIdent, tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, schema = new StructType(), viewOriginalText = Some(viewText), viewText = Some(viewText)) - val metricViewNode = MetricViewPlanner.planWrite( + val placeholder = MetricViewPlanner.planWrite( tableMeta, viewText, session.sessionState.sqlParser) - val analyzed = analyzer.executeAndCheck(metricViewNode, new QueryPlanningTracker) + // Grab the parsed descriptor BEFORE analysis -- the placeholder gets replaced by + // ResolvedMetricView during analyzer rules, after which `MetricView` is no longer + // recoverable from the plan tree. 
+ val metricView = placeholder.desc + val analyzed = analyzer.executeAndCheck(placeholder, new QueryPlanningTracker) ViewHelper.verifyTemporaryObjectsNotExists( - isTemporary = false, name.nameParts, analyzed, Seq.empty) - analyzed + isTemporary = false, nameParts, analyzed, Seq.empty) Review Comment: `analyzeMetricViewText` runs `verifyTemporaryObjectsNotExists` but skips the cyclic-view-reference check that plain `CREATE VIEW` gets via `CheckViewReferences.checkCyclicViewReference` (applied only on `replace`). For `CREATE OR REPLACE VIEW <mv> WITH METRICS LANGUAGE YAML AS $$ from: <mv> ... $$`, the cycle currently surfaces only on the next `SELECT`. The gap already exists on the v1 path (the old `createMetricViewInSessionCatalog` didn't call it either), so this is a follow-up rather than a blocker for this PR; wiring it in here, though, means both the v1 and v2 metric-view CREATE paths benefit from a single fix. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala: ########## @@ -0,0 +1,1031 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.execution + +import java.util.concurrent.ConcurrentHashMap + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, ViewAlreadyExistsException} +import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, MetadataTable, Table, TableCatalog, TableDependency, TableSummary, TableViewCatalog, ViewInfo} +import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, SQLSource} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.Metadata + +/** + * Tests that exercise [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a + * non-session V2 catalog. Metric views are persisted through the same [[ViewCatalog]] interface + * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE = METRIC_VIEW` + * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording catalog used here is a + * minimal [[TableViewCatalog]] so the same instance can also host the source table referenced by + * the metric view's YAML. 
+ */ +class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + + private val testCatalogName = "testcat" + private val testNamespace = "ns" + private val sourceTableName = "events" + private val fullSourceTableName = + s"$testCatalogName.$testNamespace.$sourceTableName" + private val metricViewName = "mv" + private val fullMetricViewName = + s"$testCatalogName.$testNamespace.$metricViewName" + + private val metricViewColumns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("count_sum", MeasureExpression("sum(count)"), 1)) + + private val testTableData = Seq( + ("region_1", 1, 5.0), + ("region_2", 2, 10.0)) + + override protected def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set( + s"spark.sql.catalog.$testCatalogName", + classOf[MetricViewRecordingCatalog].getName) + // A catalog that does not implement ViewCatalog - used for the negative gate test. + spark.conf.set( + s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}", + classOf[InMemoryTableCatalog].getName) + } + + override protected def afterAll(): Unit = { + spark.conf.unset(s"spark.sql.catalog.$testCatalogName") + spark.conf.unset( + s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}") + super.afterAll() + } + + private def withTestCatalogTables(body: => Unit): Unit = { + MetricViewRecordingCatalog.reset() + testTableData.toDF("region", "count", "price") + .createOrReplaceTempView("metric_view_v2_source") + try { + sql( + s"""CREATE TABLE $fullSourceTableName + |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin) + body + } finally { + // The metric-view ident `mv` may have ended up as either a view (most tests) or as a + // pre-created table (a few negative tests pre-create a table at the same ident to + // exercise cross-type collisions). Sweep both kinds so subsequent tests in the suite + // start from a clean catalog state. 
Wrap each DROP in a Try because: + // - DROP VIEW IF EXISTS on a leftover *table* throws WRONG_COMMAND_FOR_OBJECT_TYPE + // under master's new DropViewExec active-rejection contract. + // - DROP TABLE IF EXISTS on a leftover *view* throws the symmetric error. + // - On a totally clean state both are silent no-ops. + scala.util.Try(sql(s"DROP VIEW IF EXISTS $fullMetricViewName")) + scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullMetricViewName")) + scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullSourceTableName")) + spark.catalog.dropTempView("metric_view_v2_source") + MetricViewRecordingCatalog.reset() + } + } + + private def createMetricView( + name: String, + metricView: MetricView, + comment: Option[String] = None): String = { + val yaml = MetricViewFactory.toYAML(metricView) + val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("") + sql( + s"""CREATE VIEW $name + |WITH METRICS$commentClause + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + yaml + } + + private def capturedViewInfo(): ViewInfo = { + val ident = Identifier.of(Array(testNamespace), metricViewName) + val info = MetricViewRecordingCatalog.capturedViews.get(ident) + assert(info != null, + s"Expected ViewInfo for $ident to be captured by the V2 catalog") + info + } + + // ============================================================ + // Section 1: CREATE-related tests + // ============================================================ + + + test("V2 catalog receives METRIC_VIEW table type and view text via ViewInfo") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = createMetricView(fullMetricViewName, metricView) + + val info = capturedViewInfo() + // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s constructor stamps it + // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the round-tripped row + // back to 
`CatalogTableType.METRIC_VIEW`. + assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE) + === TableSummary.METRIC_VIEW_TABLE_TYPE) + // The captured queryText is the raw text between `$$ ... $$` -- including the leading + // and trailing newline our SQL fixture inserts -- so trim before comparing to the + // pre-substitution YAML body. + assert(info.queryText().trim === yaml.trim) + + val deps = info.viewDependencies() + assert(deps != null) + assert(deps.dependencies().size() === 1) + val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().asScala.toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("V2 catalog path populates metric_view.* + view context + sql configs on ViewInfo") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 0"), + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView) + + val info = capturedViewInfo() + val props = info.properties() + + // metric_view.* descriptive properties (mirrors DBR SingleSourceMetricView). + assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET") + assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName) + assert(props.get(MetricView.PROP_FROM_SQL) === null) + assert(props.get(MetricView.PROP_WHERE) === "count > 0") + + // SQL configs and current catalog/namespace are first-class typed fields on ViewInfo, no + // longer encoded into properties for V2 catalogs. 
+ assert(info.sqlConfigs().size > 0, + s"Expected at least one captured SQL config; got ${info.sqlConfigs()}") + assert(info.currentCatalog() === + spark.sessionState.catalogManager.currentCatalog.name()) + assert(info.currentNamespace().toSeq === + spark.sessionState.catalogManager.currentNamespace.toSeq) + } + } + + test("V2 catalog path captures SQL source and comment") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + SQLSource(s"SELECT * FROM $fullSourceTableName"), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView, comment = Some("my mv")) + + val info = capturedViewInfo() + val props = info.properties() + assert(props.get(TableCatalog.PROP_TABLE_TYPE) + === TableSummary.METRIC_VIEW_TABLE_TYPE) + assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL") + assert(props.get(MetricView.PROP_FROM_NAME) === null) + assert(props.get(MetricView.PROP_FROM_SQL) === + s"SELECT * FROM $fullSourceTableName") + assert(props.get(TableCatalog.PROP_COMMENT) === "my mv") + + val deps = info.viewDependencies() + assert(deps != null && deps.dependencies().size() === 1) + val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().asScala.toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("metric view columns carry metric_view.type / metric_view.expr in column metadata") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView) + + val cols = capturedViewInfo().columns() + assert(cols.length === metricViewColumns.length) + + val byName = cols.map(c => c.name() -> c).toMap + def metadataOf(name: String): Metadata = + Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}")) + + val regionMeta = metadataOf("region") + assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension") + assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === "region") + + val countMeta = metadataOf("count_sum") + assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure") + assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === "sum(count)") + } + } + + test("user-specified column names with comments preserve metric_view.* metadata") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = MetricViewFactory.toYAML(metricView) + // Pins aliasPlan(retainMetadata = true): metric_view.* keys must survive a column + // rename with comments. + sql( + s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n COMMENT 'count') + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + + val cols = capturedViewInfo().columns() + val byName = cols.map(c => c.name() -> c).toMap + assert(byName.keySet === Set("reg", "n")) + + def metadataOf(name: String): Metadata = + Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}")) + + val regMeta = metadataOf("reg") + assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "dimension") + assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === "region") + // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into `Column.comment()` + // rather than leaving it inside `metadataInJSON`; assert via the V2 column accessor. + assert(byName("reg").comment() === "region alias") + + val nMeta = metadataOf("n") + assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure") + assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === "sum(count)") + assert(byName("n").comment() === "count") + } + } + + test("CREATE OR REPLACE VIEW ... 
WITH METRICS replaces an existing v2 metric view") { + withTestCatalogTables { + val first = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 0"), + select = metricViewColumns) + createMetricView(fullMetricViewName, first) + + // Replace with a new body (different WHERE clause). + val replacement = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 100"), + select = metricViewColumns) + val replacementYaml = MetricViewFactory.toYAML(replacement) + sql( + s"""CREATE OR REPLACE VIEW $fullMetricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$replacementYaml + |$$$$""".stripMargin) + + val finalInfo = capturedViewInfo() + // Assert on the distinguishing fields of the replacement, not on diff vs. the original. + // queryText keeps the surrounding `\n` from the SQL `$$ ... $$` markers; trim first. + assert(finalInfo.queryText().trim === replacementYaml.trim) + assert(finalInfo.properties().get(MetricView.PROP_WHERE) === "count > 100") + val deps = finalInfo.viewDependencies() + assert(deps != null && deps.dependencies().size() === 1) + val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().asScala.toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when the view exists") { + withTestCatalogTables { + val original = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, original) + val originalYaml = capturedViewInfo().queryText() + + // Now CREATE VIEW IF NOT EXISTS with a different YAML body. The catalog should not see + // the second create at all (V2ViewPreparation's `viewExists` short-circuit fires before + // `buildViewInfo`), so the captured ViewInfo retains the original body. 
+ val replacement = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 999"), + select = metricViewColumns) + val replacementYaml = MetricViewFactory.toYAML(replacement) + sql( + s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$replacementYaml + |$$$$""".stripMargin) + + assert(capturedViewInfo().queryText().trim === originalYaml.trim, + "IF NOT EXISTS over an existing metric view should be a no-op.") + } + } + + test("CREATE VIEW ... WITH METRICS over a v2 table at the ident throws " + + "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE") { + withTestCatalogTables { + // Pre-create a regular v2 table at the same ident the metric view will target. The + // catalog's `createView` call below should raise `ViewAlreadyExistsException`, which + // `CreateV2MetricViewExec` then decodes (via `tableExists`) into the precise cross-type + // collision error that `CreateV2ViewExec` emits. + sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo") + + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = MetricViewFactory.toYAML(mv) + val ex = intercept[AnalysisException] { + sql( + s"""CREATE VIEW $fullMetricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + } + // SPARK-56655 added an analyzer-time pre-check for "ident already occupied by a table" + // before the v2 view-create exec runs, so the more specific + // `EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE` decoded by `CreateV2MetricViewExec.run`'s catch + // block is no longer reachable when a *plain* table sits at the ident -- the analyzer + // raises `TABLE_OR_VIEW_ALREADY_EXISTS` first. Both errors carry the same actionable + // signal ("can't create a view here because something else already lives at this ident"). 
+ assert(ex.getCondition === "TABLE_OR_VIEW_ALREADY_EXISTS", + s"Expected TABLE_OR_VIEW_ALREADY_EXISTS, got ${ex.getCondition}: ${ex.getMessage}") + } + } + + test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when a v2 table sits at the " + + "ident") { + withTestCatalogTables { + sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo") + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = MetricViewFactory.toYAML(mv) + // IF NOT EXISTS over a table is a no-op (v1 parity), not an error. + sql( + s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + val ident = Identifier.of(Array(testNamespace), metricViewName) + assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident), + "IF NOT EXISTS over a v2 table should not register a view in the catalog.") + } + } + + test("CREATE VIEW ... WITH METRICS on a non-ViewCatalog catalog fails with " + + "MISSING_CATALOG_ABILITY.VIEWS") { + val ex = intercept[AnalysisException] { + sql( + s"""CREATE VIEW ${MetricViewV2CatalogSuite.noViewCatalogName}.default.mv + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |${MetricViewFactory.toYAML(MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns))} + |$$$$""".stripMargin) + } + // SPARK-56655 added the `.VIEWS` subclass; the bare `MISSING_CATALOG_ABILITY` no longer + // surfaces directly for the missing-view-ability case. + assert(ex.getCondition === "MISSING_CATALOG_ABILITY.VIEWS") + assert(ex.getMessage.contains("VIEWS")) + } + + test("CREATE VIEW ... 
WITH METRICS at a multi-level-namespace v2 target succeeds") { + val deepNamespace = Array("ns_a", "ns_b") + val deepMetricViewName = "mv_deep" + val fullDeepName = + s"$testCatalogName.${deepNamespace.mkString(".")}.$deepMetricViewName" + withTestCatalogTables { + // Pre-create the multi-level namespace + a source table inside it. The metric view + // *target* lives in the same multi-level namespace -- that's what exercises the + // `MetricViewHelper.analyzeMetricViewText` lift to multi-part nameParts. The pre-lift + // code path failed at `ident.asTableIdentifier` with `requiresSinglePartNamespaceError`. + sql(s"CREATE NAMESPACE IF NOT EXISTS $testCatalogName.${deepNamespace.head}") + sql(s"CREATE NAMESPACE IF NOT EXISTS " + + s"$testCatalogName.${deepNamespace.mkString(".")}") + try { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = MetricViewFactory.toYAML(mv) + sql( + s"""CREATE VIEW $fullDeepName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + + val deepIdent = Identifier.of(deepNamespace, deepMetricViewName) + val info = MetricViewRecordingCatalog.capturedViews.get(deepIdent) + assert(info != null, s"Expected ViewInfo for $deepIdent to be captured") + assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE) + === TableSummary.METRIC_VIEW_TABLE_TYPE) + } finally { + scala.util.Try(sql(s"DROP VIEW IF EXISTS $fullDeepName")) + sql(s"DROP NAMESPACE IF EXISTS " + + s"$testCatalogName.${deepNamespace.mkString(".")} CASCADE") + sql(s"DROP NAMESPACE IF EXISTS $testCatalogName.${deepNamespace.head} CASCADE") + } + } + } + + // ============================================================ + // Section 2: Dependency extraction + // ============================================================ + + + test("dependency extraction: SQL source JOIN captures both tables") { + withTestCatalogTables { + val secondSource = s"$testCatalogName.$testNamespace.customers" + 
sql( + s"""CREATE TABLE $secondSource (id INT, name STRING) + |USING foo""".stripMargin) + try { + val joinSql = + s"SELECT c.name, t.count FROM $fullSourceTableName t " + + s"JOIN $secondSource c ON t.count = c.id" + val metricView = MetricView( + "0.1", + SQLSource(joinSql), + where = None, + select = Seq( + Column("name", DimensionExpression("name"), 0), + Column("count_sum", MeasureExpression("sum(count)"), 1))) + createMetricView(fullMetricViewName, metricView) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null) + val depParts = deps.dependencies().asScala + .map(_.asInstanceOf[TableDependency].nameParts().asScala.toSeq).toSet + assert(depParts === Set( + Seq(testCatalogName, testNamespace, sourceTableName), + Seq(testCatalogName, testNamespace, "customers")), + s"Expected dependencies on both source tables, got $depParts") + } finally { + sql(s"DROP TABLE IF EXISTS $secondSource") + } + } + } + + test("dependency extraction: SQL source subquery deduplicates same-table references") { + withTestCatalogTables { + val subquerySql = + s"SELECT * FROM $fullSourceTableName " + + s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)" + val metricView = MetricView( + "0.1", + SQLSource(subquerySql), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null && deps.dependencies().size() === 1, + s"Expected 1 deduplicated dependency, got " + + s"${Option(deps).map(_.dependencies().size()).getOrElse(0)}") + val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().asScala.toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("dependency extraction: SQL source self-join deduplicates same-table references") { + withTestCatalogTables { + val selfJoinSql = + s"SELECT a.region AS a_region, a.count AS a_count " + + s"FROM $fullSourceTableName a JOIN 
$fullSourceTableName b " + + s"ON a.region = b.region" + val metricView = MetricView( + "0.1", + SQLSource(selfJoinSql), + where = None, + select = Seq( + Column("region", DimensionExpression("a_region"), 0), + Column("count_sum", MeasureExpression("sum(a_count)"), 1))) + createMetricView(fullMetricViewName, metricView) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null && deps.dependencies().size() === 1, + s"Expected 1 deduplicated dependency for self-join, got " + + s"${Option(deps).map(_.dependencies().size()).getOrElse(0)}") + val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().asScala.toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("dependency extraction: V1 session-catalog source emits 3-part nameParts") { + val v1Source = "metric_view_v2_v1source" + spark.range(0, 5).toDF("v") + .write.mode("overwrite").saveAsTable(v1Source) + try { + withTestCatalogTables { + val mv = MetricView( + "0.1", + // SQL source resolves through the current (session) catalog; the resolved + // `LogicalRelation` carries a session-catalog `CatalogTable`. + SQLSource(s"SELECT v AS region, v AS count FROM $v1Source"), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null && deps.dependencies().size() === 1) + val parts = + deps.dependencies().get(0).asInstanceOf[TableDependency].nameParts().asScala.toSeq + // `MetricViewHelper.qualifyV1` normalizes any `TableIdentifier.nameParts` shape + // (1, 2, or 3 parts depending on what the analyzer captured) to the stable + // `[spark_catalog, db, table]` shape so downstream consumers see deterministic + // arity per source kind. 
+ assert(parts.length === 3, + s"V1 nameParts should normalize to exactly 3 parts, got ${parts.length}: $parts") + assert(parts.head === "spark_catalog", + s"V1 nameParts head should be the session-catalog name, got $parts") + assert(parts.last === v1Source, s"Last part should be the table name, got $parts") + } + } finally { + sql(s"DROP TABLE IF EXISTS $v1Source") + } + } + + test("dependency extraction: multi-level V2 namespace source emits N+2 nameParts") { + val multiNamespace = Array("ns_a", "ns_b") + val multiTable = "events_deep" + val multiFull = s"$testCatalogName.${multiNamespace.mkString(".")}.$multiTable" + withTestCatalogTables { + // The InMemoryTableCatalog (TableViewCatalog mixin) supports multi-level namespaces. + sql(s"CREATE NAMESPACE IF NOT EXISTS $testCatalogName.${multiNamespace.head}") + sql(s"CREATE NAMESPACE IF NOT EXISTS " + + s"$testCatalogName.${multiNamespace.mkString(".")}") + sql(s"CREATE TABLE $multiFull (region STRING, count INT) USING foo") + try { + val mv = MetricView( + "0.1", + SQLSource(s"SELECT region, count FROM $multiFull"), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null && deps.dependencies().size() === 1) + val parts = + deps.dependencies().get(0).asInstanceOf[TableDependency].nameParts().asScala.toSeq + assert(parts === Seq(testCatalogName, multiNamespace(0), multiNamespace(1), multiTable), + s"Multi-level nameParts should preserve every namespace component, got $parts") + } finally { + sql(s"DROP TABLE IF EXISTS $multiFull") + sql(s"DROP NAMESPACE IF EXISTS " + + s"$testCatalogName.${multiNamespace.mkString(".")} CASCADE") + sql(s"DROP NAMESPACE IF EXISTS $testCatalogName.${multiNamespace.head} CASCADE") + } + } + } + + // ============================================================ + // Section 3: SELECT cases + // ============================================================ + + + test("SELECT 
measure(...) from a v2 metric view returns aggregated rows") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + // The fixture's `events` source has rows ("region_1", 1, 5.0), ("region_2", 2, 10.0). + // The metric view aggregates by `region` summing `count`. Resolution flows through + // loadTableOrView -> MetadataTable(ViewInfo) -> V1Table.toCatalogTable(ViewInfo) -> + // CatalogTableType.METRIC_VIEW -> ResolveMetricView, which rewrites the view body + // into Aggregate(Seq(region), Seq(sum(count) AS count_sum)) over `events`. The + // `measure(...)` wrapper is required for measure columns -- selecting `count_sum` + // bare would fail (mirrors the v1 `MetricViewSuite` query syntax). + checkAnswer( + sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " + + "GROUP BY region ORDER BY region"), + sql(s"SELECT region, sum(count) FROM $fullSourceTableName " + + "GROUP BY region ORDER BY region")) + } + } + + test("SELECT measure(...) with a WHERE clause on a dimension") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + // Filter at the query layer (not on the metric view's own `where:`). + checkAnswer( + sql(s"SELECT measure(count_sum) FROM $fullMetricViewName " + + "WHERE region = 'region_2'"), + sql(s"SELECT sum(count) FROM $fullSourceTableName " + + "WHERE region = 'region_2'")) + } + } + + test("SELECT against a v2 metric view honors the view's pre-defined where clause") { + withTestCatalogTables { + // Pre-define a filter on the metric view itself: only rows with count > 1 should be + // visible to consumers (i.e. region_2 only). 
+ val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 1"), + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + checkAnswer( + sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " + + "GROUP BY region ORDER BY region"), + sql(s"SELECT region, sum(count) FROM $fullSourceTableName " + + "WHERE count > 1 GROUP BY region ORDER BY region")) + } + } + + test("SELECT from a v2 metric view supports multiple measures with different aggregations") { + withTestCatalogTables { + // Add a second measure (sum of price) so we exercise the multi-measure rewrite path. + val cols = Seq( + Column("region", DimensionExpression("region"), 0), + Column("count_sum", MeasureExpression("sum(count)"), 1), + Column("price_sum", MeasureExpression("sum(price)"), 2), + Column("price_max", MeasureExpression("max(price)"), 3)) + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = cols) + createMetricView(fullMetricViewName, mv) + checkAnswer( + sql(s"SELECT measure(count_sum), measure(price_sum), measure(price_max) " + + s"FROM $fullMetricViewName"), + sql(s"SELECT sum(count), sum(price), max(price) FROM $fullSourceTableName")) + } + } + + test("SELECT from a v2 metric view supports ORDER BY and LIMIT on measures") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + checkAnswer( + sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " + + "GROUP BY region ORDER BY 2 DESC LIMIT 1"), + sql(s"SELECT region, sum(count) FROM $fullSourceTableName " + + "GROUP BY region ORDER BY 2 DESC LIMIT 1")) + } + } + + // ============================================================ + // Section 4: DESCRIBE cases + // ============================================================ + + + test("DESCRIBE TABLE EXTENDED on a v2 metric view round-trips through 
loadTableOrView") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = createMetricView(fullMetricViewName, mv) + + // DESCRIBE TABLE EXTENDED resolves the ident through `Analyzer.lookupTableOrView`, + // which calls `TableViewCatalog.loadTableOrView` once and gets back a + // `MetadataTable(ViewInfo)`. The analyzer wraps it as a `ResolvedPersistentView` and + // `DataSourceV2Strategy` routes through SPARK-56655's `DescribeV2ViewExec`, which + // reads the typed `ViewInfo` directly and emits the standard "Type" / "View Text" / + // "View Current Catalog" / "View Schema Mode" / etc. rows. Pins `DescribeV2ViewExec` + // emits a "Type" row matching v1 `CatalogTable.toJsonLinkedHashMap` parity, so users + // can distinguish a plain VIEW from a sub-kind like METRIC_VIEW. Review Comment: Sentence is broken: "Pins `DescribeV2ViewExec` emits a "Type" row matching v1 `CatalogTable.toJsonLinkedHashMap` parity ..." reads as if `Pins` is a verb taking `DescribeV2ViewExec` as its direct object, and "matching X parity" is awkward. ```suggestion // "View Current Catalog" / "View Schema Mode" / etc. rows. Pins that `DescribeV2ViewExec` // emits a "Type" row for parity with v1 `CatalogTable.toJsonLinkedHashMap`, so users ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
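The dependency-extraction tests above assert on specific `nameParts` shapes. V1 session-catalog sources normalize to exactly `[spark_catalog, db, table]`. Multi-level V2 sources keep every namespace component, giving N+2 parts. Repeated references from a JOIN, self-join, or subquery collapse to a single dependency.

The sketch below illustrates those shapes. It is a minimal sketch under stated assumptions, not the PR's `MetricViewHelper.qualifyV1` implementation. `NamePartsSketch`, `qualifyV2`, `dedup`, and the `currentDb` fallback for 1-part identifiers are hypothetical names introduced only for illustration.

```scala
// Hypothetical sketch of the nameParts shapes asserted in the dependency tests.
// None of these names are the PR's MetricViewHelper API.
object NamePartsSketch {
  val SessionCatalogName = "spark_catalog"

  // V1 session-catalog source: an identifier may carry 1, 2, or 3 parts
  // depending on what the analyzer captured; pad it to [spark_catalog, db, table].
  // `currentDb` is an assumed fallback for an unqualified (1-part) identifier.
  def qualifyV1(parts: Seq[String], currentDb: String): Seq[String] = parts match {
    case Seq(table)              => Seq(SessionCatalogName, currentDb, table)
    case Seq(db, table)          => Seq(SessionCatalogName, db, table)
    case Seq(catalog, db, table) => Seq(catalog, db, table) // already catalog-qualified
    case other =>
      throw new IllegalArgumentException(s"Unexpected V1 name parts: $other")
  }

  // V2 source: catalog name, then every namespace level, then the table name,
  // so an N-level namespace yields N + 2 parts.
  def qualifyV2(catalog: String, namespace: Seq[String], table: String): Seq[String] =
    (catalog +: namespace) :+ table

  // Self-joins and subqueries reference the same table more than once; keep
  // only the first occurrence of each fully qualified name.
  def dedup(refs: Seq[Seq[String]]): Seq[Seq[String]] = refs.distinct
}
```

`distinct` keeps first-seen order, so the dependency order stays deterministic when a source is referenced more than once. That matches the single-dependency expectation in the subquery and self-join tests.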

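The tests build their DDL with `s"""...""".stripMargin` and write `$$$$` around the YAML body. Inside an `s` interpolator, `$$` is an escaped literal `$`, so `$$$$` emits the `$$ ... $$` markers. The YAML body between the markers keeps a leading and trailing newline, which is why one test trims `queryText()` before comparing.

The sketch below shows this expansion in isolation. `DollarQuoteSketch` and its one-line `yaml` value are hypothetical stand-ins for `MetricViewFactory.toYAML(...)` output.

```scala
// Sketch of how the tests' s-interpolated CREATE VIEW text expands.
object DollarQuoteSketch {
  // Hypothetical YAML body; the real tests use MetricViewFactory.toYAML(...).
  val yaml = "version: 0.1"
  val name = "cat.ns.mv"

  // `$$` is an escaped `$` inside s"...", so each `$$$$` line becomes `$$`.
  // stripMargin then removes the leading whitespace and `|` from each line.
  val sqlText: String =
    s"""CREATE VIEW $name
       |WITH METRICS
       |LANGUAGE YAML
       |AS
       |$$$$
       |$yaml
       |$$$$""".stripMargin

  // The text between the two `$$` markers, including its surrounding newlines.
  val body: String = sqlText.split("\\$\\$")(1)
}
```

Here `body` is `"\nversion: 0.1\n"`, and `body.trim` recovers the original YAML.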