This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f7a5bbd4a35 Revert "SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. " (#37162)
f7a5bbd4a35 is described below

commit f7a5bbd4a35d9b6e128817755f774ba414ea5716
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Mon Jan 5 11:40:30 2026 +0100

    Revert "SolaceIO data loss - remove message ack from close and advance as it may lead to data loss during work rebalancing or retry. " (#37162)
    
    This reverts commit f277b5ec59ba213af547167080fd8f5bd210e6a7.
---
 .../sdk/io/solace/read/SolaceCheckpointMark.java   |  9 +--
 .../sdk/io/solace/read/UnboundedSolaceReader.java  | 33 ++++++++--
 .../beam/sdk/io/solace/SolaceIOReadTest.java       | 73 +---------------------
 3 files changed, 37 insertions(+), 78 deletions(-)

diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
index eb2d4b3006a..a913fd6133e 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
@@ -18,8 +18,8 @@
 package org.apache.beam.sdk.io.solace.read;
 
 import com.solacesystems.jcsmp.BytesXMLMessage;
-import java.util.List;
 import java.util.Objects;
+import java.util.Queue;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 @VisibleForTesting
 public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
   private static final Logger LOG = 
LoggerFactory.getLogger(SolaceCheckpointMark.class);
-  private transient List<BytesXMLMessage> safeToAck;
+  private transient Queue<BytesXMLMessage> safeToAck;
 
   @SuppressWarnings("initialization") // Avro will set the fields by breaking 
abstraction
   private SolaceCheckpointMark() {}
@@ -48,13 +48,14 @@ public class SolaceCheckpointMark implements 
UnboundedSource.CheckpointMark {
    *
    * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
    */
-  SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
+  SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
     this.safeToAck = safeToAck;
   }
 
   @Override
   public void finalizeCheckpoint() {
-    for (BytesXMLMessage msg : safeToAck) {
+    BytesXMLMessage msg;
+    while ((msg = safeToAck.poll()) != null) {
       try {
         msg.ackMessage();
       } catch (IllegalStateException e) {
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
index 7c756169ef3..dc84e0a0701 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
@@ -27,6 +27,7 @@ import java.util.ArrayDeque;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,7 +42,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -60,6 +60,12 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
   private @Nullable BytesXMLMessage solaceOriginalRecord;
   private @Nullable T solaceMappedRecord;
 
+  /**
+   * Queue to place advanced messages before {@link #getCheckpointMark()} is 
called. CAUTION:
+   * Accessed by both reader and checkpointing threads.
+   */
+  private final Queue<BytesXMLMessage> safeToAckMessages = new 
ConcurrentLinkedQueue<>();
+
   /**
    * Queue for messages that were ingested in the {@link #advance()} method, 
but not sent yet to a
    * {@link SolaceCheckpointMark}.
@@ -130,6 +136,8 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
 
   @Override
   public boolean advance() {
+    finalizeReadyMessages();
+
     BytesXMLMessage receivedXmlMessage;
     try {
       receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -150,9 +158,27 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
 
   @Override
   public void close() {
+    finalizeReadyMessages();
     sessionServiceCache.invalidate(readerUuid);
   }
 
+  public void finalizeReadyMessages() {
+    BytesXMLMessage msg;
+    while ((msg = safeToAckMessages.poll()) != null) {
+      try {
+        msg.ackMessage();
+      } catch (IllegalStateException e) {
+        LOG.error(
+            "SolaceIO.Read: failed to acknowledge the message with 
applicationMessageId={}, ackMessageId={}. Returning the message to queue to 
retry.",
+            msg.getApplicationMessageId(),
+            msg.getAckMessageId(),
+            e);
+        safeToAckMessages.add(msg); // In case the error was transient, might 
succeed later
+        break; // Commit is only best effort
+      }
+    }
+  }
+
   @Override
   public Instant getWatermark() {
     // should be only used by a test receiver
@@ -164,10 +190,9 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
 
   @Override
   public UnboundedSource.CheckpointMark getCheckpointMark() {
-
-    ImmutableList<BytesXMLMessage> bytesXMLMessages = 
ImmutableList.copyOf(receivedMessages);
+    safeToAckMessages.addAll(receivedMessages);
     receivedMessages.clear();
-    return new SolaceCheckpointMark(bytesXMLMessages);
+    return new SolaceCheckpointMark(safeToAckMessages);
   }
 
   @Override
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
index c17ec3e128d..a1f80932edd 100644
--- 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
@@ -458,13 +458,13 @@ public class SolaceIOReadTest {
     // mark all consumed messages as ready to be acknowledged
     CheckpointMark checkpointMark = reader.getCheckpointMark();
 
-    // consume 1 more message.
+    // consume 1 more message. This will call #ackMsg() on messages that were 
ready to be acked.
     reader.advance();
-    assertEquals(0, countAckMessages.get());
+    assertEquals(4, countAckMessages.get());
 
     // consume 1 more message. No change in the acknowledged messages.
     reader.advance();
-    assertEquals(0, countAckMessages.get());
+    assertEquals(4, countAckMessages.get());
 
     // acknowledge from the first checkpoint
     checkpointMark.finalizeCheckpoint();
@@ -473,73 +473,6 @@ public class SolaceIOReadTest {
     assertEquals(4, countAckMessages.get());
   }
 
-  @Test
-  public void testLateCheckpointOverlappingFlushingOfNextBundle() throws 
Exception {
-    AtomicInteger countConsumedMessages = new AtomicInteger(0);
-    AtomicInteger countAckMessages = new AtomicInteger(0);
-
-    // Broker that creates input data
-    SerializableFunction<Integer, BytesXMLMessage> recordFn =
-        index -> {
-          List<BytesXMLMessage> messages = new ArrayList<>();
-          for (int i = 0; i < 10; i++) {
-            messages.add(
-                SolaceDataUtils.getBytesXmlMessage(
-                    "payload_test" + i, "45" + i, (num) -> 
countAckMessages.incrementAndGet()));
-          }
-          countConsumedMessages.incrementAndGet();
-          return getOrNull(index, messages);
-        };
-
-    SessionServiceFactory fakeSessionServiceFactory =
-        
MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
-
-    Read<Record> spec =
-        getDefaultRead()
-            .withSessionServiceFactory(fakeSessionServiceFactory)
-            .withMaxNumConnections(4);
-
-    UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
-
-    UnboundedReader<Record> reader =
-        initialSource.createReader(PipelineOptionsFactory.create(), null);
-
-    // start the reader and move to the first record
-    assertTrue(reader.start());
-
-    // consume 3 messages (NB: #start() already consumed the first message)
-    for (int i = 0; i < 3; i++) {
-      assertTrue(String.format("Failed at %d-th message", i), 
reader.advance());
-    }
-
-    // #advance() was called, but the messages were not ready to be 
acknowledged.
-    assertEquals(0, countAckMessages.get());
-
-    // mark all consumed messages as ready to be acknowledged
-    CheckpointMark checkpointMark = reader.getCheckpointMark();
-
-    // data is flushed
-
-    // consume 1 more message.
-    reader.advance();
-    assertEquals(0, countAckMessages.get());
-
-    // consume 1 more message. No change in the acknowledged messages.
-    reader.advance();
-    assertEquals(0, countAckMessages.get());
-
-    CheckpointMark checkpointMark2 = reader.getCheckpointMark();
-    // data is prepared for flushing that will be rejected
-
-    // acknowledge from the first checkpoint may arrive late
-    checkpointMark.finalizeCheckpoint();
-
-    assertEquals(4, countAckMessages.get());
-
-    checkpointMark2.finalizeCheckpoint();
-    assertEquals(6, countAckMessages.get());
-  }
-
   @Test
   public void testCheckpointMarkSafety() throws Exception {
 

Reply via email to