This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f7a5bbd4a35 Revert "SolaceIO data loss - remove message ack from close
and advance as it may lead to data loss during work rebalancing or retry. "
(#37162)
f7a5bbd4a35 is described below
commit f7a5bbd4a35d9b6e128817755f774ba414ea5716
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Mon Jan 5 11:40:30 2026 +0100
Revert "SolaceIO data loss - remove message ack from close and advance as
it may lead to data loss during work rebalancing or retry. " (#37162)
This reverts commit f277b5ec59ba213af547167080fd8f5bd210e6a7.
---
.../sdk/io/solace/read/SolaceCheckpointMark.java | 9 +--
.../sdk/io/solace/read/UnboundedSolaceReader.java | 33 ++++++++--
.../beam/sdk/io/solace/SolaceIOReadTest.java | 73 +---------------------
3 files changed, 37 insertions(+), 78 deletions(-)
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
index eb2d4b3006a..a913fd6133e 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
@@ -18,8 +18,8 @@
package org.apache.beam.sdk.io.solace.read;
import com.solacesystems.jcsmp.BytesXMLMessage;
-import java.util.List;
import java.util.Objects;
+import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class);
- private transient List<BytesXMLMessage> safeToAck;
+ private transient Queue<BytesXMLMessage> safeToAck;
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
@@ -48,13 +48,14 @@ public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
*/
- SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
+ SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
}
@Override
public void finalizeCheckpoint() {
- for (BytesXMLMessage msg : safeToAck) {
+ BytesXMLMessage msg;
+ while ((msg = safeToAck.poll()) != null) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
index 7c756169ef3..dc84e0a0701 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
@@ -27,6 +27,7 @@ import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -41,7 +42,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -60,6 +60,12 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
+ /**
+ * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION:
+ * Accessed by both reader and checkpointing threads.
+ */
+ private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
+
/**
* Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a
* {@link SolaceCheckpointMark}.
@@ -130,6 +136,8 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
@Override
public boolean advance() {
+ finalizeReadyMessages();
+
BytesXMLMessage receivedXmlMessage;
try {
receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -150,9 +158,27 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
@Override
public void close() {
+ finalizeReadyMessages();
sessionServiceCache.invalidate(readerUuid);
}
+ public void finalizeReadyMessages() {
+ BytesXMLMessage msg;
+ while ((msg = safeToAckMessages.poll()) != null) {
+ try {
+ msg.ackMessage();
+ } catch (IllegalStateException e) {
+ LOG.error(
+ "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.",
+ msg.getApplicationMessageId(),
+ msg.getAckMessageId(),
+ e);
+ safeToAckMessages.add(msg); // In case the error was transient, might succeed later
+ break; // Commit is only best effort
+ }
+ }
+ }
+
@Override
public Instant getWatermark() {
// should be only used by a test receiver
@@ -164,10 +190,9 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
-
- ImmutableList<BytesXMLMessage> bytesXMLMessages = ImmutableList.copyOf(receivedMessages);
+ safeToAckMessages.addAll(receivedMessages);
receivedMessages.clear();
- return new SolaceCheckpointMark(bytesXMLMessages);
+ return new SolaceCheckpointMark(safeToAckMessages);
}
@Override
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
index c17ec3e128d..a1f80932edd 100644
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
+++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
@@ -458,13 +458,13 @@ public class SolaceIOReadTest {
// mark all consumed messages as ready to be acknowledged
CheckpointMark checkpointMark = reader.getCheckpointMark();
- // consume 1 more message.
+ // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
reader.advance();
- assertEquals(0, countAckMessages.get());
+ assertEquals(4, countAckMessages.get());
// consume 1 more message. No change in the acknowledged messages.
reader.advance();
- assertEquals(0, countAckMessages.get());
+ assertEquals(4, countAckMessages.get());
// acknowledge from the first checkpoint
checkpointMark.finalizeCheckpoint();
@@ -473,73 +473,6 @@ public class SolaceIOReadTest {
assertEquals(4, countAckMessages.get());
}
- @Test
- public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception {
- AtomicInteger countConsumedMessages = new AtomicInteger(0);
- AtomicInteger countAckMessages = new AtomicInteger(0);
-
- // Broker that creates input data
- SerializableFunction<Integer, BytesXMLMessage> recordFn =
- index -> {
- List<BytesXMLMessage> messages = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- messages.add(
- SolaceDataUtils.getBytesXmlMessage(
- "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet()));
- }
- countConsumedMessages.incrementAndGet();
- return getOrNull(index, messages);
- };
-
- SessionServiceFactory fakeSessionServiceFactory =
- MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
-
- Read<Record> spec =
- getDefaultRead()
- .withSessionServiceFactory(fakeSessionServiceFactory)
- .withMaxNumConnections(4);
-
- UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
-
- UnboundedReader<Record> reader =
- initialSource.createReader(PipelineOptionsFactory.create(), null);
-
- // start the reader and move to the first record
- assertTrue(reader.start());
-
- // consume 3 messages (NB: #start() already consumed the first message)
- for (int i = 0; i < 3; i++) {
- assertTrue(String.format("Failed at %d-th message", i), reader.advance());
- }
-
- // #advance() was called, but the messages were not ready to be acknowledged.
- assertEquals(0, countAckMessages.get());
-
- // mark all consumed messages as ready to be acknowledged
- CheckpointMark checkpointMark = reader.getCheckpointMark();
-
- // data is flushed
-
- // consume 1 more message.
- reader.advance();
- assertEquals(0, countAckMessages.get());
-
- // consume 1 more message. No change in the acknowledged messages.
- reader.advance();
- assertEquals(0, countAckMessages.get());
-
- CheckpointMark checkpointMark2 = reader.getCheckpointMark();
- // data is prepared for flushing that will be rejected
-
- // acknowledge from the first checkpoint may arrive late
- checkpointMark.finalizeCheckpoint();
-
- assertEquals(4, countAckMessages.get());
-
- checkpointMark2.finalizeCheckpoint();
- assertEquals(6, countAckMessages.get());
- }
-
@Test
public void testCheckpointMarkSafety() throws Exception {