Guosmilesmile opened a new pull request, #12639:
URL: https://github.com/apache/iceberg/pull/12639
We encountered a scenario where a Flink source incrementally consuming data from Iceberg fails because the lastSnapshotId it is consuming from has already been expired. This can happen, for example, when snapshots are removed via Spark's expire_snapshots procedure (CALL iceberg.system.expire_snapshots(table => 'default.my_table', older_than => TIMESTAMP '2025-03-25 00:00:00.000', retain_last => 1)), or when consumption falls too far behind and historical snapshots are cleaned up in the meantime.
```java
Caused by: java.lang.IllegalArgumentException: Starting snapshot (exclusive) 2444106500863389603 is not a parent ancestor of end snapshot 2357271669960485754
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:430)
	at org.apache.iceberg.BaseIncrementalScan.fromSnapshotIdExclusive(BaseIncrementalScan.java:179)
	at org.apache.iceberg.BaseIncrementalScan.planFiles(BaseIncrementalScan.java:104)
	at org.apache.iceberg.BaseIncrementalAppendScan.planTasks(BaseIncrementalAppendScan.java:61)
	at org.apache.iceberg.flink.source.FlinkSplitPlanner.planTasks(FlinkSplitPlanner.java:119)
	at org.apache.iceberg.flink.source.FlinkSplitPlanner.planIcebergSourceSplits(FlinkSplitPlanner.java:76)
	at org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.discoverIncrementalSplits(ContinuousSplitPlannerImpl.java:135)
	at org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.planSplits(ContinuousSplitPlannerImpl.java:83)
	at org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator.discoverSplits(ContinuousIcebergEnumerator.java:130)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:130)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
```
In such scenarios, the Flink job restarts repeatedly because the snapshot ID stored in its state is no longer available, and manual intervention is required to restart the job and recover. We would like a mechanism that lets the job recover automatically in this situation, similar to how Kafka consumers handle out-of-range offsets by automatically resetting to the earliest or latest available offset.
This PR mainly adds a configuration option called snapshot-expiration-reset-strategy. When the lastSnapshot is no longer a parent ancestor of the current snapshot, the situation can be handled in one of three ways to avoid requiring a manual restart for recovery:

- Default mode: maintain the current behavior (fail as before).
- Earliest mode: restart incremental consumption using the oldest retained snapshot as the lastSnapshot.
- Latest mode: restart incremental consumption using the current latest snapshot as the lastSnapshot.
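The decision the three modes make can be sketched as a small standalone function. This is a minimal illustration only; the enum and method names (SnapshotExpirationResetStrategy, resolveResetSnapshot) are hypothetical and do not reflect the PR's actual classes or the Iceberg/Flink APIs.

```java
import java.util.List;

public class ResetStrategySketch {
    // Hypothetical names mirroring the three modes described above.
    enum SnapshotExpirationResetStrategy { DEFAULT, EARLIEST, LATEST }

    /**
     * Given the table's live snapshot IDs ordered oldest -> newest, decide
     * which snapshot to resume incremental consumption from when the
     * lastSnapshotId stored in Flink state has been expired.
     * Returns -1 in DEFAULT mode, signaling "keep today's behavior" (the
     * IllegalArgumentException from the scan planner propagates as before).
     */
    static long resolveResetSnapshot(
            List<Long> liveSnapshotIds,
            long lastSnapshotId,
            SnapshotExpirationResetStrategy strategy) {
        if (liveSnapshotIds.contains(lastSnapshotId)) {
            return lastSnapshotId; // still retained, no reset needed
        }
        switch (strategy) {
            case EARLIEST:
                return liveSnapshotIds.get(0); // oldest retained snapshot
            case LATEST:
                return liveSnapshotIds.get(liveSnapshotIds.size() - 1); // newest snapshot
            default:
                return -1L; // DEFAULT: let planning fail as it does today
        }
    }

    public static void main(String[] args) {
        List<Long> live = List.of(100L, 200L, 300L);
        // Stored snapshot 42 has been expired; pick per strategy.
        System.out.println(resolveResetSnapshot(live, 42L, SnapshotExpirationResetStrategy.EARLIEST)); // 100
        System.out.println(resolveResetSnapshot(live, 42L, SnapshotExpirationResetStrategy.LATEST));   // 300
        System.out.println(resolveResetSnapshot(live, 42L, SnapshotExpirationResetStrategy.DEFAULT));  // -1
    }
}
```

Note that Earliest mode treats the oldest retained snapshot as the exclusive starting point, so consumption resumes from the first change after it, while Latest mode skips everything up to the current head, analogous to Kafka's earliest/latest offset reset semantics.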
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]