This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4356253a5e8 Revert "SolaceIO.Read: handle occasional cases when 
finalizeCheckpoint is not executed (#32962)" (#33259)
4356253a5e8 is described below

commit 4356253a5e8124bf39152a9ead9ad26ef7267750
Author: Bartosz Zablocki <bzablo...@google.com>
AuthorDate: Mon Dec 2 18:18:42 2024 +0000

    Revert "SolaceIO.Read: handle occasional cases when finalizeCheckpoint is 
not executed (#32962)" (#33259)
    
    This reverts commit e279e55f47683bcee1c59c0b7aea2b21741fadb3.
---
 .../broker/BasicAuthJcsmpSessionService.java       |  18 ++-
 .../beam/sdk/io/solace/broker/MessageReceiver.java |   7 ++
 .../beam/sdk/io/solace/broker/SessionService.java  |   7 ++
 .../io/solace/broker/SolaceMessageReceiver.java    |  17 ++-
 .../sdk/io/solace/read/SolaceCheckpointMark.java   |  46 ++++---
 .../sdk/io/solace/read/UnboundedSolaceReader.java  | 135 ++++++---------------
 .../sdk/io/solace/MockEmptySessionService.java     |   5 +
 .../beam/sdk/io/solace/MockSessionService.java     |  10 ++
 .../beam/sdk/io/solace/SolaceIOReadTest.java       |  20 ++-
 9 files changed, 127 insertions(+), 138 deletions(-)

diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
index d4c9a3ec621..b2196dbf106 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
@@ -102,7 +102,10 @@ public abstract class BasicAuthJcsmpSessionService extends 
SessionService {
           if (messageReceiver != null) {
             messageReceiver.close();
           }
-          if (jcsmpSession != null) {
+          if (messageProducer != null) {
+            messageProducer.close();
+          }
+          if (!isClosed()) {
             checkStateNotNull(jcsmpSession).closeSession();
           }
           return 0;
@@ -116,9 +119,8 @@ public abstract class BasicAuthJcsmpSessionService extends 
SessionService {
       this.messageReceiver =
           retryCallableManager.retryCallable(
               this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
-      this.messageReceiver.start();
     }
-    return checkStateNotNull(this.messageReceiver);
+    return this.messageReceiver;
   }
 
   @Override
@@ -136,10 +138,15 @@ public abstract class BasicAuthJcsmpSessionService 
extends SessionService {
     return publishedResultsQueue;
   }
 
+  @Override
+  public boolean isClosed() {
+    return jcsmpSession == null || jcsmpSession.isClosed();
+  }
+
   private MessageProducer createXMLMessageProducer(SubmissionMode 
submissionMode)
       throws JCSMPException, IOException {
 
-    if (jcsmpSession == null) {
+    if (isClosed()) {
       connectWriteSession(submissionMode);
     }
 
@@ -158,6 +165,9 @@ public abstract class BasicAuthJcsmpSessionService extends 
SessionService {
   }
 
   private MessageReceiver createFlowReceiver() throws JCSMPException, 
IOException {
+    if (isClosed()) {
+      connectSession();
+    }
 
     Queue queue =
         JCSMPFactory.onlyInstance()
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java
index 017a6326067..95f989bd1be 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java
@@ -35,6 +35,13 @@ public interface MessageReceiver {
    */
   void start();
 
+  /**
+   * Returns {@literal true} if the message receiver is closed, {@literal 
false} otherwise.
+   *
+   * <p>A message receiver is closed when it is no longer able to receive 
messages.
+   */
+  boolean isClosed();
+
   /**
    * Receives a message from the broker.
    *
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
index 6dcd0b65261..84a876a9d0b 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
@@ -120,6 +120,13 @@ public abstract class SessionService implements 
Serializable {
   /** Gracefully closes the connection to the service. */
   public abstract void close();
 
+  /**
+   * Checks whether the connection to the service is currently closed. This method is called when
+   * an {@code UnboundedSolaceReader} is starting to read messages; a session will be created if
+   * this returns true.
+   */
+  public abstract boolean isClosed();
+
   /**
    * Returns a MessageReceiver object for receiving messages from Solace. If 
it is the first time
    * this method is used, the receiver is created from the session instance, 
otherwise it returns
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
index d74f3cae89f..d548d2049a5 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java
@@ -24,8 +24,12 @@ import com.solacesystems.jcsmp.StaleSessionException;
 import java.io.IOException;
 import org.apache.beam.sdk.io.solace.RetryCallableManager;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SolaceMessageReceiver implements MessageReceiver {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SolaceMessageReceiver.class);
+
   public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;
   private final FlowReceiver flowReceiver;
   private final RetryCallableManager retryCallableManager = 
RetryCallableManager.create();
@@ -48,14 +52,19 @@ public class SolaceMessageReceiver implements 
MessageReceiver {
         ImmutableSet.of(JCSMPException.class));
   }
 
+  @Override
+  public boolean isClosed() {
+    return flowReceiver == null || flowReceiver.isClosed();
+  }
+
   @Override
   public BytesXMLMessage receive() throws IOException {
     try {
       return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
     } catch (StaleSessionException e) {
+      LOG.warn("SolaceIO: Caught StaleSessionException, restarting the 
FlowReceiver.");
       startFlowReceiver();
-      throw new IOException(
-          "SolaceIO: Caught StaleSessionException, restarting the 
FlowReceiver.", e);
+      throw new IOException(e);
     } catch (JCSMPException e) {
       throw new IOException(e);
     }
@@ -63,6 +72,8 @@ public class SolaceMessageReceiver implements MessageReceiver 
{
 
   @Override
   public void close() {
-    flowReceiver.close();
+    if (!isClosed()) {
+      this.flowReceiver.close();
+    }
   }
 }
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
index a913fd6133e..77f6eed8f62 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
@@ -18,16 +18,17 @@
 package org.apache.beam.sdk.io.solace.read;
 
 import com.solacesystems.jcsmp.BytesXMLMessage;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
-import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Checkpoint for an unbounded Solace source. Consists of the Solace messages 
waiting to be
@@ -37,8 +38,10 @@ import org.slf4j.LoggerFactory;
 @Internal
 @VisibleForTesting
 public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SolaceCheckpointMark.class);
-  private transient Queue<BytesXMLMessage> safeToAck;
+  private transient AtomicBoolean activeReader;
+  // BytesXMLMessage is not serializable so if a job restarts from the 
checkpoint, we cannot retry
+  // these messages here. We rely on Solace's retry mechanism.
+  private transient ArrayDeque<BytesXMLMessage> ackQueue;
 
   @SuppressWarnings("initialization") // Avro will set the fields by breaking 
abstraction
   private SolaceCheckpointMark() {}
@@ -46,24 +49,25 @@ public class SolaceCheckpointMark implements 
UnboundedSource.CheckpointMark {
   /**
    * Creates a new {@link SolaceCheckpointMark}.
    *
-   * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
+   * @param activeReader {@link AtomicBoolean} indicating if the related 
reader is active. The
+   *     reader creating the messages has to be active to acknowledge the 
messages.
+   * @param ackQueue {@link List} of {@link BytesXMLMessage} to be 
acknowledged.
    */
-  SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
-    this.safeToAck = safeToAck;
+  SolaceCheckpointMark(AtomicBoolean activeReader, List<BytesXMLMessage> 
ackQueue) {
+    this.activeReader = activeReader;
+    this.ackQueue = new ArrayDeque<>(ackQueue);
   }
 
   @Override
   public void finalizeCheckpoint() {
-    BytesXMLMessage msg;
-    while ((msg = safeToAck.poll()) != null) {
-      try {
+    if (activeReader == null || !activeReader.get() || ackQueue == null) {
+      return;
+    }
+
+    while (!ackQueue.isEmpty()) {
+      BytesXMLMessage msg = ackQueue.poll();
+      if (msg != null) {
         msg.ackMessage();
-      } catch (IllegalStateException e) {
-        LOG.error(
-            "SolaceIO.Read: cannot acknowledge the message with 
applicationMessageId={}, ackMessageId={}. It will not be retried.",
-            msg.getApplicationMessageId(),
-            msg.getAckMessageId(),
-            e);
       }
     }
   }
@@ -80,11 +84,15 @@ public class SolaceCheckpointMark implements 
UnboundedSource.CheckpointMark {
       return false;
     }
     SolaceCheckpointMark that = (SolaceCheckpointMark) o;
-    return Objects.equals(safeToAck, that.safeToAck);
+    // Needed to convert to ArrayList because ArrayDeque.equals checks only 
for reference, not
+    // content.
+    ArrayList<BytesXMLMessage> ackList = new ArrayList<>(ackQueue);
+    ArrayList<BytesXMLMessage> thatAckList = new ArrayList<>(that.ackQueue);
+    return Objects.equals(activeReader, that.activeReader) && 
Objects.equals(ackList, thatAckList);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(safeToAck);
+    return Objects.hash(activeReader, ackQueue);
   }
 }
diff --git 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
index dc84e0a0701..a421970370d 100644
--- 
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
+++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
@@ -22,26 +22,17 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 import com.solacesystems.jcsmp.BytesXMLMessage;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.solace.broker.SempClient;
 import org.apache.beam.sdk.io.solace.broker.SessionService;
-import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -55,92 +46,48 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
   private final UnboundedSolaceSource<T> currentSource;
   private final WatermarkPolicy<T> watermarkPolicy;
   private final SempClient sempClient;
-  private final UUID readerUuid;
-  private final SessionServiceFactory sessionServiceFactory;
   private @Nullable BytesXMLMessage solaceOriginalRecord;
   private @Nullable T solaceMappedRecord;
+  private @Nullable SessionService sessionService;
+  AtomicBoolean active = new AtomicBoolean(true);
 
   /**
-   * Queue to place advanced messages before {@link #getCheckpointMark()} is 
called. CAUTION:
-   * Accessed by both reader and checkpointing threads.
+   * Queue holding advanced messages until {@link #getCheckpointMark()} is called. This is a
+   * non-concurrent queue that should only be accessed by the reader thread; a given {@link
+   * UnboundedReader} object will only be accessed by a single thread at once.
    */
-  private final Queue<BytesXMLMessage> safeToAckMessages = new 
ConcurrentLinkedQueue<>();
-
-  /**
-   * Queue for messages that were ingested in the {@link #advance()} method, 
but not sent yet to a
-   * {@link SolaceCheckpointMark}.
-   */
-  private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>();
-
-  private static final Cache<UUID, SessionService> sessionServiceCache;
-  private static final ScheduledExecutorService cleanUpThread = 
Executors.newScheduledThreadPool(1);
-
-  static {
-    Duration cacheExpirationTimeout = Duration.ofMinutes(1);
-    sessionServiceCache =
-        CacheBuilder.newBuilder()
-            .expireAfterAccess(cacheExpirationTimeout)
-            .removalListener(
-                (RemovalNotification<UUID, SessionService> notification) -> {
-                  LOG.info(
-                      "SolaceIO.Read: Closing session for the reader with uuid 
{} as it has been idle for over {}.",
-                      notification.getKey(),
-                      cacheExpirationTimeout);
-                  SessionService sessionService = notification.getValue();
-                  if (sessionService != null) {
-                    sessionService.close();
-                  }
-                })
-            .build();
-
-    startCleanUpThread();
-  }
-
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private static void startCleanUpThread() {
-    cleanUpThread.scheduleAtFixedRate(sessionServiceCache::cleanUp, 1, 1, 
TimeUnit.MINUTES);
-  }
+  private final java.util.Queue<BytesXMLMessage> elementsToCheckpoint = new 
ArrayDeque<>();
 
   public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) {
     this.currentSource = currentSource;
     this.watermarkPolicy =
         WatermarkPolicy.create(
             currentSource.getTimestampFn(), 
currentSource.getWatermarkIdleDurationThreshold());
-    this.sessionServiceFactory = currentSource.getSessionServiceFactory();
+    this.sessionService = currentSource.getSessionServiceFactory().create();
     this.sempClient = currentSource.getSempClientFactory().create();
-    this.readerUuid = UUID.randomUUID();
-  }
-
-  private SessionService getSessionService() {
-    try {
-      return sessionServiceCache.get(
-          readerUuid,
-          () -> {
-            LOG.info("SolaceIO.Read: creating a new session for reader with 
uuid {}.", readerUuid);
-            SessionService sessionService = sessionServiceFactory.create();
-            sessionService.connect();
-            sessionService.getReceiver().start();
-            return sessionService;
-          });
-    } catch (ExecutionException e) {
-      throw new RuntimeException(e);
-    }
   }
 
   @Override
   public boolean start() {
-    // Create and initialize SessionService with Receiver
-    getSessionService();
+    populateSession();
+    checkNotNull(sessionService).getReceiver().start();
     return advance();
   }
 
+  public void populateSession() {
+    if (sessionService == null) {
+      sessionService = getCurrentSource().getSessionServiceFactory().create();
+    }
+    if (sessionService.isClosed()) {
+      checkNotNull(sessionService).connect();
+    }
+  }
+
   @Override
   public boolean advance() {
-    finalizeReadyMessages();
-
     BytesXMLMessage receivedXmlMessage;
     try {
-      receivedXmlMessage = getSessionService().getReceiver().receive();
+      receivedXmlMessage = 
checkNotNull(sessionService).getReceiver().receive();
     } catch (IOException e) {
       LOG.warn("SolaceIO.Read: Exception when pulling messages from the 
broker.", e);
       return false;
@@ -149,40 +96,23 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
     if (receivedXmlMessage == null) {
       return false;
     }
+    elementsToCheckpoint.add(receivedXmlMessage);
     solaceOriginalRecord = receivedXmlMessage;
     solaceMappedRecord = 
getCurrentSource().getParseFn().apply(receivedXmlMessage);
-    receivedMessages.add(receivedXmlMessage);
-
+    watermarkPolicy.update(solaceMappedRecord);
     return true;
   }
 
   @Override
   public void close() {
-    finalizeReadyMessages();
-    sessionServiceCache.invalidate(readerUuid);
-  }
-
-  public void finalizeReadyMessages() {
-    BytesXMLMessage msg;
-    while ((msg = safeToAckMessages.poll()) != null) {
-      try {
-        msg.ackMessage();
-      } catch (IllegalStateException e) {
-        LOG.error(
-            "SolaceIO.Read: failed to acknowledge the message with 
applicationMessageId={}, ackMessageId={}. Returning the message to queue to 
retry.",
-            msg.getApplicationMessageId(),
-            msg.getAckMessageId(),
-            e);
-        safeToAckMessages.add(msg); // In case the error was transient, might 
succeed later
-        break; // Commit is only best effort
-      }
-    }
+    active.set(false);
+    checkNotNull(sessionService).close();
   }
 
   @Override
   public Instant getWatermark() {
     // should be only used by a test receiver
-    if (getSessionService().getReceiver().isEOF()) {
+    if (checkNotNull(sessionService).getReceiver().isEOF()) {
       return BoundedWindow.TIMESTAMP_MAX_VALUE;
     }
     return watermarkPolicy.getWatermark();
@@ -190,9 +120,14 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
 
   @Override
   public UnboundedSource.CheckpointMark getCheckpointMark() {
-    safeToAckMessages.addAll(receivedMessages);
-    receivedMessages.clear();
-    return new SolaceCheckpointMark(safeToAckMessages);
+    List<BytesXMLMessage> ackQueue = new ArrayList<>();
+    while (!elementsToCheckpoint.isEmpty()) {
+      BytesXMLMessage msg = elementsToCheckpoint.poll();
+      if (msg != null) {
+        ackQueue.add(msg);
+      }
+    }
+    return new SolaceCheckpointMark(active, ackQueue);
   }
 
   @Override
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
index 7631d32f63c..38b4953a598 100644
--- 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java
@@ -40,6 +40,11 @@ public abstract class MockEmptySessionService extends 
SessionService {
     throw new UnsupportedOperationException(exceptionMessage);
   }
 
+  @Override
+  public boolean isClosed() {
+    throw new UnsupportedOperationException(exceptionMessage);
+  }
+
   @Override
   public MessageReceiver getReceiver() {
     throw new UnsupportedOperationException(exceptionMessage);
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
index 6d28bcefc84..bd52dee7ea8 100644
--- 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java
@@ -77,6 +77,11 @@ public abstract class MockSessionService extends 
SessionService {
   @Override
   public void close() {}
 
+  @Override
+  public boolean isClosed() {
+    return false;
+  }
+
   @Override
   public MessageReceiver getReceiver() {
     if (messageReceiver == null) {
@@ -126,6 +131,11 @@ public abstract class MockSessionService extends 
SessionService {
     @Override
     public void start() {}
 
+    @Override
+    public boolean isClosed() {
+      return false;
+    }
+
     @Override
     public BytesXMLMessage receive() throws IOException {
       return getRecordFn.apply(counter.getAndIncrement());
diff --git 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
index a1f80932edd..c718c55e1b4 100644
--- 
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
+++ 
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
@@ -447,29 +447,25 @@ public class SolaceIOReadTest {
     // start the reader and move to the first record
     assertTrue(reader.start());
 
-    // consume 3 messages (NB: #start() already consumed the first message)
+    // consume 3 messages (NB: start already consumed the first message)
     for (int i = 0; i < 3; i++) {
       assertTrue(String.format("Failed at %d-th message", i), 
reader.advance());
     }
 
-    // #advance() was called, but the messages were not ready to be 
acknowledged.
-    assertEquals(0, countAckMessages.get());
-
-    // mark all consumed messages as ready to be acknowledged
+    // create checkpoint but don't finalize yet
     CheckpointMark checkpointMark = reader.getCheckpointMark();
 
-    // consume 1 more message. This will call #ackMsg() on messages that were 
ready to be acked.
+    // consume 2 more messages
     reader.advance();
-    assertEquals(4, countAckMessages.get());
-
-    // consume 1 more message. No change in the acknowledged messages.
     reader.advance();
-    assertEquals(4, countAckMessages.get());
+
+    // check if messages are still not acknowledged
+    assertEquals(0, countAckMessages.get());
 
     // acknowledge from the first checkpoint
     checkpointMark.finalizeCheckpoint();
-    // No change in the acknowledged messages, because they were acknowledged 
in the #advance()
-    // method.
+
+    // only messages from the first checkpoint are acknowledged
     assertEquals(4, countAckMessages.get());
   }
 

Reply via email to