andreaschat-db commented on code in PR #55536:
URL: https://github.com/apache/spark/pull/55536#discussion_r3246673371
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -3535,4 +3448,276 @@ class DataSourceV2DataFrameSuite
parameters = Map.empty)
}
}
+
+ // CACHE TABLE impact on reads.
+ // Tests that CACHE TABLE pins table state against external changes,
+ // while session writes invalidate and re-cache.
+
+ test("cached DSv2 table DataFrame is refreshed and reused after insert") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+ val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+ df1.write.insertInto(t)
+
+ // cache DataFrame pointing to table
+ val readDF1 = spark.table(t)
+ readDF1.cache()
+ assertCached(readDF1)
+ checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b")))
+
+ // insert more data, invalidating and refreshing cache entry
+ val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+ df2.write.insertInto(t)
+
+ // verify underlying plan is recached and picks up new data
+ val readDF2 = spark.table(t)
+ assertCached(readDF2)
+ checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"),
Row(4L, "d")))
+ }
+ }
+
+ test("SPARK-54022: caching table via Dataset API should pin table state") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+ // cache table
+ spark.table(t).cache()
+
+ // verify caching works as expected
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // modify table directly to mimic external changes
+ val table = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.DELETE))
+ table.asInstanceOf[TruncatableTable].truncateTable()
+
+ // verify external changes have no impact on cached state
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // add more data within session that should invalidate cache
+ sql(s"INSERT INTO $t VALUES (10, 100, 'x')")
+
+ // table should be re-cached correctly
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(10, 100, "x")))
+ }
+ }
+
+ test("SPARK-54022: caching a query via Dataset API should not pin table
state") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+ // cache query on top of table
+ val df = spark.table(t).select("id")
+ df.cache()
+
+ // verify query caching works as expected
+ assertCached(spark.table(t).select("id"))
+ checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3)))
+
+ // verify table itself is not cached
+ assertNotCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // modify table directly to mimic external changes
+ val table = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.DELETE))
+ table.asInstanceOf[TruncatableTable].truncateTable()
+
+ // verify cached DataFrame is unaffected by external changes
+ assertCached(df)
+ checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
+
+ // verify external changes are reflected correctly when table is queried
+ assertNotCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq.empty)
+ }
+ }
+
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ schema: StructType,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Scenario 1: external write after CACHE TABLE is invisible (cache pinned).
+ test("SPARK-54022: cached table pinned against external data write") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache the table
+ spark.table(t).cache()
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via direct catalog API
+ // (bypasses this session's CacheManager)
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ // cache is pinned, external write invisible
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // REFRESH TABLE picks up external write
+ sql(s"REFRESH TABLE $t")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2: session write invalidates cache; subsequent external write
+ // is again invisible.
+ test("SPARK-54022: session write invalidates cache, then external write
invisible") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache the table
+ spark.table(t).cache()
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // session write invalidates the cache entry
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
+
+ // external writer adds (3, 300) via direct catalog API
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(3, 300))
+
+ // cache is re-pinned, external write invisible
Review Comment:
I am confused here. What does it mean that the cache is re-pinned? If it was
re-pinned, shouldn't it have picked up the external changes? The external write
bypassed the `CacheManager`, right?
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -3535,4 +3448,276 @@ class DataSourceV2DataFrameSuite
parameters = Map.empty)
}
}
+
+ // CACHE TABLE impact on reads.
+ // Tests that CACHE TABLE pins table state against external changes,
+ // while session writes invalidate and re-cache.
+
+ test("cached DSv2 table DataFrame is refreshed and reused after insert") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+ val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+ df1.write.insertInto(t)
+
+ // cache DataFrame pointing to table
+ val readDF1 = spark.table(t)
+ readDF1.cache()
+ assertCached(readDF1)
+ checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b")))
+
+ // insert more data, invalidating and refreshing cache entry
+ val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+ df2.write.insertInto(t)
+
+ // verify underlying plan is recached and picks up new data
+ val readDF2 = spark.table(t)
+ assertCached(readDF2)
+ checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"),
Row(4L, "d")))
+ }
+ }
+
+ test("SPARK-54022: caching table via Dataset API should pin table state") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+ // cache table
+ spark.table(t).cache()
+
+ // verify caching works as expected
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // modify table directly to mimic external changes
+ val table = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.DELETE))
+ table.asInstanceOf[TruncatableTable].truncateTable()
+
+ // verify external changes have no impact on cached state
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // add more data within session that should invalidate cache
+ sql(s"INSERT INTO $t VALUES (10, 100, 'x')")
+
+ // table should be re-cached correctly
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(10, 100, "x")))
+ }
+ }
+
+ test("SPARK-54022: caching a query via Dataset API should not pin table
state") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+ // cache query on top of table
+ val df = spark.table(t).select("id")
+ df.cache()
+
+ // verify query caching works as expected
+ assertCached(spark.table(t).select("id"))
+ checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3)))
+
+ // verify table itself is not cached
+ assertNotCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // modify table directly to mimic external changes
+ val table = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.DELETE))
+ table.asInstanceOf[TruncatableTable].truncateTable()
+
+ // verify cached DataFrame is unaffected by external changes
+ assertCached(df)
+ checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
+
+ // verify external changes are reflected correctly when table is queried
+ assertNotCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq.empty)
+ }
+ }
+
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ schema: StructType,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Scenario 1: external write after CACHE TABLE is invisible (cache pinned).
+ test("SPARK-54022: cached table pinned against external data write") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache the table
+ spark.table(t).cache()
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via direct catalog API
+ // (bypasses this session's CacheManager)
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ // cache is pinned, external write invisible
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // REFRESH TABLE picks up external write
+ sql(s"REFRESH TABLE $t")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2: session write invalidates cache; subsequent external write
+ // is again invisible.
+ test("SPARK-54022: session write invalidates cache, then external write
invisible") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache the table
+ spark.table(t).cache()
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // session write invalidates the cache entry
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
+
+ // external writer adds (3, 300) via direct catalog API
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(3, 300))
+
+ // cache is re-pinned, external write invisible
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
+
+ // REFRESH TABLE picks up external write
+ sql(s"REFRESH TABLE $t")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200), Row(3, 300)))
+ }
+ }
+
+ // Scenario 3: external schema change after CACHE TABLE.
+ // Cache stays pinned at original 2-column schema; external ADD COLUMN
+ // is invisible.
+ test("SPARK-54022: cached table pinned against external schema change") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache table
+ spark.table(t).cache()
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // simulate external schema change + data write via direct catalog API
+ // (bypasses this session's CacheManager)
+ val addCol = TableChange.addColumn(Array("new_column"), IntegerType,
true)
+ catalog("testcat").alterTable(ident, addCol)
+
+ // external writer adds (2, 200, -1)
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT, new_column INT"),
InternalRow(2, 200, -1))
+
+ // cache stays pinned at original 2-column schema
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // REFRESH TABLE picks up external schema change and data
+ sql(s"REFRESH TABLE $t")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100, null), Row(2, 200, -1)))
+ }
+ }
+
+ // Scenario 4: session schema change invalidates cache; subsequent external
+ // write is invisible.
+ test("SPARK-54022: session schema change invalidates cache, external write
invisible") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache table
+ spark.table(t).cache()
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // session schema change: invalidates cache, rebuilds with new schema
+ sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
+
+ // after session ALTER, cache is rebuilt with 3-column schema
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100, null)))
+
+ // external writer adds (2, 200, -1) via direct catalog API
+ // (bypasses this session's CacheManager)
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT, new_column INT"),
InternalRow(2, 200, -1))
+
+ // external write invisible: cache still shows (1, 100, null)
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100, null)))
+
+ // REFRESH TABLE picks up external write
+ sql(s"REFRESH TABLE $t")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100, null), Row(2, 200, -1)))
+ }
+ }
+
+ // Scenario 5: external drop and recreate with same schema.
+ test("SPARK-54022: cached table after external drop and recreate sees empty
table") {
Review Comment:
Shall we also verify the table IDs? The same applies to the Connect tests.
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala:
##########
@@ -3535,4 +3448,276 @@ class DataSourceV2DataFrameSuite
parameters = Map.empty)
}
}
+
+ // CACHE TABLE impact on reads.
+ // Tests that CACHE TABLE pins table state against external changes,
+ // while session writes invalidate and re-cache.
+
+ test("cached DSv2 table DataFrame is refreshed and reused after insert") {
+ val t = "testcat.ns1.ns2.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+ val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+ df1.write.insertInto(t)
+
+ // cache DataFrame pointing to table
+ val readDF1 = spark.table(t)
+ readDF1.cache()
+ assertCached(readDF1)
+ checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b")))
+
+ // insert more data, invalidating and refreshing cache entry
+ val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+ df2.write.insertInto(t)
+
+ // verify underlying plan is recached and picks up new data
+ val readDF2 = spark.table(t)
+ assertCached(readDF2)
+ checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"),
Row(4L, "d")))
+ }
+ }
+
+ test("SPARK-54022: caching table via Dataset API should pin table state") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+ // cache table
+ spark.table(t).cache()
+
+ // verify caching works as expected
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // modify table directly to mimic external changes
+ val table = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.DELETE))
+ table.asInstanceOf[TruncatableTable].truncateTable()
+
+ // verify external changes have no impact on cached state
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // add more data within session that should invalidate cache
+ sql(s"INSERT INTO $t VALUES (10, 100, 'x')")
+
+ // table should be re-cached correctly
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(10, 100, "x")))
+ }
+ }
+
+ test("SPARK-54022: caching a query via Dataset API should not pin table
state") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+ // cache query on top of table
+ val df = spark.table(t).select("id")
+ df.cache()
+
+ // verify query caching works as expected
+ assertCached(spark.table(t).select("id"))
+ checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3)))
+
+ // verify table itself is not cached
+ assertNotCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3,
30, "A")))
+
+ // modify table directly to mimic external changes
+ val table = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.DELETE))
+ table.asInstanceOf[TruncatableTable].truncateTable()
+
+ // verify cached DataFrame is unaffected by external changes
+ assertCached(df)
+ checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
+
+ // verify external changes are reflected correctly when table is queried
+ assertNotCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq.empty)
+ }
+ }
+
+ private def externalAppend(
+ catalogName: String,
+ ident: Identifier,
+ schema: StructType,
+ row: InternalRow): Unit = {
+ val extTable = catalog(catalogName).loadTable(ident,
+ util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+ extTable.withData(Array(
+ new BufferedRows(Seq.empty, schema).withRow(row)))
+ }
+
+ // Scenario 1: external write after CACHE TABLE is invisible (cache pinned).
+ test("SPARK-54022: cached table pinned against external data write") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache the table
+ spark.table(t).cache()
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // external writer adds (2, 200) via direct catalog API
+ // (bypasses this session's CacheManager)
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(2, 200))
+
+ // cache is pinned, external write invisible
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // REFRESH TABLE picks up external write
+ sql(s"REFRESH TABLE $t")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
+ }
+ }
+
+ // Scenario 2: session write invalidates cache; subsequent external write
+ // is again invisible.
+ test("SPARK-54022: session write invalidates cache, then external write
invisible") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache the table
+ spark.table(t).cache()
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100)))
+
+ // session write invalidates the cache entry
+ sql(s"INSERT INTO $t VALUES (2, 200)")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
+
+ // external writer adds (3, 300) via direct catalog API
+ externalAppend(catalogName = "testcat", ident = ident,
+ schema = StructType.fromDDL("id INT, salary INT"), InternalRow(3, 300))
+
+ // cache is re-pinned, external write invisible
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
+
+ // REFRESH TABLE picks up external write
+ sql(s"REFRESH TABLE $t")
+ assertCached(spark.table(t))
+ checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200), Row(3, 300)))
+ }
+ }
+
+ // Scenario 3: external schema change after CACHE TABLE.
+ // Cache stays pinned at original 2-column schema; external ADD COLUMN
+ // is invisible.
+ test("SPARK-54022: cached table pinned against external schema change") {
+ val t = "testcat.ns1.ns2.tbl"
+ val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+ sql(s"INSERT INTO $t VALUES (1, 100)")
+
+ // cache table
Review Comment:
nit: redundant comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]