gemini-code-assist[bot] commented on code in PR #38603:
URL: https://github.com/apache/beam/pull/38603#discussion_r3289490421


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/ActiveReadersRegistry.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.read;
+
+import java.lang.ref.WeakReference;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A global registry to keep track of active {@link UnboundedSolaceReader} instances on the worker
+ * JVM using weak references.
+ *
+ * <p>This allows serialized {@link SolaceCheckpointMark} instances to resolve their originating
+ * reader and perform sequential acknowledgments.
+ */
+class ActiveReadersRegistry {
+  private static final ConcurrentHashMap<UUID, WeakReference<UnboundedSolaceReader<?>>> registry =
+      new ConcurrentHashMap<>();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using a `ConcurrentHashMap` with `WeakReference` values can slowly leak memory if `unregister` is never called (e.g., if a reader is abandoned or crashes during initialization). Once the reader is garbage collected, its `UUID` key and the cleared `WeakReference` stay in the map.
   
   Consider using Guava's `Cache` with `weakValues()` instead. Entries whose reader has been garbage collected are removed automatically during the cache's routine maintenance, so stale keys do not accumulate.
   
   ```java
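     // Requires these imports (vendored Guava):
     //   import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
     //   import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
     // weakValues(): entries are dropped once their reader has been garbage collected.
     private static final Cache<UUID, UnboundedSolaceReader<?>> registry =
         CacheBuilder.newBuilder().weakValues().build();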
   
     public static void register(UUID uuid, UnboundedSolaceReader<?> reader) {
       registry.put(uuid, reader);
     }
   
     public static void unregister(UUID uuid) {
       registry.invalidate(uuid);
     }
   
     public static @Nullable UnboundedSolaceReader<?> get(UUID uuid) {
       return registry.getIfPresent(uuid);
     }
   ```
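   As a JDK-only alternative, the existing `WeakReference` map can purge its own stale entries. This is a different technique from the Guava cache above and is not part of the suggestion. Each reference is registered with a `ReferenceQueue`, and that queue is drained on every access. The following is a minimal sketch, and `WeakValueRegistry` and `KeyedRef` are hypothetical names:

   ```java
   import java.lang.ref.Reference;
   import java.lang.ref.ReferenceQueue;
   import java.lang.ref.WeakReference;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical registry: weak values plus a ReferenceQueue, so entries whose value was
   // garbage collected are removed even if unregister() is never called.
   final class WeakValueRegistry<V> {
     // A weak reference that remembers its key, so the stale map entry can be found after GC.
     private static final class KeyedRef<T> extends WeakReference<T> {
       final UUID key;

       KeyedRef(UUID key, T value, ReferenceQueue<? super T> queue) {
         super(value, queue);
         this.key = key;
       }
     }

     private final ConcurrentHashMap<UUID, KeyedRef<V>> map = new ConcurrentHashMap<>();
     private final ReferenceQueue<V> queue = new ReferenceQueue<>();

     // Drains references the GC has cleared and deletes their map entries.
     private void purgeStaleEntries() {
       Reference<? extends V> ref;
       while ((ref = queue.poll()) != null) {
         KeyedRef<?> stale = (KeyedRef<?>) ref;
         // Two-argument remove: only deletes the entry if it still maps to this cleared reference.
         map.remove(stale.key, stale);
       }
     }

     void register(UUID key, V value) {
       purgeStaleEntries();
       map.put(key, new KeyedRef<>(key, value, queue));
     }

     void unregister(UUID key) {
       purgeStaleEntries();
       map.remove(key);
     }

     // Returns null if the key is unknown or its value has already been collected.
     V get(UUID key) {
       purgeStaleEntries();
       KeyedRef<V> ref = map.get(key);
       return ref == null ? null : ref.get();
     }

     int size() {
       purgeStaleEntries();
       return map.size();
     }
   }
   ```

   The two-argument `remove(key, value)` matters if a UUID is ever re-registered. A cleared reference for the old reader that is enqueued late cannot evict the new entry.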



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java:
##########
@@ -39,33 +37,42 @@
 @VisibleForTesting
 public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
   private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
-  private transient Queue<BytesXMLMessage> safeToAck;
+  private String readerUuid;
+  private long checkpointId;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Since `UUID` is `Serializable`, storing the reader ID as a `String` and calling `UUID.fromString()` on every finalization adds avoidable parsing overhead. Storing the `UUID` object directly is more efficient, provided the checkpoint-mark coder can encode it.
   
   ```suggestion
     private UUID readerUuid;
     private long checkpointId;
   ```
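   The following is a minimal sketch of the suggested field types. It assumes the checkpoint mark is encoded with plain Java serialization, which is what Beam's `SerializableCoder` uses. If the source uses a different checkpoint-mark coder, check that it can encode `UUID` before adopting the change. `CheckpointMarkSketch` and its helper methods are hypothetical:

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.io.UncheckedIOException;
   import java.util.UUID;

   // Hypothetical, simplified stand-in for SolaceCheckpointMark holding only its identifiers.
   final class CheckpointMarkSketch implements Serializable {
     private static final long serialVersionUID = 1L;

     private final UUID readerUuid; // stored as a UUID, so finalization needs no UUID.fromString()
     private final long checkpointId;

     CheckpointMarkSketch(UUID readerUuid, long checkpointId) {
       this.readerUuid = readerUuid;
       this.checkpointId = checkpointId;
     }

     UUID getReaderUuid() {
       return readerUuid;
     }

     long getCheckpointId() {
       return checkpointId;
     }

     // Serializes a mark with Java serialization (the mechanism SerializableCoder relies on).
     static byte[] toBytes(CheckpointMarkSketch mark) {
       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
       try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
         out.writeObject(mark);
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       }
       return bytes.toByteArray();
     }

     // Deserializes a mark previously produced by toBytes().
     static CheckpointMarkSketch fromBytes(byte[] data) {
       try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
         return (CheckpointMarkSketch) in.readObject();
       } catch (IOException e) {
         throw new UncheckedIOException(e);
       } catch (ClassNotFoundException e) {
         throw new IllegalStateException(e);
       }
     }
   }
   ```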



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -151,30 +155,35 @@ public boolean advance() {
     }
     solaceOriginalRecord = receivedXmlMessage;
     solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage);
-    receivedMessages.add(receivedXmlMessage);
+    synchronized (this) {
+      receivedMessages.add(receivedXmlMessage);
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
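   Synchronizing on `this` is generally discouraged for internal state management. It exposes the lock to any external code that holds a reference to the reader, which can lead to unexpected contention or deadlocks. It is safer to use a private lock object. Every block that currently synchronizes on `this`, including the one in `finalizeCheckpoint`, should switch to that same lock so the blocks still exclude each other.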
   
   ```suggestion
       synchronized (lock) {
         receivedMessages.add(receivedXmlMessage);
       }
   ```
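   The suggestion refers to a `lock` field that does not appear in the diff. The following is a minimal sketch of the intended pattern, assuming a field declared as `private final Object lock = new Object();`. `ReceivedMessageBuffer` is a hypothetical, simplified stand-in for the reader's message bookkeeping:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified stand-in for the reader's received-message bookkeeping.
   final class ReceivedMessageBuffer<T> {
     // Private monitor: code outside this class cannot synchronize on it,
     // so it cannot contend for or deadlock on this internal lock.
     private final Object lock = new Object();
     private final List<T> receivedMessages = new ArrayList<>();

     // Analogous to receivedMessages.add(...) in advance().
     void add(T message) {
       synchronized (lock) {
         receivedMessages.add(message);
       }
     }

     // Assumed analogue of handing messages over when a checkpoint is taken. It uses the
     // same lock as add(); a block synchronized on `this` would not exclude this one.
     List<T> drain() {
       synchronized (lock) {
         List<T> drained = new ArrayList<>(receivedMessages);
         receivedMessages.clear();
         return drained;
       }
     }
   }
   ```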



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -151,30 +155,35 @@ public boolean advance() {
     }
     solaceOriginalRecord = receivedXmlMessage;
     solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage);
-    receivedMessages.add(receivedXmlMessage);
+    synchronized (this) {
+      receivedMessages.add(receivedXmlMessage);
+    }
 
     return true;
   }
 
   @Override
   public void close() {
-    finalizeReadyMessages();
     sessionServiceCache.invalidate(readerUuid);
+    ActiveReadersRegistry.unregister(readerUuid);
   }
 
-  public void finalizeReadyMessages() {
-    BytesXMLMessage msg;
-    while ((msg = safeToAckMessages.poll()) != null) {
+  public void finalizeCheckpoint(long checkpointId) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This method is only intended to be called by `SolaceCheckpointMark`, which is in the same package. Reducing its visibility to package-private follows the principle of least privilege.
   
   ```suggestion
     void finalizeCheckpoint(long checkpointId) {
   ```



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -151,30 +155,35 @@ public boolean advance() {
     }
     solaceOriginalRecord = receivedXmlMessage;
     solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage);
-    receivedMessages.add(receivedXmlMessage);
+    synchronized (this) {
+      receivedMessages.add(receivedXmlMessage);
+    }
 
     return true;
   }
 
   @Override
   public void close() {
-    finalizeReadyMessages();
     sessionServiceCache.invalidate(readerUuid);
+    ActiveReadersRegistry.unregister(readerUuid);
   }
 
-  public void finalizeReadyMessages() {
-    BytesXMLMessage msg;
-    while ((msg = safeToAckMessages.poll()) != null) {
+  public void finalizeCheckpoint(long checkpointId) {
+    List<BytesXMLMessage> messagesToAck = new ArrayList<>();
+
+    synchronized (this) {
+      SortedMap<Long, List<BytesXMLMessage>> toAck = pendingCheckpoints.headMap(checkpointId, true);
+      for (List<BytesXMLMessage> msgs : toAck.values()) {
+        messagesToAck.addAll(msgs);
+      }
+      toAck.clear();
+    }
+
+    for (BytesXMLMessage msg : messagesToAck) {
       try {
         msg.ackMessage();
       } catch (IllegalStateException e) {
-        LOG.error(
-            "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
-            msg.getApplicationMessageId(),
-            msg.getAckMessageId(),
-            e);
-        safeToAckMessages.add(msg); // In case the error was transient, might succeed later
-        break; // Commit is only best effort
+        LOG.warn("SolaceIO.Read: Failed to ack message, session might be closed.", e);
       }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The previous implementation logged the `applicationMessageId` and `ackMessageId` when an acknowledgment failed. These IDs are essential for debugging data-flow issues and pinpointing the specific messages involved, so please restore them in the log message.
   
   ```java
         } catch (IllegalStateException e) {
           LOG.warn(
               "SolaceIO.Read: Failed to acknowledge message with applicationMessageId={}, ackMessageId={}. Session might be closed.",
               msg.getApplicationMessageId(),
               msg.getAckMessageId(),
               e);
         }
   ```
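   For context, the `finalizeCheckpoint` under review acks cumulatively. Under the lock, it detaches every pending checkpoint with an ID up to and including `checkpointId`. `headMap(checkpointId, true)` is a live view, so clearing it removes those entries. It then acks outside the lock. The following is a minimal JDK-only sketch of that flow, with the restored IDs in the failure report. It assumes a hypothetical `AckableMessage` interface that mirrors the three `BytesXMLMessage` methods used in the diff. `CheckpointAcker`, `addCheckpoint`, and `pendingCount` are also hypothetical. Failures are returned as strings instead of logged through SLF4J only so the behaviour is easy to check:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   // Hypothetical interface mirroring the BytesXMLMessage methods used in the diff.
   interface AckableMessage {
     void ackMessage(); // may throw IllegalStateException, e.g. if the session is closed

     String getApplicationMessageId();

     long getAckMessageId();
   }

   // Hypothetical, simplified sketch of cumulative checkpoint finalization.
   final class CheckpointAcker {
     private final Object lock = new Object();
     // checkpointId -> messages covered by that checkpoint, ordered by checkpointId
     private final NavigableMap<Long, List<AckableMessage>> pendingCheckpoints = new TreeMap<>();

     void addCheckpoint(long checkpointId, List<AckableMessage> messages) {
       synchronized (lock) {
         pendingCheckpoints.put(checkpointId, new ArrayList<>(messages));
       }
     }

     // Acks all messages of checkpoints <= checkpointId; returns one entry per failed ack.
     List<String> finalizeCheckpoint(long checkpointId) {
       List<AckableMessage> messagesToAck = new ArrayList<>();
       synchronized (lock) {
         // Live view of every entry with key <= checkpointId; clear() removes them from the map.
         NavigableMap<Long, List<AckableMessage>> toAck =
             pendingCheckpoints.headMap(checkpointId, true);
         for (List<AckableMessage> msgs : toAck.values()) {
           messagesToAck.addAll(msgs);
         }
         toAck.clear();
       }
       // Ack outside the lock so a slow ack does not block other threads waiting on it.
       List<String> failures = new ArrayList<>();
       for (AckableMessage msg : messagesToAck) {
         try {
           msg.ackMessage();
         } catch (IllegalStateException e) {
           // Keep both IDs so the failed message can be identified.
           failures.add(
               String.format(
                   "applicationMessageId=%s, ackMessageId=%d",
                   msg.getApplicationMessageId(), msg.getAckMessageId()));
         }
       }
       return failures;
     }

     int pendingCount() {
       synchronized (lock) {
         return pendingCheckpoints.size();
       }
     }
   }
   ```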



-- 
This is an automated message from the Apache Git Service.