This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dde895c722af [SPARK-53651][SDP] Add support for persistent views in
pipelines
dde895c722af is described below
commit dde895c722af503d57ab235907a00373a7935178
Author: Sandy Ryza <[email protected]>
AuthorDate: Tue Sep 23 10:59:59 2025 +0800
[SPARK-53651][SDP] Add support for persistent views in pipelines
### What changes were proposed in this pull request?
Adds support for defining non-temporary views within Declarative Pipelines
definitions.
E.g.
```sql
CREATE VIEW my_view AS SELECT * FROM range(5)
```
When the pipeline is executed, each view will be created if it doesn't
already exist, or updated if it does.
This will only work in SQL definitions files, because views can't generally
be defined via PySpark DataFrame APIs.
### Why are the changes needed?
Closes a gap in Declarative Pipelines support for generating common catalog
objects.
### Does this PR introduce _any_ user-facing change?
Yes, only additive.
### How was this patch tested?
Tests for:
- SQL syntax
- Failures
- Views that read from other objects that can be defined in pipelines
- Other objects defined in pipelines reading from views
- Views reading from other views
### Was this patch authored or co-authored using generative AI tooling?
Closes #52398 from sryza/persisted-views.
Authored-by: Sandy Ryza <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/connect/pipelines/PipelinesHandler.scala | 3 +-
.../spark/sql/pipelines/graph/DataflowGraph.scala | 4 +
.../pipelines/graph/DataflowGraphTransformer.scala | 2 +-
.../spark/sql/pipelines/graph/DatasetManager.scala | 111 ++++++-
.../sql/pipelines/graph/GraphValidations.scala | 8 +-
.../graph/SqlGraphRegistrationContext.scala | 6 +-
.../spark/sql/pipelines/graph/ViewHelpers.scala | 4 +-
.../spark/sql/pipelines/graph/elements.scala | 13 +-
.../sql/pipelines/graph/SqlPipelineSuite.scala | 17 +
.../spark/sql/pipelines/graph/ViewSuite.scala | 364 +++++++++++++++++++++
.../utils/TestGraphRegistrationContext.scala | 9 +-
11 files changed, 520 insertions(+), 21 deletions(-)
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index f35aee6a16eb..f01b9cfb8f09 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -183,7 +183,8 @@ private[connect] object PipelinesHandler extends Logging {
objectType = Option(QueryOriginType.View.toString),
objectName = Option(viewIdentifier.unquotedString),
language = Option(Python())),
- properties = Map.empty))
+ properties = Map.empty,
+ sqlText = None))
case _ =>
throw new IllegalArgumentException(s"Unknown dataset type:
${dataset.getDatasetType}")
}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala
index c49f151c5da1..2b6db5e5dd42 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala
@@ -115,6 +115,10 @@ case class DataflowGraph(flows: Seq[Flow], tables:
Seq[Table], views: Seq[View])
}.toMap
}
+  /** Resolved flows, grouped by the identifier of the destination they write to. */
+  lazy val resolvedFlowsTo: Map[TableIdentifier, Seq[ResolvedFlow]] =
+    resolvedFlows.groupBy(flow => flow.destinationIdentifier)
+
lazy val resolutionFailedFlows: Seq[ResolutionFailedFlow] = {
flows.collect { case f: ResolutionFailedFlow => f }
}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
index 8448ed5f10d2..23914a55f31e 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.ThreadUtils
* Assumptions:
* 1. Each output will have at-least 1 flow to it.
* 2. Each flow may or may not have a destination table. If a flow does not
have a destination
- * table, the destination is a temporary view.
+ * table, the destination is a view.
*
* The way graph is structured is that flows, tables and sinks all are graph
elements or nodes.
* While we expose transformation functions for each of these entities, we
also expose a way to
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
index 05e12a3f7859..cb142988ce51 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
@@ -24,6 +24,8 @@ import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.PersistedView
+import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.connector.catalog.{
CatalogV2Util,
Identifier,
@@ -33,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.{
}
import
org.apache.spark.sql.connector.catalog.CatalogV2Util.v2ColumnsToStructType
import org.apache.spark.sql.connector.expressions.Expressions
+import org.apache.spark.sql.execution.command.CreateViewCommand
import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers
import org.apache.spark.sql.pipelines.util.SchemaInferenceUtils.diffSchemas
import org.apache.spark.sql.pipelines.util.SchemaMergingUtils
@@ -116,16 +119,122 @@ object DatasetManager extends Logging {
table
}
}
- // TODO: Publish persisted views to the metastore.
+
}
.getDataflowGraph
} catch {
case e: SparkException if e.getCause != null => throw e.getCause
}
+ materializeViews(materializedGraph, context)
materializedGraph
}
+  /**
+   * Publishes or refreshes all the [[PersistedView]]s in the specified [[DataflowGraph]].
+   *
+   * Views are published in dependency order: a view that reads from other persisted views of
+   * the same graph is only published after those views have been published. If publishing a
+   * view fails, the failure is recorded on its flow and every view that reads from it is
+   * recorded as skipped.
+   *
+   * @param virtualizedConnectedGraphWithTables graph whose persisted views should be published.
+   * @param context update context supplying the Spark session and the flow progress logger.
+   * @throws IllegalStateException if the remaining views depend on each other in a cycle, so
+   *                               that none of them can ever become publishable.
+   */
+  private def materializeViews(
+      virtualizedConnectedGraphWithTables: DataflowGraph,
+      context: PipelineUpdateContext): Unit = {
+    var viewsToPublish: Set[PersistedView] =
+      virtualizedConnectedGraphWithTables.persistedViews.toSet
+    var publishedViews: Set[TableIdentifier] = Set.empty
+    var failedViews: Set[TableIdentifier] = Set.empty
+
+    // To publish a view, all of its input sources must exist in the metastore. Therefore, if a
+    // persisted view reads from another persisted view, the source view must be published first.
+    // The bookkeeping below makes sure all the persisted views are published in the correct order.
+    val persistedViewIdentifiers =
+      virtualizedConnectedGraphWithTables.persistedViews.map(_.identifier).toSet
+    val viewToFlowMap =
+      ViewHelpers.persistedViewIdentifierToFlow(graph = virtualizedConnectedGraphWithTables)
+    // For each view, the subset of its flow's inputs that are themselves persisted views.
+    val materializationDependencies =
+      virtualizedConnectedGraphWithTables.persistedViews.map { v =>
+        val flow = viewToFlowMap(v.identifier)
+        val inputs = flow.inputs.intersect(persistedViewIdentifiers)
+        (v.identifier, inputs)
+      }.toMap
+
+    // Keep going until every view has been published, has failed, or has been skipped.
+    while (viewsToPublish.nonEmpty) {
+      val pendingBeforePass = viewsToPublish.size
+
+      // Mark any views with failed inputs as skipped. They are added to `failedViews` so that
+      // views reading from them are skipped as well on a later pass.
+      viewsToPublish
+        .filter { v =>
+          materializationDependencies(v.identifier)
+            .exists(failedViews.contains)
+        }
+        .foreach { v =>
+          val flowToView = viewToFlowMap(v.identifier)
+          context.flowProgressEventLogger.recordSkipped(flowToView)
+
+          failedViews += v.identifier
+          viewsToPublish -= v
+        }
+
+      // Persist any views without pending inputs
+      viewsToPublish
+        .filter { v =>
+          val pendingInputs =
+            materializationDependencies(v.identifier).diff(publishedViews)
+
+          pendingInputs.isEmpty
+        }
+        .foreach { v =>
+          val flowToView = viewToFlowMap(v.identifier)
+          try {
+            materializeView(v, flowToView, context.spark)
+            publishedViews += v.identifier
+            viewsToPublish -= v
+          } catch {
+            case NonFatal(ex) =>
+              context.flowProgressEventLogger.recordFailed(
+                flowToView,
+                ex,
+                logAsWarn = false
+              )
+              failedViews += v.identifier
+              viewsToPublish -= v
+          }
+        }
+
+      // A pass can only ever remove views from `viewsToPublish`. If nothing was skipped,
+      // published or failed, every remaining view is waiting on another remaining view, and
+      // looping again would spin forever.
+      if (viewsToPublish.size == pendingBeforePass) {
+        throw new IllegalStateException(
+          "Unable to publish persisted views due to cyclic dependencies among: " +
+          viewsToPublish.map(_.identifier).mkString(", ")
+        )
+      }
+    }
+  }
+
+  /**
+   * Creates the given view in the catalog, or replaces its definition if it already exists.
+   *
+   * @param view  the view to publish; its identifier, comment, properties and SQL text are
+   *              passed to the [[CreateViewCommand]].
+   * @param flow  the resolved flow that writes to the view; supplies the logical plan and the
+   *              catalog/database to resolve the view's reads against.
+   * @param spark session the command runs in. Its current catalog and namespace are switched
+   *              for the duration of the command and restored afterwards.
+   */
+  private def materializeView(view: View, flow: ResolvedFlow, spark: SparkSession): Unit = {
+    val command = CreateViewCommand(
+      name = view.identifier,
+      userSpecifiedColumns = Nil,
+      viewType = PersistedView,
+      comment = view.comment,
+      collation = None,
+      properties = view.properties,
+      originalText = view.sqlText,
+      plan = flow.df.logicalPlan,
+      // Replace-only semantics. `allowExisting` corresponds to IF NOT EXISTS, which must not be
+      // combined with `replace` and would leave an already-existing view untouched, so a changed
+      // definition would never be published on later pipeline runs.
+      allowExisting = false,
+      replace = true,
+      isAnalyzed = true
+    )
+
+    val queryContext = flow.queryContext
+
+    // Remember the session's current catalog/namespace so they can be restored in `finally`.
+    val catalogManager = spark.sessionState.catalogManager
+    val currentCatalogName = catalogManager.currentCatalog.name()
+    val currentNamespace = catalogManager.currentNamespace
+    try {
+      // Using the catalog and database from the flow ensures that reads within the view are
+      // directed to the right catalog/database.
+      catalogManager.setCurrentCatalog(queryContext.currentCatalog.get)
+      queryContext.currentDatabase.foreach { d => catalogManager.setCurrentNamespace(Array(d)) }
+      command.run(spark)
+    } finally {
+      catalogManager.setCurrentCatalog(currentCatalogName)
+      catalogManager.setCurrentNamespace(currentNamespace)
+    }
+  }
+
/**
* Materializes a table in the catalog. This method will create or update
the table in the
* catalog based on the given table and context.
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
index 18aad5fe07cb..824c15dc8791 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
@@ -331,10 +331,12 @@ trait GraphValidations extends Logging {
throw new AnalysisException(
errorClass = "INVALID_TEMP_OBJ_REFERENCE",
messageParameters = Map(
- "persistedViewName" -> persistedView.identifier.toString,
- "temporaryViewName" -> tempView.identifier.toString
+ "objName" -> persistedView.identifier.toString,
+ "obj" -> "view",
+ "tempObjName" -> tempView.identifier.toString,
+ "tempObj" -> "temporary view"
),
- cause = null
+ cause = None
)
case _ =>
}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
index 113fd1652cd7..eb6b20b8a4ef 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
@@ -322,7 +322,8 @@ class SqlGraphRegistrationContext(
objectName = Option(viewIdentifier.unquotedString),
objectType = Option(QueryOriginType.View.toString)
),
- properties = cv.properties
+ properties = cv.properties,
+ sqlText = cv.originalText
)
)
@@ -365,7 +366,8 @@ class SqlGraphRegistrationContext(
objectName = Option(viewIdentifier.unquotedString),
objectType = Option(QueryOriginType.View.toString)
),
- properties = Map.empty
+ properties = Map.empty,
+ sqlText = cvc.originalText
)
)
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/ViewHelpers.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/ViewHelpers.scala
index 9f05219c383d..9edd0361cb0f 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/ViewHelpers.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/ViewHelpers.scala
@@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
object ViewHelpers {
- /** Map of view identifier to corresponding unresolved flow */
+ /** Map of view identifier to corresponding resolved flow */
- def persistedViewIdentifierToFlow(graph: DataflowGraph):
Map[TableIdentifier, Flow] = {
+ def persistedViewIdentifierToFlow(graph: DataflowGraph):
Map[TableIdentifier, ResolvedFlow] = {
graph.persistedViews.map { v =>
require(
graph.flowsTo.get(v.identifier).isDefined,
s"No flows to view ${v.identifier} were found"
)
- val flowsToView = graph.flowsTo(v.identifier)
+ val flowsToView = graph.resolvedFlowsTo(v.identifier)
require(
flowsToView.size == 1,
s"Expected a single flow to the view, found ${flowsToView.size} flows
to ${v.identifier}"
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala
index b58f6e2297fb..ee78f96d5316 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala
@@ -225,34 +225,31 @@ trait View extends GraphElement {
/** Properties of this view */
val properties: Map[String, String]
+ /** (SQL-specific) The raw query that defines the [[View]]. */
+ val sqlText: Option[String]
+
/** User-specified comment that can be placed on the [[View]]. */
val comment: Option[String]
}
/**
* Representing a temporary [[View]] in a [[DataflowGraph]].
- *
- * @param identifier The identifier of this view within the graph.
- * @param properties Properties of the view
- * @param comment when defining a view
*/
case class TemporaryView(
identifier: TableIdentifier,
properties: Map[String, String],
+ sqlText: Option[String],
comment: Option[String],
origin: QueryOrigin
) extends View {}
/**
* Representing a persisted [[View]] in a [[DataflowGraph]].
- *
- * @param identifier The identifier of this view within the graph.
- * @param properties Properties of the view
- * @param comment when defining a view
*/
case class PersistedView(
identifier: TableIdentifier,
properties: Map[String, String],
+ sqlText: Option[String],
comment: Option[String],
origin: QueryOrigin
) extends View {}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala
index 755411cf9048..e921a6bfe2ab 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala
@@ -614,6 +614,23 @@ class SqlPipelineSuite extends PipelineTest with
SharedSparkSession {
assert(ex.errorClass.contains("TEMP_VIEW_NAME_TOO_MANY_NAME_PARTS"))
}
+  // Checks that a plain `CREATE VIEW` statement is registered as a PersistedView carrying the
+  // fully qualified identifier, the query text after `AS`, and the comment.
+  test("create view syntax for persisted views") {
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"CREATE VIEW b COMMENT 'my persisted comment' AS SELECT * FROM range(1, 4);"
+    )
+    val graph = unresolvedDataflowGraph.resolve().validate()
+
+    val view = graph.views.last
+
+    // view identifier should be multipart for persisted views
+    assert(view.identifier == fullyQualifiedIdentifier("b"))
+    assert(view.isInstanceOf[PersistedView])
+    // `Option.contains` checks "defined and equal" in one step, without calling `.get`.
+    assert(view.sqlText.contains("SELECT * FROM range(1, 4)"))
+    assert(view.comment.contains("my persisted comment"))
+  }
+ }
+
test("Use database and set catalog works") {
val pipelineCatalog = TestGraphRegistrationContext.DEFAULT_CATALOG
val pipelineDatabase = TestGraphRegistrationContext.DEFAULT_DATABASE
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ViewSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ViewSuite.scala
new file mode 100644
index 000000000000..3b40f887fe08
--- /dev/null
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ViewSuite.scala
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.pipelines.common.FlowStatus
+import org.apache.spark.sql.pipelines.logging.EventLevel
+import org.apache.spark.sql.pipelines.utils.ExecutionTest
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * End-to-end tests for persisted (non-temporary) views defined in pipeline SQL. Each test builds
+ * a dataflow graph from SQL text, runs it, and then checks the view metadata recorded in the
+ * graph, the rows returned when querying the view, or the failure that was reported.
+ */
+class ViewSuite extends ExecutionTest with SharedSparkSession {
+  private val externalTable1Ident = fullyQualifiedIdentifier("external_t1")
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    // Create mock external tables that tests can reference, ex. to stream from.
+    spark.sql(s"CREATE TABLE $externalTable1Ident AS SELECT * FROM RANGE(3)")
+  }
+
+  override def afterEach(): Unit = {
+    spark.sql(s"DROP TABLE IF EXISTS $externalTable1Ident")
+    super.afterEach()
+  }
+
+  test("create persisted views") {
+    val session = spark
+    import session.implicits._
+
+    val viewName = "mypersistedview"
+    val viewIdentifier = fullyQualifiedIdentifier(viewName)
+
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"CREATE VIEW $viewName AS SELECT * FROM range(1, 4);"
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+    updateContext.pipelineExecution.startPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val executedGraph = updateContext.pipelineExecution.graphExecution.get.graphForExecution
+    verifyPersistedViewMetadata(
+      graph = executedGraph,
+      viewMetadata = Map(
+        viewIdentifier -> "SELECT * FROM range(1, 4)"
+      )
+    )
+
+    // The view must be queryable through the session after the pipeline run.
+    checkAnswer(
+      spark.sql(s"SELECT * FROM $viewIdentifier"),
+      Seq(1L, 2L, 3L).toDF()
+    )
+  }
+
+  test("persisted view reads from external table") {
+    val session = spark
+    import session.implicits._
+
+    val viewName = "mypersistedview"
+    val viewIdentifier = fullyQualifiedIdentifier(viewName)
+
+    val tableIdentifier = fullyQualifiedIdentifier("mytable")
+
+    // The table is created directly through the session, i.e. outside of the pipeline graph.
+    spark.sql(s"CREATE TABLE $tableIdentifier AS SELECT * FROM range(1, 4)")
+
+    val source = tableIdentifier.toString
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"CREATE VIEW $viewName AS SELECT * FROM $source;"
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+    updateContext.pipelineExecution.startPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val graph = updateContext.pipelineExecution.graphExecution.get.graphForExecution
+
+    verifyPersistedViewMetadata(
+      graph = graph,
+      viewMetadata = Map(
+        viewIdentifier -> s"SELECT * FROM $source"
+      )
+    )
+
+    checkAnswer(
+      spark.sql(s"SELECT * FROM $viewIdentifier"),
+      Seq(1L, 2L, 3L).toDF()
+    )
+  }
+
+  test("persisted view reads from a temporary view") {
+    val viewName = "pv"
+    val pv = fullyQualifiedIdentifier(viewName)
+
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"""CREATE TEMPORARY VIEW temp_view AS SELECT * FROM range(1, 4);
+                   |CREATE VIEW $viewName AS SELECT * FROM temp_view;""".stripMargin
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+
+    // A persisted view referencing a temporary view is expected to be rejected.
+    val ex = intercept[AnalysisException] {
+      updateContext.pipelineExecution.startPipeline()
+      updateContext.pipelineExecution.awaitCompletion()
+    }
+
+    assertAnalysisException(
+      ex,
+      errorClass = "INVALID_TEMP_OBJ_REFERENCE",
+      metadata = Map(
+        "objName" -> pv.toString,
+        "obj" -> "view",
+        "tempObjName" -> "`temp_view`",
+        "tempObj" -> "temporary view"
+      )
+    )
+  }
+
+  test("persisted view reads from a non-existent dataset") {
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"CREATE VIEW myview AS SELECT * FROM nonexistent_view;"
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+
+    val ex = intercept[Exception] {
+      updateContext.pipelineExecution.startPipeline()
+      updateContext.pipelineExecution.awaitCompletion()
+    }
+    // Only the view itself should be reported as a direct resolution failure.
+    ex match {
+      case u: UnresolvedPipelineException =>
+        assert(u.directFailures.keySet == Set(fullyQualifiedIdentifier("myview")))
+      case _ => fail("Unexpected error", ex)
+    }
+  }
+
+  test("persisted view reads from a streaming source") {
+    val viewName = "mypersistedview"
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"""CREATE STREAMING TABLE source AS SELECT * FROM STREAM($externalTable1Ident);
+                   |CREATE VIEW $viewName AS SELECT * FROM STREAM(source);""".stripMargin
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+
+    val ex = intercept[AnalysisException] {
+      updateContext.pipelineExecution.startPipeline()
+      updateContext.pipelineExecution.awaitCompletion()
+    }
+
+    assertAnalysisException(
+      ex,
+      errorClass = "INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_PERSISTED_VIEW"
+    )
+  }
+
+  test("persisted view reads from an external streaming source") {
+    val viewName = "mypersistedview"
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"CREATE VIEW $viewName AS SELECT * FROM STREAM($externalTable1Ident);"
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+
+    val ex = intercept[AnalysisException] {
+      updateContext.pipelineExecution.startPipeline()
+      updateContext.pipelineExecution.awaitCompletion()
+    }
+
+    assertAnalysisException(
+      ex,
+      errorClass = "INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_PERSISTED_VIEW"
+    )
+  }
+
+  test("persisted view reads from another persisted view") {
+    val session = spark
+    import session.implicits._
+
+    // The views are declared in reverse dependency order (pv3 -> pv2 -> pv1), so the pipeline
+    // has to work out the publishing order itself.
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"""CREATE VIEW pv3 AS SELECT * FROM pv2;
+                   |CREATE VIEW pv2 AS SELECT * FROM pv1;
+                   |CREATE VIEW pv1 AS SELECT * FROM range(1, 4);""".stripMargin
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+    updateContext.pipelineExecution.startPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val graph = updateContext.pipelineExecution.graphExecution.get.graphForExecution
+
+    verifyPersistedViewMetadata(
+      graph = graph,
+      viewMetadata = Map(
+        fullyQualifiedIdentifier("pv1") -> "SELECT * FROM range(1, 4)",
+        fullyQualifiedIdentifier("pv2") -> s"SELECT * FROM pv1",
+        fullyQualifiedIdentifier("pv3") -> s"SELECT * FROM pv2"
+      )
+    )
+
+    checkAnswer(
+      spark.sql(buildSelectQuery(fullyQualifiedIdentifier("pv3"))),
+      Seq(1L, 2L, 3L).toDF()
+    )
+  }
+
+  test("persisted view reads from a failed persisted view") {
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"""
+        |CREATE VIEW pv2 AS SELECT * FROM pv1;
+        |CREATE VIEW pv1 AS SELECT 1 + 1;""".stripMargin
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+    updateContext.pipelineExecution.startPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    // assert that pv1 fails
+    assertFlowProgressEvent(
+      updateContext.eventBuffer,
+      fullyQualifiedIdentifier("pv1"),
+      expectedFlowStatus = FlowStatus.FAILED,
+      expectedEventLevel = EventLevel.ERROR,
+      errorChecker = _.getMessage.contains("CREATE_PERMANENT_VIEW_WITHOUT_ALIAS")
+    )
+
+    // assert that pv2 is skipped (as it depends on pv1)
+    assertFlowProgressEvent(
+      updateContext.eventBuffer,
+      fullyQualifiedIdentifier("pv2"),
+      expectedFlowStatus = FlowStatus.SKIPPED,
+      expectedEventLevel = EventLevel.INFO
+    )
+  }
+
+  test("persisted view reads from MV") {
+    val session = spark
+    import session.implicits._
+
+    val viewName = "mypersistedview"
+    val viewIdentifier = fullyQualifiedIdentifier(viewName)
+
+    val mvIdentifier = fullyQualifiedIdentifier("mymv")
+
+    val source = mvIdentifier.toString
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"""CREATE MATERIALIZED VIEW mymv AS SELECT * FROM RANGE(1, 4);
+                   |CREATE VIEW $viewName AS SELECT * FROM $source;""".stripMargin
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+    updateContext.pipelineExecution.startPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val graph = updateContext.pipelineExecution.graphExecution.get.graphForExecution
+
+    verifyPersistedViewMetadata(
+      graph = graph,
+      viewMetadata = Map(
+        viewIdentifier -> s"SELECT * FROM $source"
+      )
+    )
+
+    checkAnswer(
+      spark.sql(s"SELECT * FROM $viewIdentifier"),
+      Seq(1L, 2L, 3L).toDF()
+    )
+  }
+
+  test("persisted view reads from ST") {
+    val session = spark
+    import session.implicits._
+
+    val viewName = "mypersistedview"
+    val viewIdentifier = fullyQualifiedIdentifier(viewName)
+
+    val stIdentifier = fullyQualifiedIdentifier("myst")
+
+    val source = stIdentifier.toString
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"""CREATE STREAMING TABLE myst AS SELECT * FROM STREAM($externalTable1Ident);
+                   |CREATE VIEW $viewName AS SELECT * FROM $source;""".stripMargin
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+    updateContext.pipelineExecution.startPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val graph = updateContext.pipelineExecution.graphExecution.get.graphForExecution
+
+    verifyPersistedViewMetadata(
+      graph = graph,
+      viewMetadata = Map(
+        viewIdentifier -> s"SELECT * FROM $source"
+      )
+    )
+
+    // The streaming table copies the external table, which beforeEach fills with RANGE(3).
+    checkAnswer(
+      spark.sql(s"SELECT * FROM $viewIdentifier"),
+      Seq(0L, 1L, 2L).toDF()
+    )
+  }
+
+  test("mv reading from a persisted view") {
+    val session = spark
+    import session.implicits._
+
+    val viewName = "pv"
+    val viewIdentifier = fullyQualifiedIdentifier(viewName)
+
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = s"""
+        |CREATE VIEW $viewName AS SELECT * FROM range(1, 4);
+        |CREATE MATERIALIZED VIEW myviewreader AS SELECT * FROM $viewName;""".stripMargin
+    )
+    val updateContext = TestPipelineUpdateContext(spark, unresolvedDataflowGraph)
+    updateContext.pipelineExecution.startPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val graph = updateContext.pipelineExecution.graphExecution.get.graphForExecution
+
+    verifyPersistedViewMetadata(
+      graph = graph,
+      viewMetadata = Map(
+        viewIdentifier -> "SELECT * FROM range(1, 4)"
+      )
+    )
+
+    val q = buildSelectQuery(fullyQualifiedIdentifier("myviewreader"))
+
+    // Both the materialized view and the view it reads from must return the same rows.
+    checkAnswer(
+      spark.sql(q),
+      Seq(1L, 2L, 3L).toDF()
+    )
+    checkAnswer(
+      spark.sql(s"SELECT * FROM $viewIdentifier"),
+      Seq(1L, 2L, 3L).toDF()
+    )
+
+  }
+
+  /**
+   * Asserts that every identifier in `viewMetadata` is registered in `graph` as a
+   * [[PersistedView]] whose SQL text equals the mapped value.
+   *
+   * @param graph        the graph to look the views up in.
+   * @param viewMetadata expected SQL text, keyed by view identifier.
+   */
+  private def verifyPersistedViewMetadata(
+      graph: DataflowGraph,
+      viewMetadata: Map[TableIdentifier, String]
+  ): Unit = {
+    viewMetadata.foreach {
+      case (key, value) =>
+        val viewOpt = graph.view.get(key)
+        assert(viewOpt.isDefined)
+        val view = viewOpt.get
+        assert(view.isInstanceOf[PersistedView])
+        assert(view.sqlText.isDefined && view.sqlText.get == value)
+    }
+  }
+
+  /** Builds a select query for the given dataset name. */
+  private def buildSelectQuery(identifier: TableIdentifier): String = {
+    s"SELECT * FROM $identifier"
+  }
+}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
index e5ac83a231e6..d0a8236734d3 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
@@ -189,7 +189,8 @@ class TestGraphRegistrationContext(
origin: QueryOrigin = QueryOrigin.empty,
viewType: ViewType = LocalTempView,
catalog: Option[String] = None,
- database: Option[String] = None
+ database: Option[String] = None,
+ sqlText: Option[String] = None
): Unit = {
val viewIdentifier = GraphIdentifierManager
@@ -202,14 +203,16 @@ class TestGraphRegistrationContext(
identifier = viewIdentifier,
comment = comment,
origin = origin,
- properties = Map.empty
+ properties = Map.empty,
+ sqlText = sqlText
)
case _ =>
PersistedView(
identifier = viewIdentifier,
comment = comment,
origin = origin,
- properties = Map.empty
+ properties = Map.empty,
+ sqlText = sqlText
)
}
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]