lrsb opened a new pull request, #16011:
URL: https://github.com/apache/iceberg/pull/16011

   The DynamicCommitter deduplicates commits on the triplet (flink.job-id, 
flink.operator-id, max-committed-checkpoint-id) stored in each snapshot 
summary. The aggregator previously sampled the job id from the runtime 
environment on every operator open(), so a restart or rescale with a fresh 
Flink JobID would stamp restored write results with the new id. The committer's 
ancestor walk then could not find the snapshot committed under the old id, and 
silently re-committed the data, producing duplicate rows in the target table.
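
To make the failure mode concrete, here is a minimal, self-contained sketch of a dedup check keyed on that triplet. It is a simplified model, not the committer's actual code: `Snapshot`, `commit`, `maxCommittedCheckpointId`, and `shouldCommit` are hypothetical names, and only the three summary keys come from the description above.

```java
import java.util.HashMap;
import java.util.Map;

class DedupWalkSketch {
  // Summary keys named in the description above.
  static final String JOB_ID = "flink.job-id";
  static final String OPERATOR_ID = "flink.operator-id";
  static final String MAX_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

  /** Hypothetical stand-in for a table snapshot: a summary map plus a parent link. */
  static final class Snapshot {
    final Map<String, String> summary;
    final Snapshot parent;

    Snapshot(Map<String, String> summary, Snapshot parent) {
      this.summary = summary;
      this.parent = parent;
    }
  }

  /** Creates a child snapshot stamped with the dedup triplet. */
  static Snapshot commit(Snapshot parent, String jobId, String operatorId, long checkpointId) {
    Map<String, String> summary = new HashMap<>();
    summary.put(JOB_ID, jobId);
    summary.put(OPERATOR_ID, operatorId);
    summary.put(MAX_CHECKPOINT_ID, Long.toString(checkpointId));
    return new Snapshot(summary, parent);
  }

  /** Walks ancestors from head; returns -1 if no snapshot matches the job and operator. */
  static long maxCommittedCheckpointId(Snapshot head, String jobId, String operatorId) {
    for (Snapshot s = head; s != null; s = s.parent) {
      boolean sameWriter =
          jobId.equals(s.summary.get(JOB_ID)) && operatorId.equals(s.summary.get(OPERATOR_ID));
      String value = s.summary.get(MAX_CHECKPOINT_ID);
      if (sameWriter && value != null) {
        return Long.parseLong(value);
      }
    }
    return -1L;
  }

  /** A checkpoint is committed only if it is newer than the last one found by the walk. */
  static boolean shouldCommit(Snapshot head, String jobId, String operatorId, long checkpointId) {
    return checkpointId > maxCommittedCheckpointId(head, jobId, operatorId);
  }

  public static void main(String[] args) {
    // Checkpoint 7 was committed under the original job id.
    Snapshot head = commit(null, "job-A", "op-1", 7L);

    // Same job id: the walk finds the snapshot, so checkpoint 7 is skipped.
    System.out.println(shouldCommit(head, "job-A", "op-1", 7L)); // false

    // Fresh job id after a restart: the walk finds nothing, so checkpoint 7 is committed again.
    System.out.println(shouldCommit(head, "job-B", "op-1", 7L)); // true
  }
}
```

In this model the second call is the duplicate: the data for checkpoint 7 is already in the table, but the lookup under the new job id cannot see it.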
   
   Persist the aggregator's job id as operator state and reuse it on restore, 
so committables keep the job id under which their data was originally produced. 
This restores the dedup invariant across job restarts, autoscaler 
savepoint+resubmit cycles, and session-cluster resubmissions. The invariant 
previously broke under sustained catalog-side latency, when commit responses 
were lost client-side.
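
The restore behavior can be sketched in plain Java as follows. This is an illustration under assumptions, not the change itself: `Aggregator`, `initialize`, `snapshotState`, and `jobId` are hypothetical names, and a `List<String>` stands in for whatever operator state the real aggregator uses.

```java
import java.util.Collections;
import java.util.List;

class JobIdStateSketch {
  /** Hypothetical model of an aggregator that keeps its job id in checkpointed state. */
  static final class Aggregator {
    private String jobId;

    /**
     * Called when the operator starts. If state was restored, reuse the persisted job id;
     * otherwise fall back to the job id reported by the runtime.
     */
    void initialize(String runtimeJobId, List<String> restoredState) {
      this.jobId = restoredState.isEmpty() ? runtimeJobId : restoredState.get(0);
    }

    /** Called on checkpoint: the job id is written out as operator state. */
    List<String> snapshotState() {
      return Collections.singletonList(jobId);
    }

    /** The job id that would be stamped on emitted committables. */
    String jobId() {
      return jobId;
    }
  }

  public static void main(String[] args) {
    // First run: no restored state, so the runtime job id is used.
    Aggregator first = new Aggregator();
    first.initialize("job-A", Collections.<String>emptyList());
    List<String> checkpoint = first.snapshotState();

    // Restart under a fresh job id: the persisted id takes precedence.
    Aggregator restored = new Aggregator();
    restored.initialize("job-B", checkpoint);
    System.out.println(restored.jobId()); // job-A
  }
}
```

Because the restored aggregator keeps stamping `job-A`, a lookup keyed on the job id still matches snapshots committed before the restart.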


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

