koodin9 commented on PR #15651:
URL: https://github.com/apache/iceberg/pull/15651#issuecomment-4096525235

   @t3hw 
   Hi, thanks for looking into this issue. I've been working on the same 
problem in our fork (based on 
https://github.com/databricks/iceberg-kafka-connect) and wanted to share some 
observations about the partial commit (timeout) scenario that this fix may not 
fully address. 
   
   When commitTimeoutMs is exceeded, the Coordinator performs a partial commit: it commits only the DataWritten events received so far and moves on. Workers that were still processing at the time will send their DataWritten(A) + DataComplete(A) to the control topic later.
   
   Here's the timeline:
   
   * Cycle A:
       * startNewCommit(A) → commitBuffer.clear() + new UUID
       * consumeAvailable() → Worker 0 still processing, nothing consumed
       * isCommitTimedOut() → true 
       * doCommit(true) → partial commit succeeds (without Worker 0's data)
       * clearResponses()
       * endCurrentCommit()
   
   * Worker 0 finishes and sends DataWritten(A) + DataComplete(A) to control 
topic
   
   * process() call N:
       * consumeAvailable() → DataWritten(A) consumed → added to commitBuffer
   
   * process() call N+1:
       * startNewCommit(B) → commitBuffer.clear() ← **DataWritten(A) DISCARDED**
       * consumeAvailable() → ...
    
   With this PR's fix, commitBuffer.clear() in startNewCommit() discards Worker 
0's DataWritten(A). Since the partial commit already advanced the control topic 
consumer offsets (via commitConsumerOffsets()), this event won't be 
re-consumed. The data files exist on storage but are never committed to the 
Iceberg table — resulting in data loss.
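
   The discard step can be illustrated with a toy model of the buffer lifecycle (a hypothetical simplification for this comment, not the actual Coordinator code; the real class buffers Envelope objects and these method names only mirror the ones discussed above):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Toy model of the Coordinator's commit buffer (illustrative only).
   class CommitBufferModel {
     final List<String> commitBuffer = new ArrayList<>();

     void startNewCommit() {
       // This clear() is what drops the late DataWritten(A) event.
       commitBuffer.clear();
     }

     void consumeAvailable(String event) {
       commitBuffer.add(event);
     }
   }
   ```

   Calling `consumeAvailable("DataWritten(A)")` in process() call N and then `startNewCommit()` in call N+1 leaves the buffer empty; because commitConsumerOffsets() already advanced past the event, nothing ever re-reads it.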
   
   **The root cause is broader**
    
   The underlying issue isn't just stale events lingering in the buffer; it's that tableCommitMap() doesn't distinguish events by commitId. Even if stale events survive in the buffer (which they should, since they represent legitimate data), they get merged into the same RowDelta as current-cycle events and receive the same data sequence number. This breaks equality delete semantics: data_sequence_number < delete_sequence_number is required for equality deletes to apply.
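
   Concretely, the spec rule is that an equality delete applies to a data file only when the data file's sequence number is strictly lower than the delete's. A minimal check of that rule (simplified; real readers evaluate this per file from manifest metadata):

   ```java
   // Iceberg equality-delete applicability: a delete file removes matching
   // rows from a data file only if dataSeq < deleteSeq (strictly less than).
   class SeqRule {
     static boolean deleteApplies(long dataSeq, long deleteSeq) {
       return dataSeq < deleteSeq;
     }
   }
   ```

   If stale data files and current-cycle equality deletes land in one RowDelta, they share a sequence number, so deleteApplies(n, n) is false and the deletes silently skip the stale rows. Committing the stale group first yields deleteApplies(n, n + 1), which is true.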
   
   **Alternative approach: commit-id-based separation**
    
   Instead of discarding stale events, a safer fix would be to separate them by 
commitId and commit each group in its own RowDelta. This ensures:
   - Stale data files get a lower sequence number (committed first)
   - Current data files get a higher sequence number (committed last)
   - Equality deletes from the current cycle correctly apply to stale data 
(stale_seq < current_seq)
   - No data loss — all legitimate DataWritten events are committed
   
   This approach also covers the failed-commit and recovery scenarios your PR 
addresses, since stale events from any prior cycle are naturally separated by 
their commitId.
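
   A sketch of the separation step (hypothetical names: `Envelope` stands in for the control-event wrapper, and arrival order in the buffer is assumed to reflect cycle age, so iterating the groups in insertion order commits older cycles first):

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.UUID;

   class CommitGrouping {
     // Stand-in for the real control-topic envelope around a DataWritten event.
     record Envelope(UUID commitId, String dataFile) {}

     // Group buffered events by commitId, preserving insertion order so stale
     // cycles come first. The caller then opens one RowDelta per group and
     // commits them in order, giving stale files the lower sequence number.
     static Map<UUID, List<Envelope>> groupByCommitId(List<Envelope> buffer) {
       Map<UUID, List<Envelope>> groups = new LinkedHashMap<>();
       for (Envelope e : buffer) {
         groups.computeIfAbsent(e.commitId(), k -> new ArrayList<>()).add(e);
       }
       return groups;
     }
   }
   ```

   LinkedHashMap is chosen deliberately: its insertion-order iteration is what guarantees the stale group's RowDelta commits (and takes its sequence number) before the current cycle's.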
   
   We've already implemented this approach in our fork and will be deploying it 
to production soon. We'll also be opening a PR here shortly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

