sethwhite-sf opened a new issue, #8977:
URL: https://github.com/apache/iceberg/issues/8977
### Apache Iceberg version
1.4.1 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
We have found that temporary views that reference an Iceberg table become
stale when catalog caching is enabled:
spark.sql.catalog.catalog-name.cache-enabled=true.
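For anyone reproducing this, the two catalog settings involved can be sketched as below. This is only a minimal illustration: `CatalogCacheProps` and the catalog name `my_catalog` are hypothetical, and the sketch assumes the `cache-enabled` and `cache.expiration-interval-ms` catalog properties from the Iceberg Spark configuration, with 30000 ms being the default interval referred to in this report.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Iceberg or Spark): builds the Spark conf
// entries that control catalog caching for a given catalog name.
final class CatalogCacheProps {
    static Map<String, String> cacheProperties(
            String catalogName, boolean cacheEnabled, long expirationMs) {
        String prefix = "spark.sql.catalog." + catalogName + ".";
        Map<String, String> props = new LinkedHashMap<>();
        // Whether the catalog caches loaded tables at all.
        props.put(prefix + "cache-enabled", Boolean.toString(cacheEnabled));
        // How long a cached table entry lives before it expires.
        props.put(prefix + "cache.expiration-interval-ms", Long.toString(expirationMs));
        return props;
    }

    public static void main(String[] args) {
        // "my_catalog" is a placeholder catalog name.
        cacheProperties("my_catalog", true, 30_000L)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Each entry could be passed to Spark as a `--conf key=value` pair or set on the session builder.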
Initially, a view is created:
Dataset<Row> rdd = spark.read().format("iceberg").load("table1");
rdd.createOrReplaceTempView("view1");
The view and the catalog cache reference the same org.apache.iceberg.Table
object, so the view reflects any changes that the application makes when it is
queried:
spark.sql("SELECT * from view1").show();
// query returns latest state of the table
However, once cache expiry occurs (after 30 seconds by default when caching
is enabled), subsequent updates to the table, such as
spark.sql("DELETE FROM table1 AS t WHERE t.id IS NULL");
cause a new entry for the table to be created in the cache. The view no
longer sees any of the changes that are made and becomes stale, because it
is still using the original org.apache.iceberg.Table object, which
references an Iceberg table snapshot that is no longer current. The view
and cache are no longer in sync.
spark.sql("SELECT * from view1").show();
// No longer returns latest state of the table
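The mechanism described above can be illustrated without Spark or Iceberg. The following is a simplified model, not Iceberg's actual implementation: `FakeTable`, `FakeCachingCatalog`, and `StaleViewDemo` are hypothetical stand-ins, and cache expiry is triggered by hand instead of by a timer. It only shows how a reference captured before expiry stops seeing commits made through a freshly cached object.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.iceberg.Table: remembers the rows of
// the last state committed through this particular object.
final class FakeTable {
    private List<String> rows;
    FakeTable(List<String> rows) { this.rows = new ArrayList<>(rows); }
    List<String> rows() { return new ArrayList<>(rows); }
    void commit(List<String> newRows) { this.rows = new ArrayList<>(newRows); }
}

// Hypothetical stand-in for a caching catalog.
final class FakeCachingCatalog {
    private final Map<String, List<String>> storage = new HashMap<>(); // committed state
    private final Map<String, FakeTable> cache = new HashMap<>();      // cached table objects

    void create(String name, List<String> rows) {
        storage.put(name, new ArrayList<>(rows));
    }

    // Returns the cached object, or loads a new one from storage on a miss.
    FakeTable load(String name) {
        return cache.computeIfAbsent(name, n -> new FakeTable(storage.get(n)));
    }

    // Models cache expiry (time-based in the real catalog, manual here).
    void expireAll() { cache.clear(); }

    // Models DELETE: the commit goes through whatever object the cache returns now.
    void delete(String name, String row) {
        FakeTable table = load(name);
        List<String> rows = table.rows();
        rows.remove(row);
        storage.put(name, rows);
        table.commit(rows);
    }
}

final class StaleViewDemo {
    // Returns true if the reference captured by the "view" observes the delete.
    static boolean viewSeesDelete(boolean expireBeforeDelete) {
        FakeCachingCatalog catalog = new FakeCachingCatalog();
        catalog.create("table1", List.of("1,hr", "2,hardware", "null,hr"));
        FakeTable view = catalog.load("table1"); // the temp view keeps this object
        if (expireBeforeDelete) {
            catalog.expireAll();
        }
        catalog.delete("table1", "null,hr");
        return !view.rows().contains("null,hr");
    }

    public static void main(String[] args) {
        System.out.println("without expiry: " + viewSeesDelete(false)); // true
        System.out.println("after expiry:   " + viewSeesDelete(true));  // false
    }
}
```

In this model the view stays current only while it shares an object with the cache, which matches the behavior reported here.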
The unit test below illustrates the problem. The test fails when the
default catalog caching is enabled.
```
@Test
public void testViewConsistencyAfterCacheExpiration() throws Exception {
  Assume.assumeFalse("Avro does not support metadata delete", fileFormat.equals("avro"));

  createAndInitUnpartitionedTable();
  sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", tableName);

  // Create a temporary view backed by the Iceberg table.
  Dataset<Row> rdd = spark.read().format("iceberg").load(tableName);
  rdd.createOrReplaceTempView("view1");

  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(null, "hr"), row(1, "hr"), row(2, "hardware")),
      sql("SELECT * FROM %s ORDER BY id", "view1"));

  // Default cache expiration is 30 seconds; sleep past it so the cached entry expires.
  Thread.sleep(40000);

  sql("DELETE FROM %s AS t WHERE t.id IS NULL", tableName);

  // Fails with catalog caching enabled: the view still returns the deleted row.
  assertEquals(
      "Should have expected rows",
      ImmutableList.of(row(1, "hr"), row(2, "hardware")),
      sql("SELECT * FROM %s ORDER BY id", "view1"));
}
```