longvu-db commented on code in PR #55540:
URL: https://github.com/apache/spark/pull/55540#discussion_r3174490383
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2976,6 +2997,519 @@ class DataSourceV2DataFrameSuite
}
}
// Temp views with stored plans: scenarios from the DSv2 table refresh tests.
// Each test creates a DSv2 table with initial data, builds a temp view with a filter
// (to demonstrate that the stored plan is non-trivial), and then verifies the view
// behavior after various table modifications (session or external).

// Scenario 1.1 (session write)
test("temp view with stored plan reflects session write") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    // Drop the temp view before the table so its stored plan does not leak into later tests.
    withTempView("v") {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")

      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
      checkAnswer(spark.table("v"), Seq(Row(1, 100)))

      sql(s"INSERT INTO $t VALUES (2, 200)")

      // the session write is visible through the view; the filter still drops (10, 1000)
      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
    }
  }
}
+
// Scenario 1.2 (external write)
test("temp view with stored plan reflects external write") {
  val t = "testcat.ns1.ns2.tbl"
  val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(t) {
    // Drop the temp view before the table so its stored plan does not leak into later tests.
    withTempView("v") {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")

      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
      checkAnswer(spark.table("v"), Seq(Row(1, 100)))

      // external writer adds (2, 200) via direct catalog API
      val schema = StructType.fromDDL("id INT, salary INT")
      val extTable = catalog("testcat").loadTable(ident,
        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
      extTable.withData(Array(
        new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))

      // the view reflects the external write; the filter still drops (10, 1000)
      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
    }
  }
}
+
// Scenario 1.2 connector w/ cache (external write, caching connector)
test("connector w/ cache: temp view stale after external write") {
  val t = "cachingcat.ns1.ns2.tbl"
  val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(t) {
    // Drop the temp view before the table so its stored plan does not leak into later tests.
    withTempView("v") {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")

      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
      checkAnswer(spark.table("v"), Seq(Row(1, 100)))

      // external writer adds (2, 200) via catalog API (bypasses cache)
      val schema = StructType.fromDDL("id INT, salary INT")
      val extTable = catalog("cachingcat").loadTable(ident,
        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
      extTable.withData(Array(
        new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))

      // Caching connector returns stale table: external write invisible
      checkAnswer(spark.table("v"), Seq(Row(1, 100)))

      // REFRESH TABLE invalidates the connector cache, external write becomes visible
      sql(s"REFRESH TABLE $t")
      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
    }
  }
}
+
// Scenario 2.1 (session ADD COLUMN)
test("temp view with stored plan preserves schema after session ADD COLUMN") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    // Drop the temp view before the table so its stored plan does not leak into later tests.
    withTempView("v") {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")

      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
      checkAnswer(spark.table("v"), Seq(Row(1, 100)))

      sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
      sql(s"INSERT INTO $t VALUES (2, 200, -1)")

      // view preserves original 2-column schema, filter still applied
      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
    }
  }
}
+
// Scenario 2.2 (external ADD COLUMN)
test("temp view with stored plan preserves schema after external ADD COLUMN") {
  val t = "testcat.ns1.ns2.tbl"
  val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(t) {
    // Drop the temp view before the table so its stored plan does not leak into later tests.
    withTempView("v") {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")

      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
      checkAnswer(spark.table("v"), Seq(Row(1, 100)))

      // external schema change via catalog API
      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
      catalog("testcat").alterTable(ident, addCol)

      // external writer adds data with new schema
      val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT")
      val extTable = catalog("testcat").loadTable(ident,
        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
      extTable.withData(Array(
        new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))

      // view preserves original 2-column schema, filter still applied
      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
    }
  }
}
+
// Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
test("connector w/ cache: temp view stale after external ADD COLUMN") {
  val t = "cachingcat.ns1.ns2.tbl"
  val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(t) {
    // Drop the temp view before the table so its stored plan does not leak into later tests.
    withTempView("v") {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")

      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
      checkAnswer(spark.table("v"), Seq(Row(1, 100)))

      // external schema change + data via catalog API
      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
      catalog("cachingcat").alterTable(ident, addCol)

      val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT")
      val extTable = catalog("cachingcat").loadTable(ident,
        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
      extTable.withData(Array(
        new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))

      // Caching connector returns stale table: external changes invisible
      checkAnswer(spark.table("v"), Seq(Row(1, 100)))

      // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema
      sql(s"REFRESH TABLE $t")
      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
    }
  }
}
+
+ // Scenario 3.2 (external column removal)
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]