AnishMahto commented on code in PR #52119:
URL: https://github.com/apache/spark/pull/52119#discussion_r2299913607


##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala:
##########
@@ -434,6 +434,34 @@ class PythonPipelineSuite
         .map(_.identifier) == Seq(graphIdentifier("a"), 
graphIdentifier("something")))
   }
 
+  test("MV/ST with partition columns works") {
+    val graph = buildGraph("""
+           |from pyspark.sql.functions import col
+           |
+           |@dp.materialized_view(partition_cols = ["id_mod"])
+           |def mv():
+           |  return spark.range(5).withColumn("id_mod", col("id") % 2)
+           |
+           |@dp.table(partition_cols = ["id_mod"])
+           |def st():
+           |  return spark.readStream.table("mv")
+           |""".stripMargin)
+
+    val updateContext = new PipelineUpdateContextImpl(graph, eventCallback = _ 
=> ())
+    updateContext.pipelineExecution.runPipeline()
+    updateContext.pipelineExecution.awaitCompletion()

Review Comment:
   nit: use `PipelineTest.startPipelineAndWaitForCompletion`



##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala:
##########
@@ -434,6 +434,34 @@ class PythonPipelineSuite
         .map(_.identifier) == Seq(graphIdentifier("a"), 
graphIdentifier("something")))
   }
 
+  test("MV/ST with partition columns works") {
+    val graph = buildGraph("""
+           |from pyspark.sql.functions import col
+           |
+           |@dp.materialized_view(partition_cols = ["id_mod"])
+           |def mv():
+           |  return spark.range(5).withColumn("id_mod", col("id") % 2)
+           |
+           |@dp.table(partition_cols = ["id_mod"])
+           |def st():
+           |  return spark.readStream.table("mv")
+           |""".stripMargin)
+
+    val updateContext = new PipelineUpdateContextImpl(graph, eventCallback = _ 
=> ())
+    updateContext.pipelineExecution.runPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    // check table is created with correct partitioning
+    Seq("mv", "st").foreach { tableName =>
+      val table = 
spark.sessionState.catalog.getTableMetadata(graphIdentifier(tableName))
+      assert(table.partitionColumnNames == Seq("id_mod"))
+
+      val rows = spark.table(tableName).collect().map(r => (r.getLong(0), 
r.getLong(1))).toSet
+      val expected = (0 until 5).map(id => (id.toLong, (id % 2).toLong)).toSet
+      assert(rows == expected)

Review Comment:
   nit: use `checkAnswer`



##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SqlPipelineSuite.scala:
##########
@@ -266,6 +266,45 @@ class SqlPipelineSuite extends PipelineTest with 
SharedSparkSession {
     )
   }
 
+  test("MV/ST with partition columns works") {
+    val unresolvedDataflowGraph = unresolvedDataflowGraphFromSql(
+      sqlText = """
+                  |CREATE MATERIALIZED VIEW mv
+                  |PARTITIONED BY (id_mod)
+                  |AS
+                  |SELECT
+                  |  id,
+                  |  id % 2 AS id_mod
+                  |FROM range(3);
+                  |
+                  |CREATE STREAMING TABLE st
+                  |PARTITIONED BY (id_mod)
+                  |AS
+                  |SELECT * FROM STREAM(mv);
+                  |""".stripMargin
+    )
+    startPipelineAndWaitForCompletion(unresolvedDataflowGraph)
+    val expected = Seq(
+      Row(0, 0),
+      Row(1, 1),
+      Row(2, 0)
+    )
+
+    Seq("mv", "st").foreach { tableName =>
+      // check table partition columns
+      val tableMeta = spark.sessionState.catalog.getTableMetadata(
+        fullyQualifiedIdentifier(tableName)
+      )
+      assert(tableMeta.partitionColumnNames == Seq("id_mod"))

Review Comment:
   Same nit as on `PythonPipelineSuite`: use the DSv2 API (`spark.sessionState.catalogManager`) if possible



##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala:
##########
@@ -434,6 +434,34 @@ class PythonPipelineSuite
         .map(_.identifier) == Seq(graphIdentifier("a"), 
graphIdentifier("something")))
   }
 
+  test("MV/ST with partition columns works") {
+    val graph = buildGraph("""
+           |from pyspark.sql.functions import col
+           |
+           |@dp.materialized_view(partition_cols = ["id_mod"])
+           |def mv():
+           |  return spark.range(5).withColumn("id_mod", col("id") % 2)
+           |
+           |@dp.table(partition_cols = ["id_mod"])
+           |def st():
+           |  return spark.readStream.table("mv")
+           |""".stripMargin)
+
+    val updateContext = new PipelineUpdateContextImpl(graph, eventCallback = _ 
=> ())
+    updateContext.pipelineExecution.runPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    // check table is created with correct partitioning
+    Seq("mv", "st").foreach { tableName =>
+      val table = 
spark.sessionState.catalog.getTableMetadata(graphIdentifier(tableName))

Review Comment:
   nit: let's use the DSv2 API, i.e., `spark.sessionState.catalogManager`



##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala:
##########
@@ -434,6 +434,34 @@ class PythonPipelineSuite
         .map(_.identifier) == Seq(graphIdentifier("a"), 
graphIdentifier("something")))
   }
 
+  test("MV/ST with partition columns works") {

Review Comment:
   Should we also add a test for what happens when a user attempts to change 
partition columns between pipeline runs?
   
   I'd expect that to either trigger a full refresh or throw an exception. 
Either is probably acceptable, but it'd be nice for this behavior to be 
tested. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to