scwhittle commented on code in PR #38603:
URL: https://github.com/apache/beam/pull/38603#discussion_r3310337997
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -158,23 +163,30 @@ public boolean advance() {
@Override
public void close() {
- finalizeReadyMessages();
sessionServiceCache.invalidate(readerUuid);
+ ActiveReadersRegistry.unregister(readerUuid);
}
- public void finalizeReadyMessages() {
- BytesXMLMessage msg;
- while ((msg = safeToAckMessages.poll()) != null) {
+ void finalizeCheckpoint(long checkpointId) {
+ List<BytesXMLMessage> messagesToAck = new ArrayList<>();
+
+ synchronized (lock) {
+ SortedMap<Long, List<BytesXMLMessage>> toAck =
pendingCheckpoints.headMap(checkpointId, true);
Review Comment:
acking previous messages seems safe if Solace itself returns things in
order. However if this is just stored in-memory in this process we may have
cases where the worker crashes and we don't have the previous messages that
were committed as processed to the backend in memory.
Or if ranges are reassigned so that this process no longer has the source
assigned to it we could not get a more recent finalization. We may need some
timeout on the cache so that we nack in that case (perhaps the existing reader
cache could be sufficient).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]