lrsb opened a new pull request, #16011: URL: https://github.com/apache/iceberg/pull/16011
The DynamicCommitter deduplicates commits on the triplet (flink.job-id, flink.operator-id, max-committed-checkpoint-id) stored in each snapshot summary.

The aggregator previously sampled the job id from the runtime environment on every operator open(), so a restart or rescale under a fresh Flink JobID stamped restored write results with the new id. The committer's ancestor walk then could not find the snapshot committed under the old id and silently re-committed the data, producing duplicate rows in the target table.

Persist the aggregator's job id as operator state and reuse it on restore, so committables keep the job id under which their data was originally produced. This restores the dedup invariant across job restarts, autoscaler savepoint+resubmit cycles, and session-cluster resubmissions. The invariant was broken under sustained catalog-side latency, where commit responses were lost client-side.
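To illustrate the failure mode, here is a minimal, self-contained sketch of a dedup check keyed on the triplet above. It is not the DynamicCommitter code: the class DedupSketch, its methods, and the list-of-maps history are hypothetical stand-ins for the snapshot ancestor walk, and the "flink." prefix on the third summary key is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model: each Map is one snapshot summary, oldest first.
class DedupSketch {
    static final String JOB_ID = "flink.job-id";
    static final String OPERATOR_ID = "flink.operator-id";
    // Assumed key name; the description above abbreviates it.
    static final String MAX_CKPT = "flink.max-committed-checkpoint-id";

    // Walk from newest to oldest and return the checkpoint id recorded by the
    // most recent snapshot for (jobId, operatorId), or -1 if none is found.
    static long maxCommittedCheckpointId(
            List<Map<String, String>> history, String jobId, String operatorId) {
        for (int i = history.size() - 1; i >= 0; i--) {
            Map<String, String> summary = history.get(i);
            if (jobId.equals(summary.get(JOB_ID))
                    && operatorId.equals(summary.get(OPERATOR_ID))
                    && summary.get(MAX_CKPT) != null) {
                return Long.parseLong(summary.get(MAX_CKPT));
            }
        }
        return -1L;
    }

    // Commit only if this checkpoint is newer than what the table already holds
    // for the same (jobId, operatorId). Returns true if a snapshot was added.
    static boolean commitIfNew(
            List<Map<String, String>> history, String jobId, String operatorId, long checkpointId) {
        if (checkpointId <= maxCommittedCheckpointId(history, jobId, operatorId)) {
            return false; // already committed, skip
        }
        history.add(Map.of(
                JOB_ID, jobId,
                OPERATOR_ID, operatorId,
                MAX_CKPT, Long.toString(checkpointId)));
        return true;
    }

    public static void main(String[] args) {
        List<Map<String, String>> history = new ArrayList<>();
        // Original commit under job-A succeeds.
        System.out.println(commitIfNew(history, "job-A", "op-1", 5L)); // true
        // Replay stamped with the persisted id job-A is recognized and skipped.
        System.out.println(commitIfNew(history, "job-A", "op-1", 5L)); // false
        // Replay stamped with a fresh id job-B is not recognized: duplicate commit.
        System.out.println(commitIfNew(history, "job-B", "op-1", 5L)); // true
    }
}
```

In this model the third call is the bug: the same checkpoint's data is committed twice because the lookup key changed. Keeping the original job id on restored committables makes the replay take the second path instead.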
