stankiewicz commented on issue #36991: URL: https://github.com/apache/beam/issues/36991#issuecomment-3614109917
Which runner are you using? Do you see the same data loss on both Dataflow runners?

The issue may be with the checkpointing logic. During checkpoint creation, the message references in receivedMessages are inserted into safeToAckMessages, which is passed by reference into the CheckpointMark. The CheckpointMark is finalized (or not) asynchronously. Because readers are cached and reused, we reuse the UnboundedReader together with its safeToAckMessages. Maybe a late-arriving finalization references the same safeToAckMessages as the next CheckpointMark, which leads to messages being acked too early.

Scenario:
1. An UnboundedReader is created for split 0; safeToAckMessages and receivedMessages are both empty.
2. The UnboundedReader advances and the user ParDos process messages. Once 10k messages or 10s have passed, receivedMessages is full of message references.
3. getCheckpointMark is invoked: checkpoint mark T1 is created holding a reference to safeToAckMessages, which now contains 10k messages, and receivedMessages is emptied.
4. The stage output is flushed and checkpointed; callback c1 to finalize the checkpoint is registered but not yet invoked.
5. The UnboundedReader advances and the user ParDos process messages. Once another 10k messages or 10s have passed, receivedMessages is again full of message references.
6. getCheckpointMark is invoked: checkpoint mark T2 is created holding a reference to the same safeToAckMessages, which now contains *20k* messages.
7. Now imagine the finalization of T1 is invoked: it acks all *20k* messages, because T1 and T2 hold the same reference.
8. The flush fails because another worker has been assigned this split, so the checkpoint is rejected and T2 is never finalized.
9. 10k messages (or 10s worth of messages) are lost for this split.

Problem: safeToAckMessages should not be shared between T1 and T2; each should have its own list.

Solution:
- The checkpoint mark should hold a reference to the reader.
- Remember the previous checkpoint mark in an AtomicReference; if advance is called, finalize it. If the checkpoint mark is finalized asynchronously, clear the AtomicReference first, then ack.
- Each checkpoint should have its own list of safeToAckMessages, cleaned up the same way as receivedMessages.
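To make the shared-reference problem concrete, here is a minimal, self-contained Java sketch that replays the scenario against a toy reader. It is not the Beam source: `Reader`, `Mark`, `advance(n)`, `ackedIds`, `replayScenario`, and integer message ids are hypothetical stand-ins, and "ack" only records ids in a list. With `shareList = true`, both checkpoint marks hold the same list, so the late finalization of T1 acks 20k messages. With `shareList = false`, each mark gets its own snapshot, the previous mark is finalized on `advance`, and asynchronous finalization clears the `AtomicReference` before acking, following the proposal above. The per-mark `AtomicBoolean` that makes acking idempotent is an added assumption, not part of the proposal; it keeps a mark that is finalized both on `advance` and asynchronously from acking twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical toy model of the reader/checkpoint interaction; not Beam code.
public class CheckpointSharingDemo {

  /** Checkpoint mark that acks the list it was handed, at most once. */
  static final class Mark {
    final List<Integer> toAck;
    final Reader reader;
    final AtomicBoolean done = new AtomicBoolean(false); // assumption: idempotent ack guard

    Mark(List<Integer> toAck, Reader reader) {
      this.toAck = toAck;
      this.reader = reader;
    }

    /** Asynchronous finalization: clear the reader's pending reference first, then ack. */
    void finalizeCheckpoint() {
      reader.pending.compareAndSet(this, null);
      ackOnce();
    }

    void ackOnce() {
      if (done.compareAndSet(false, true)) {
        reader.ackedIds.addAll(toAck); // stand-in for acking on the broker
        toAck.clear();
      }
    }
  }

  /** Simplified reader; shareList toggles between the buggy and the fixed behavior. */
  static final class Reader {
    final boolean shareList;
    final List<Integer> receivedMessages = new ArrayList<>();
    final List<Integer> safeToAckMessages = new ArrayList<>(); // only used when shareList
    final List<Integer> ackedIds = new ArrayList<>();
    final AtomicReference<Mark> pending = new AtomicReference<>();
    int nextId = 0;

    Reader(boolean shareList) {
      this.shareList = shareList;
    }

    /** Receives n messages; in the fixed variant, first finalizes the previous mark. */
    void advance(int n) {
      if (!shareList) {
        Mark previous = pending.getAndSet(null);
        if (previous != null) {
          previous.ackOnce();
        }
      }
      for (int i = 0; i < n; i++) {
        receivedMessages.add(nextId++);
      }
    }

    Mark getCheckpointMark() {
      Mark mark;
      if (shareList) {
        // Bug: every mark holds a reference to the same, still-growing list.
        safeToAckMessages.addAll(receivedMessages);
        mark = new Mark(safeToAckMessages, this);
      } else {
        // Fix: each mark owns a snapshot of the messages received since the last mark.
        mark = new Mark(new ArrayList<>(receivedMessages), this);
        pending.set(mark);
      }
      receivedMessages.clear();
      return mark;
    }
  }

  /** Replays steps 1-9 of the scenario and returns how many messages were acked. */
  static int replayScenario(boolean shareList) {
    Reader reader = new Reader(shareList);   // step 1
    reader.advance(10_000);                  // step 2
    Mark t1 = reader.getCheckpointMark();    // steps 3-4: T1 committed, not yet finalized
    reader.advance(10_000);                  // step 5
    Mark t2 = reader.getCheckpointMark();    // step 6
    t1.finalizeCheckpoint();                 // step 7: late finalization of T1
    // Step 8: T2's checkpoint is rejected, so t2 is never finalized.
    return reader.ackedIds.size();
  }

  public static void main(String[] args) {
    System.out.println("shared list acked:    " + replayScenario(true));  // 20000
    System.out.println("per-mark lists acked: " + replayScenario(false)); // 10000
  }
}
```

In the fixed variant, only T1's 10k messages are acked; the 10k messages held by the rejected T2 stay unacknowledged, so the broker should be able to redeliver them to the worker that now owns the split instead of them being lost.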
