elopezal opened a new issue, #9504:
URL: https://github.com/apache/iceberg/issues/9504
### Apache Iceberg version
1.4.0
### Query engine
Spark
### Please describe the bug 🐞
We are using Spark Structured Streaming in Scala to read an Iceberg table as a source and write into another Iceberg table.
The source Iceberg table receives its data from Kafka and has compaction and snapshot-expiration maintenance enabled.
We are using this code to read the source table:
```
spark.readStream
.format("iceberg")
.load(s"${icebergConf.icebergTableQualifier}")
```
After several writeStream processes and compaction maintenance operations, the source Iceberg table has the following snapshots:
<img width="551" alt="Screenshot 2024-01-18 at 11 17 20"
src="https://github.com/apache/iceberg/assets/157010260/9b8cca78-63f1-47df-917a-ad12e56154e8">
This setup works fine and we are able to readStream from this Iceberg table into another one. However, once the snapshot-expiration maintenance operations run and the oldest snapshots are removed, the process fails with the following error:
<img width="550" alt="Screenshot 2024-01-18 at 11 24 10"
src="https://github.com/apache/iceberg/assets/157010260/80a2ad44-efe1-44db-8a0e-0b7e03e36d86">
`Exception in thread "main" ERROR: [STREAM_FAILED] Query [id =
c084caa6-0907-4781-b6a7-8f8991929f97, runId =
7781194d-298d-4929-be8e-c5407bd98566] terminated with exception: Cannot load
current offset at snapshot 1615816462090596768, the snapshot was expired or
removed`
How can we readStream from an Iceberg table whose old snapshots are being expired?
We tried getting the timestamp of the newest snapshot and applying it via the `stream-from-timestamp` configuration, but it didn't work and we got the same error:
```
// assumes: import org.apache.spark.sql.{functions => F}
if (spark.catalog.tableExists(s"${icebergConf.icebergTableQualifier}.snapshots")) {
  val latestSnapshotTimestampDF = spark.read
    .table(s"${icebergConf.icebergTableQualifier}.snapshots")
    .agg(F.max("committed_at").as("latest_snapshot_timestamp"))
  // committed_at is a timestamp column, so read it as java.sql.Timestamp and
  // take its epoch milliseconds directly instead of formatting and re-parsing
  // the value as a string
  val latestSnapshotTimestamp = latestSnapshotTimestampDF
    .head()
    .getAs[java.sql.Timestamp]("latest_snapshot_timestamp")
    .getTime
  logger.info(s"Latest Snapshot Timestamp: ${latestSnapshotTimestamp}")
  spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", latestSnapshotTimestamp)
    .load(s"${icebergConf.icebergTableQualifier}")
}
```
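As a side note on the attempt above: `committed_at` in the `snapshots` metadata table is a timestamp column, and `stream-from-timestamp` is documented to take milliseconds since the Unix epoch, so the value can be converted with `getTime` rather than round-tripping it through `SimpleDateFormat`. A minimal, Spark-free sketch of that conversion (the object name and the sample value are made up for illustration):

```scala
import java.sql.Timestamp

object StreamFromTimestampSketch {
  // Convert a committed_at value (java.sql.Timestamp) into the epoch-millis
  // Long expected by the stream-from-timestamp read option.
  def toStreamFromTimestamp(committedAt: Timestamp): Long = committedAt.getTime

  def main(args: Array[String]): Unit = {
    // A Timestamp built from a known epoch-millis value round-trips exactly.
    val committedAt = new Timestamp(1705573040000L)
    println(toStreamFromTimestamp(committedAt)) // prints 1705573040000
  }
}
```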
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]