andreaschat-db commented on code in PR #55463:
URL: https://github.com/apache/spark/pull/55463#discussion_r3232539736
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -3359,6 +3360,204 @@ class DataSourceV2DataFrameSuite
}
}
+ /** Append a single row to the table via the catalog API, bypassing SQL. */
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ schema: StructType,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Incrementally constructed queries: join scenarios.
+ // df1 and df2 are analyzed at different times, then joined. The refresh
+ // phase in QueryExecution must align table versions across all references.
+
+ // Scenario 1: join after insert refreshes both sides to latest version
+ test("SPARK-54157: join refreshes both sides after insert") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ val df1 = spark.table(t)
+
+ // external writer adds (2, 200) via direct catalog API
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ val df2 = spark.table(t)
+
+ // both sides refresh to latest version
+ checkAnswer(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)))
+ }
+ }
+
+ // Scenario 2: join after ADD COLUMN refreshes versions but df1 keeps
original schema
+ test("SPARK-54157: join after ADD COLUMN preserves df1 schema") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ val df1 = spark.table(t)
+
+ // external schema change via catalog API
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType,
true)
+ catalog("testcat").alterTable(ident, addCol)
+
+ // external writer adds (2, 200, -1) with new schema
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT, new_column INT"),
InternalRow(2, 200, -1))
+
+ val df2 = spark.table(t)
+
+ // df1 keeps 2-col schema, df2 has 3-col schema, both see latest data
+ checkAnswer(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1)))
+ }
+ }
+
+ // Scenario 3: join after DROP COLUMN fails with analysis exception
+ test("SPARK-54157: join after DROP COLUMN fails") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ val df1 = spark.table(t)
+
+ // external column removal via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ catalog("testcat").alterTable(ident, dropCol)
+
+ // external writer adds (2) with 1-col schema
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT"), InternalRow(2))
+
+ val df2 = spark.table(t)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition =
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> ".*"))
+ }
+ }
+
+ // Scenario 4: join after drop and recreate table fails with
TABLE_ID_MISMATCH
+ test("SPARK-54157: join detects table drop and recreate via table ID") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ val df1 = spark.table(t)
+ val originTableId = catalog("testcat").loadTable(ident).id
+
+ // external drop and recreate via catalog API
+ catalog("testcat").dropTable(ident)
+ catalog("testcat").createTable(
+ ident,
+ new TableInfo.Builder()
+ .withColumns(Array(
+ Column.create("id", IntegerType),
+ Column.create("salary", IntegerType)))
+ .build())
+
+ // external writer adds (2, 200) to the new table
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ val df2 = spark.table(t)
+ val newTableId = catalog("testcat").loadTable(ident).id
+ assert(originTableId != newTableId)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition =
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH",
+ matchPVals = true,
+ parameters = Map(
+ "tableName" -> ".*",
+ "capturedTableId" -> ".*",
+ "currentTableId" -> ".*"))
+ }
+ }
+
+ // Scenario 5: join after drop and re-add column with same type succeeds
+ // without column IDs, Spark cannot detect the column was replaced
+ test("SPARK-54157: join allows drop and re-add column with same type") {
Review Comment:
Shall we also add a test that drops and re-adds a column when column IDs
are supported? That case should produce an exception.
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -3359,6 +3360,204 @@ class DataSourceV2DataFrameSuite
}
}
+ /** Append a single row to the table via the catalog API, bypassing SQL. */
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ schema: StructType,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Incrementally constructed queries: join scenarios.
+ // df1 and df2 are analyzed at different times, then joined. The refresh
+ // phase in QueryExecution must align table versions across all references.
+
+ // Scenario 1: join after insert refreshes both sides to latest version
+ test("SPARK-54157: join refreshes both sides after insert") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ val df1 = spark.table(t)
+
+ // external writer adds (2, 200) via direct catalog API
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ val df2 = spark.table(t)
+
+ // both sides refresh to latest version
+ checkAnswer(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)))
+ }
+ }
+
+ // Scenario 2: join after ADD COLUMN refreshes versions but df1 keeps
original schema
+ test("SPARK-54157: join after ADD COLUMN preserves df1 schema") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ val df1 = spark.table(t)
+
+ // external schema change via catalog API
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType,
true)
+ catalog("testcat").alterTable(ident, addCol)
+
+ // external writer adds (2, 200, -1) with new schema
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT, new_column INT"),
InternalRow(2, 200, -1))
+
+ val df2 = spark.table(t)
+
+ // df1 keeps 2-col schema, df2 has 3-col schema, both see latest data
+ checkAnswer(
+ df1.join(df2, df1("id") === df2("id")),
+ Seq(Row(1, 100, 1, 100, null), Row(2, 200, 2, 200, -1)))
+ }
+ }
+
+ // Scenario 3: join after DROP COLUMN fails with analysis exception
+ test("SPARK-54157: join after DROP COLUMN fails") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ val df1 = spark.table(t)
+
+ // external column removal via catalog API
+ val dropCol = TableChange.deleteColumn(Array("salary"), false)
+ catalog("testcat").alterTable(ident, dropCol)
+
+ // external writer adds (2) with 1-col schema
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT"), InternalRow(2))
+
+ val df2 = spark.table(t)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ df1.join(df2, df1("id") === df2("id")).collect()
+ },
+ condition =
"INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+ matchPVals = true,
+ parameters = Map("tableName" -> ".*", "errors" -> ".*"))
+ }
+ }
+
+ // Scenario 4: join after drop and recreate table fails with
TABLE_ID_MISMATCH
+ test("SPARK-54157: join detects table drop and recreate via table ID") {
Review Comment:
Shall we also add a test that drops and recreates the table when column IDs
are supported?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]