koodin9 commented on PR #15651: URL: https://github.com/apache/iceberg/pull/15651#issuecomment-4096525235
@t3hw Hi, thanks for looking into this issue. I've been working on the same problem in our fork (based on https://github.com/databricks/iceberg-kafka-connect) and wanted to share some observations about the partial commit (timeout) scenario that this fix may not fully address.

When `commitTimeoutMs` is exceeded, the Coordinator performs a partial commit: it commits only the `DataWritten` events received so far and moves on. Workers that were still processing at the time will send their `DataWritten(A)` + `DataComplete(A)` to the control topic later. Here's the timeline:

* Cycle A:
  * `startNewCommit(A)` → `commitBuffer.clear()` + new UUID
  * `consumeAvailable()` → Worker 0 still processing, nothing consumed
  * `isCommitTimedOut()` → true
  * `doCommit(true)` → partial commit succeeds (without Worker 0's data)
  * `clearResponses()`
  * `endCurrentCommit()`
* Worker 0 finishes and sends `DataWritten(A)` + `DataComplete(A)` to the control topic
* `process()` call N:
  * `consumeAvailable()` → `DataWritten(A)` consumed → added to `commitBuffer`
* `process()` call N+1:
  * `startNewCommit(B)` → `commitBuffer.clear()` ← **`DataWritten(A)` DISCARDED**
  * `consumeAvailable()` → ...

With this PR's fix, `commitBuffer.clear()` in `startNewCommit()` discards Worker 0's `DataWritten(A)`. Since the partial commit already advanced the control topic consumer offsets (via `commitConsumerOffsets()`), this event won't be re-consumed. The data files exist on storage but are never committed to the Iceberg table, resulting in data loss.

**The root cause is broader**

The underlying issue isn't just stale events lingering in the buffer; it's that `tableCommitMap()` doesn't distinguish events by commitId. Even if stale events survive in the buffer (which they should, since they represent legitimate data), they get merged into the same `RowDelta` as current-cycle events and receive the same sequence number. This breaks equality delete semantics: `data_sequence_number < delete_sequence_number` is required for equality deletes to apply.
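To make the grouping point concrete, here is a minimal sketch (the `DataWritten` record and grouping helpers are hypothetical stand-ins, not the connector's actual classes): keying buffered events by table alone merges two commit cycles into one group, while keying by commitId keeps stale and current events separate.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

public class CommitGrouping {
    // Hypothetical stand-in for the connector's DataWritten payload.
    public record DataWritten(UUID commitId, String table, String dataFile) {}

    // Mirrors the problematic behavior: every buffered event for a table lands
    // in one group, so stale and current files would share a single RowDelta
    // commit (and therefore the same sequence number).
    public static Map<String, List<DataWritten>> groupByTable(List<DataWritten> buffer) {
        return buffer.stream().collect(Collectors.groupingBy(DataWritten::table));
    }

    // CommitId-aware grouping: events from a stale cycle stay in their own
    // group, so each cycle can be committed as its own RowDelta.
    public static Map<UUID, List<DataWritten>> groupByCommitId(List<DataWritten> buffer) {
        return buffer.stream().collect(Collectors.groupingBy(DataWritten::commitId));
    }
}
```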
**Alternative approach: commit-id-based separation**

Instead of discarding stale events, a safer fix would be to separate them by commitId and commit each group in its own `RowDelta`. This ensures:

- Stale data files get a lower sequence number (committed first)
- Current data files get a higher sequence number (committed last)
- Equality deletes from the current cycle correctly apply to stale data (`stale_seq < current_seq`)
- No data loss: all legitimate `DataWritten` events are committed

This approach also covers the failed-commit and recovery scenarios your PR addresses, since stale events from any prior cycle are naturally separated by their commitId. We've already implemented this approach in our fork and will be deploying it to production soon. We'll also be opening a PR here shortly.
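A rough sketch of the ordering guarantee, with simulated sequence numbers and hypothetical types (the real fix would go through the Iceberg table's `RowDelta` API): stale groups are committed before the current cycle, so their files end up with strictly lower sequence numbers than the current cycle's files and equality deletes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

public class CommitIdSeparation {
    // Hypothetical stand-in for a buffered DataWritten event.
    public record DataWritten(UUID commitId, String dataFile) {}

    // Simulated table state: each "RowDelta commit" gets the next sequence number.
    private static long nextSeq = 0;
    public static final Map<String, Long> fileSeq = new LinkedHashMap<>();

    private static void commitRowDelta(List<DataWritten> group) {
        long seq = ++nextSeq; // one sequence number per RowDelta commit
        group.forEach(e -> fileSeq.put(e.dataFile(), seq));
    }

    // Commit stale groups first, the current cycle last, so equality deletes
    // written in the current cycle apply to stale data files
    // (data_sequence_number < delete_sequence_number).
    public static void commitAll(List<DataWritten> buffer, UUID currentCommitId) {
        Map<UUID, List<DataWritten>> byCommit = buffer.stream()
            .collect(Collectors.groupingBy(DataWritten::commitId,
                LinkedHashMap::new, Collectors.toList()));
        byCommit.forEach((id, group) -> {
            if (!id.equals(currentCommitId)) {
                commitRowDelta(group); // stale cycles first
            }
        });
        List<DataWritten> current = byCommit.get(currentCommitId);
        if (current != null) {
            commitRowDelta(current); // current cycle last
        }
    }
}
```

This is only a model of the ordering, not connector code; in the Coordinator, each group would map to its own `RowDelta` commit against the table.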

