stankiewicz commented on issue #36991:
URL: https://github.com/apache/beam/issues/36991#issuecomment-3614109917

   Which runner are you using? Do you see the same data loss on both Dataflow 
runners?
   
   The issue may be in the checkpointing logic.
   During checkpoint creation, the message references in receivedMessages are 
inserted into safeToAckMessages, which is passed by reference into the 
CheckpointMark. The CheckpointMark is finalized (or not) asynchronously. 
   
   As part of caching/reusing readers, we reuse the UnboundedReader together 
with its safeToAckMessages.
   A late finalization may therefore reference the same safeToAckMessages as 
the next CheckpointMark, leading to messages being acked too early.
   
   scenario:
   1. UnboundedReader is created for split 0, safeToAckMessages is empty, 
receivedMessages is empty
   2. UnboundedReader advances, UserParDos are processing messages, 10k 
messages or 10s pass, receivedMessages is full of message references
   3. getCheckpointMark is invoked, T1 checkpointMark is created with a 
safeToAckMessages reference holding 10k messages, receivedMessages is emptied
   4. output of the stage is flushed and checkpointed, callback c1 to finalize 
the checkpoint is set but not yet invoked
   5. UnboundedReader advances, UserParDos are processing messages, another 10k 
messages or 10s pass, receivedMessages is full of message references
   6. getCheckpointMark is invoked, T2 checkpointMark is created with the same 
safeToAckMessages reference, now holding *20k* messages
   7. Imagine finalize for T1 is invoked now: it acks *20k* messages, because 
T1 and T2 share the same reference
   8. flush fails because another worker has been assigned this split, the 
checkpoint is rejected, T2 is never finalized
   9. the second batch (10k messages / 10s worth) is lost for this split: it 
was already acked in step 7, but its output was never committed
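
   The aliasing in steps 3-7 can be reproduced with a toy model. This is only a 
sketch under assumptions: `Reader`, `Mark`, and `ackedByT1` are hypothetical 
stand-ins, not the actual Beam classes, and the batch size is shrunk from 10k 
to 10.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the scenario above; NOT the real Beam reader or checkpoint code. */
final class SharedAckListDemo {

  /** Hypothetical checkpoint mark that keeps a reference to the reader's list. */
  static final class Mark {
    private final List<String> safeToAck; // aliased with the reader, not copied

    Mark(List<String> safeToAck) {
      this.safeToAck = safeToAck;
    }

    /** Returns the ids that finalizing this mark would ack. */
    List<String> finalizeCheckpoint() {
      return new ArrayList<>(safeToAck);
    }
  }

  /** Hypothetical cached reader that is reused across checkpoints. */
  static final class Reader {
    private final List<String> received = new ArrayList<>();
    private final List<String> safeToAck = new ArrayList<>();
    private int next = 0;

    void advance(int n) {
      for (int i = 0; i < n; i++) {
        received.add("msg-" + next++);
      }
    }

    Mark getCheckpointMark() {
      safeToAck.addAll(received);
      received.clear();
      return new Mark(safeToAck); // every mark sees the same growing list
    }
  }

  /** Runs steps 1-7 and returns how many messages finalizing T1 acks. */
  static int ackedByT1(int batch) {
    Reader reader = new Reader();
    reader.advance(batch);
    Mark t1 = reader.getCheckpointMark();
    reader.advance(batch);
    Mark t2 = reader.getCheckpointMark(); // T2 is never finalized (step 8)
    return t1.finalizeCheckpoint().size();
  }

  public static void main(String[] args) {
    System.out.println(ackedByT1(10)); // prints 20, not 10
  }
}
```

   In this model T1 acks both batches, even though only the first batch was 
covered by a committed checkpoint.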
   
   problem:
   safeToAckMessages should not be shared between T1 and T2; each should have 
its own list.
   solution: 
   - The checkpoint mark should hold a reference to the reader.
   - Remember the previous checkpoint mark in an AtomicReference; if advance is 
called, finalize it. If the checkpoint mark is finalized asynchronously, clear 
the AtomicReference first, then ack. 
   - Each checkpoint should have its own list of safeToAckMessages, cleaned up 
similarly to receivedMessages.
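
   A minimal sketch of the per-checkpoint list idea, under the same toy-model 
assumptions (`Reader`, `Mark`, `pending`, and `ackedByT1` are hypothetical 
names, not Beam APIs). It is single-threaded, covers only the "own list" and 
"clear the AtomicReference first, then ack" parts, and leaves out the 
"finalize on advance" behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of the proposed fix; NOT the real Beam reader or checkpoint code. */
final class OwnedAckListSketch {

  /** Hypothetical cached reader that remembers its latest checkpoint mark. */
  static final class Reader {
    private final List<String> received = new ArrayList<>();
    final List<String> acked = new ArrayList<>();
    final AtomicReference<Mark> pending = new AtomicReference<>();
    private int next = 0;

    void advance(int n) {
      for (int i = 0; i < n; i++) {
        received.add("msg-" + next++);
      }
    }

    Mark getCheckpointMark() {
      // Each mark gets its own snapshot, so later reads cannot leak into it.
      Mark mark = new Mark(this, new ArrayList<>(received));
      received.clear();
      pending.set(mark);
      return mark;
    }

    void ack(List<String> ids) {
      acked.addAll(ids);
    }
  }

  /** Hypothetical checkpoint mark that owns its list and points back at the reader. */
  static final class Mark {
    private final Reader reader;
    private final List<String> safeToAck;

    Mark(Reader reader, List<String> safeToAck) {
      this.reader = reader;
      this.safeToAck = safeToAck;
    }

    void finalizeCheckpoint() {
      // Clear the reader's reference first (only if it still points at this mark), then ack.
      reader.pending.compareAndSet(this, null);
      reader.ack(safeToAck);
      safeToAck.clear();
    }
  }

  /** Replays steps 1-7 and returns how many messages finalizing T1 acks. */
  static int ackedByT1(int batch) {
    Reader reader = new Reader();
    reader.advance(batch);
    Mark t1 = reader.getCheckpointMark();
    reader.advance(batch);
    Mark t2 = reader.getCheckpointMark(); // still pending, never finalized
    t1.finalizeCheckpoint();
    return reader.acked.size();
  }

  public static void main(String[] args) {
    System.out.println(ackedByT1(10)); // prints 10
  }
}
```

   Here finalizing T1 acks only the first batch; the second batch stays unacked 
and would be redelivered if T2 is rejected.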

