bzablocki commented on issue #32596: URL: https://github.com/apache/beam/issues/32596#issuecomment-2407413558
It seems that we should account for the possibility that a checkpoint is never finalized. This is handled correctly in PubsubIO: https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L992-L993

The flow there is as follows:

1. In `advance()`, we copy the message to the `safeToAckIds` list ([source](https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L885)).
2. When the runner requests a checkpoint, we create it with a **copy** of the `safeToAckIds` list ([source](https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L996)).
3. When the checkpoint is finalized, we add its messages to a queue of finalized messages ([source](https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L579)).
4. In the `advance()` method, we remove whatever is in the queue of finalized messages from `safeToAckIds` ([source](https://github.com/apache/beam/blob/07322cc86d35fd2af5c32228796e7936f58416d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L617)).

I can work on that, but I'm going on holiday now, so I could pick it up in 2 weeks, which would leave me ~2 weeks before the 2.61 release.

Again, thank you @ppawel for reporting. That's an important bug :)
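
For illustration, the four steps above can be sketched roughly as follows. This is a minimal standalone sketch, not the actual `PubsubUnboundedSource` code: the class `AckTrackingReader`, its nested `Checkpoint`, and the `pendingIds()` helper are hypothetical names, and message IDs are plain strings. The point is that a checkpoint which is never finalized leaves its IDs in `safeToAckIds`, so a later checkpoint still covers them.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical reader sketch; names are illustrative and not part of Beam's API. */
class AckTrackingReader {
  // IDs handed out by advance() whose checkpoint is not yet known to be finalized.
  private final List<String> safeToAckIds = new ArrayList<>();
  // Batches from finalized checkpoints; filled by finalizeCheckpoint(), drained by advance().
  private final Queue<List<String>> finalizedBatches = new ConcurrentLinkedQueue<>();

  /** Snapshot handed to the runner; it may or may not ever be finalized. */
  final class Checkpoint {
    private final List<String> ids;

    Checkpoint(List<String> ids) {
      this.ids = ids;
    }

    // Step 3: on finalization, queue the IDs instead of mutating reader state directly.
    void finalizeCheckpoint() {
      finalizedBatches.add(ids);
    }
  }

  void advance(String messageId) {
    // Step 4: drop IDs that a finalized checkpoint has already covered.
    List<String> batch;
    while ((batch = finalizedBatches.poll()) != null) {
      safeToAckIds.removeAll(new HashSet<>(batch));
    }
    // Step 1: remember the newly read message.
    safeToAckIds.add(messageId);
  }

  // Step 2: the checkpoint gets a copy, so the reader's list survives a lost checkpoint.
  Checkpoint getCheckpointMark() {
    return new Checkpoint(new ArrayList<>(safeToAckIds));
  }

  // Test helper (assumption): expose a copy of the IDs still awaiting finalization.
  List<String> pendingIds() {
    return new ArrayList<>(safeToAckIds);
  }
}
```

If the first checkpoint is dropped by the runner and only a later one is finalized, the later one still carries the earlier IDs, so nothing is silently forgotten.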
