bzablocki commented on issue #32596:
URL: https://github.com/apache/beam/issues/32596#issuecomment-2407413558

   It seems that we should account for the possibility of the checkpoint not 
being finalized. 
   This is handled correctly in PubSubIO: 
https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L992-L993
   
   The flow there is as follows:
   1. In `advance()`, we copy the message to the `safeToAckIds` list 
([source](https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L885))
   2. When the runner requests a checkpoint, we create it with a **copy** of 
the `safeToAckIds` list 
([source](https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L996))
   3. When the checkpoint is finalized, we add the messages to a queue of 
finalized messages 
([source](https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L579))
   4. In the `advance()` method, we remove whatever is in the queue of finalized 
messages from the `safeToAckIds` list 
([source](https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L617)).
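   The four steps above can be sketched as a small Java class that does not depend on Beam. This is a minimal sketch, assuming string message IDs. The names `SafeToAckTracker`, `Checkpoint`, `pendingIds()` and `finalizedIds` are hypothetical and do not mirror `PubsubUnboundedSource` exactly. The property that matters here is that a checkpoint which is never finalized leaves its IDs in `safeToAckIds`, so the next checkpoint picks them up again:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the PubsubUnboundedSource-style flow; not Beam's actual API.
class SafeToAckTracker {
  // IDs read by this reader that are not yet covered by a finalized checkpoint
  // (insertion-ordered, so checkpoints list IDs in read order).
  private final Set<String> safeToAckIds = new LinkedHashSet<>();
  // Filled by Checkpoint.finalizeCheckpoint(); a concurrent queue in case
  // finalization runs on a different thread than the reader (an assumption).
  private final Queue<String> finalizedIds = new ConcurrentLinkedQueue<>();

  // Steps 1 and 4: called for each newly read message.
  void advance(String messageId) {
    // Step 4: drop IDs whose checkpoint has been finalized.
    String id;
    while ((id = finalizedIds.poll()) != null) {
      safeToAckIds.remove(id);
    }
    // Step 1: remember the newly read message.
    safeToAckIds.add(messageId);
  }

  // Step 2: the checkpoint holds a copy, so later reads do not change it.
  Checkpoint getCheckpointMark() {
    return new Checkpoint(new ArrayList<>(safeToAckIds));
  }

  // IDs still waiting for a finalized checkpoint.
  List<String> pendingIds() {
    return new ArrayList<>(safeToAckIds);
  }

  final class Checkpoint {
    private final List<String> ids;

    Checkpoint(List<String> ids) {
      this.ids = ids;
    }

    // Step 3: only finalization hands the IDs back for removal in advance().
    void finalizeCheckpoint() {
      finalizedIds.addAll(ids);
    }

    List<String> ids() {
      return ids;
    }
  }
}
```

   For example, after reading `a` and `b`, a first checkpoint holds `[a, b]`. If it is never finalized and `c` is read, the next checkpoint holds `[a, b, c]`. Once that checkpoint is finalized, the next `advance()` drops all three IDs. If the list were cleared when a checkpoint was created, `a` and `b` would be lost whenever the first checkpoint was not finalized.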
   
   I can work on that, but I'm going on holidays now, so I could pick it up in 
2 weeks, which gives me ~2 weeks before the 2.61 release.
   
   Again, thank you @ppawel for reporting. That's an important bug :) 