andreaschat-db commented on code in PR #55540:
URL: https://github.com/apache/spark/pull/55540#discussion_r3208118487
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2976,6 +2997,519 @@ class DataSourceV2DataFrameSuite
}
}
+ // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+ // Each test creates a DSv2 table with initial data, builds a temp view with a filter
+ // (to demonstrate that the stored plan is non-trivial), and then verifies the view
+ // behavior after various table modifications (session or external).
+
+ // Scenario 1.1 (session write)
+ test("temp view with stored plan reflects session write") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 1.2 (external write)
+ test("temp view with stored plan reflects external write") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via direct catalog API
+ val schema = StructType.fromDDL("id INT, salary INT")
Review Comment:
Perhaps we can create an `externalAppend(catalog, ident, schema, rows)` helper
to clean things up?
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -2976,6 +2997,519 @@ class DataSourceV2DataFrameSuite
}
}
+ // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+ // Each test creates a DSv2 table with initial data, builds a temp view with a filter
+ // (to demonstrate that the stored plan is non-trivial), and then verifies the view
+ // behavior after various table modifications (session or external).
+
+ // Scenario 1.1 (session write)
+ test("temp view with stored plan reflects session write") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 1.2 (external write)
+ test("temp view with stored plan reflects external write") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via direct catalog API
+ val schema = StructType.fromDDL("id INT, salary INT")
+ val extTable = catalog("testcat").loadTable(ident,
+
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))
+
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 1.2 connector w/ cache (external write, caching connector)
+ test("connector w/ cache: temp view stale after external write") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via catalog API (bypasses cache)
+ val schema = StructType.fromDDL("id INT, salary INT")
+ val extTable = catalog("cachingcat").loadTable(ident,
+
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))
+
+ // Caching connector returns stale table: external write invisible
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, external write becomes visible
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2.1 (session ADD COLUMN)
+ test("temp view with stored plan preserves schema after session ADD COLUMN")
{
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
+ sql(s"INSERT INTO $t VALUES (2, 200, -1)")
+
+ // view preserves original 2-column schema, filter still applied
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2.2 (external ADD COLUMN)
+ test("temp view with stored plan preserves schema after external ADD
COLUMN") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external schema change via catalog API
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType,
true)
+ catalog("testcat").alterTable(ident, addCol)
+
+ // external writer adds data with new schema
+ val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT")
+ val extTable = catalog("testcat").loadTable(ident,
+
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))
+
+ // view preserves original 2-column schema, filter still applied
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
+ test("connector w/ cache: temp view stale after external ADD COLUMN") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external schema change + data via catalog API
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType,
true)
+ catalog("cachingcat").alterTable(ident, addCol)
+
+ val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT")
+ val extTable = catalog("cachingcat").loadTable(ident,
+
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))
+
+ // Caching connector returns stale table: external changes invisible
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 3.2 (external column removal)
+ // Scenario 3.1 (session column removal)
+ test("temp view with stored plan detects session column removal") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // session schema change via SQL
+ sql(s"ALTER TABLE $t DROP COLUMN salary")
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+
+ // Scenario 3.2 (external column removal)
+ test("temp view with stored plan detects external column removal") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external schema change via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ catalog("testcat").alterTable(ident, dropCol)
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+
+ // Scenario 3.2 connector w/ cache (external column removal, caching connector)
+ test("connector w/ cache: temp view stale after external column removal") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external column removal via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ catalog("cachingcat").alterTable(ident, dropCol)
+
+ // Caching connector returns stale table: column removal invisible, no error
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, column removal detected
+ sql(s"REFRESH TABLE $t")
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` INT has been removed"))
+ }
+ }
+
+ // Scenario 4.1 (session drop and recreate table)
+ test("temp view with stored plan resolves to session-recreated table") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ val originalTableId = catalog("testcat").loadTable(ident).id
+
+ // session drop and recreate via SQL
+ sql(s"DROP TABLE $t")
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+
+ val newTableId = catalog("testcat").loadTable(ident).id
+ assert(originalTableId != newTableId)
+
+ // view resolves to the new empty table
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // insert new data and verify the view picks it up
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+ }
+ }
+
+ // Scenario 4.2 (external drop and recreate table)
+ test("temp view with stored plan resolves to externally recreated table") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ val originalTableId = catalog("testcat").loadTable(ident).id
+
+ // external drop and recreate via catalog API
+ catalog("testcat").dropTable(ident)
+ catalog("testcat").createTable(
+ ident,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ val newTableId = catalog("testcat").loadTable(ident).id
+ assert(originalTableId != newTableId)
+
+ // view resolves to the new empty table
+ checkAnswer(spark.table("v"), Seq.empty)
+
+ // insert new data and verify the view picks it up
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+ }
+ }
+
+ // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector)
+ test("connector w/ cache: temp view stale after external drop/recreate") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external drop and recreate via catalog API
+ catalog("cachingcat").dropTable(ident)
+ catalog("cachingcat").createTable(
+ ident,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ // Caching connector returns stale table: drop/recreate invisible
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, view resolves to new empty table
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(spark.table("v"), Seq.empty)
+ }
+ }
+
+ // Scenario 5.1 (session drop and re-add column with same type, multiple views)
+ test("temp view with stored plan after session drop and re-add column same
type" +
+ " with unfiltered view") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ spark.table(t).createOrReplaceTempView("v_no_filter")
+ spark.table(t).filter("salary IS
NULL").createOrReplaceTempView("v_filter_is_null")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+ checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+ checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
+
+ // drop and re-add column with same name and type
+ sql(s"ALTER TABLE $t DROP COLUMN salary")
+ sql(s"ALTER TABLE $t ADD COLUMN salary INT")
+
+ // salary values are now null, so the filtered view returns nothing
+ checkAnswer(spark.table("v"), Seq.empty)
+ // unfiltered view returns rows with null salary
+ checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+ // IS NULL filter now matches all rows
+ checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10,
null)))
+ }
+ }
+
+ // Scenario 5.2 (external drop and re-add column with same type)
+ test("temp view with stored plan after external drop and re-add column same
type") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ spark.table(t).createOrReplaceTempView("v_no_filter")
+ spark.table(t).filter("salary IS
NULL").createOrReplaceTempView("v_filter_is_null")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+ checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+ checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
+
+ // external drop and re-add column via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+ catalog("testcat").alterTable(ident, dropCol, addCol)
+
+ // salary values are now null, so the filtered view returns nothing
+ checkAnswer(spark.table("v"), Seq.empty)
+ // unfiltered view returns rows with null salary
+ checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+ // IS NULL filter now matches all rows
+ checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10,
null)))
+ }
+ }
+
+ // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector)
+ test("connector w/ cache: temp view stale after external drop/re-add column
same type") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external drop and re-add column with same type via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+ catalog("cachingcat").alterTable(ident, dropCol, addCol)
+
+ // Caching connector returns stale table: column drop/re-add invisible
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, salary values are null
+ sql(s"REFRESH TABLE $t")
+ checkAnswer(spark.table("v"), Seq.empty)
+ }
+ }
+
+ // Scenario 6.1 (session drop and re-add column with different type)
+ test("temp view with stored plan detects session column type change") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // drop and re-add column with same name but different type
+ sql(s"ALTER TABLE $t DROP COLUMN salary")
+ sql(s"ALTER TABLE $t ADD COLUMN salary STRING")
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+
+ // Scenario 6.2 (external drop and re-add column with different type)
+ test("temp view with stored plan detects external column type change") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external drop and re-add column with different type via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+ catalog("testcat").alterTable(ident, dropCol, addCol)
+
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+
+ // Scenario 6.2 connector w/ cache (external column type change, caching connector)
+ test("connector w/ cache: temp view stale after external column type
change") {
+ val t = "cachingcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+ spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // external drop and re-add column with different type via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+ catalog("cachingcat").alterTable(ident, dropCol, addCol)
+
+ // Caching connector returns stale table: type change invisible, no error
+ checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+ // REFRESH TABLE invalidates the connector cache, type change detected
+ sql(s"REFRESH TABLE $t")
+ checkError(
+ exception = intercept[AnalysisException] { spark.table("v").collect()
},
+ condition =
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+ parameters = Map(
+ "viewName" -> "`v`",
+ "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+ "colType" -> "data",
+ "errors" -> "- `salary` type has changed from INT to STRING"))
+ }
+ }
+
+ // Scenario 7 (type widening from INT to BIGINT)
Review Comment:
Shall we also add the external-writer and external-writer-with-cache sub-cases
here, for consistency? Same for the Connect tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]