This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e62187ebbf SolaceIO data loss - remove message ack from close and
advance as it may lead to data loss during work rebalancing or retry. (#37007)
1e62187ebbf is described below
commit 1e62187ebbf115769219ca829e211669e73cf75e
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Mon Dec 8 18:29:44 2025 +0100
SolaceIO data loss - remove message ack from close and advance as it may
lead to data loss during work rebalancing or retry. (#37007)
---
.../sdk/io/solace/read/SolaceCheckpointMark.java | 9 ++-
.../sdk/io/solace/read/UnboundedSolaceReader.java | 33 ++--------
.../beam/sdk/io/solace/SolaceIOReadTest.java | 73 +++++++++++++++++++++-
3 files changed, 78 insertions(+), 37 deletions(-)
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
index a913fd6133e..eb2d4b3006a 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
@@ -18,8 +18,8 @@
package org.apache.beam.sdk.io.solace.read;
import com.solacesystems.jcsmp.BytesXMLMessage;
+import java.util.List;
import java.util.Objects;
-import java.util.Queue;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private static final Logger LOG =
LoggerFactory.getLogger(SolaceCheckpointMark.class);
- private transient Queue<BytesXMLMessage> safeToAck;
+ private transient List<BytesXMLMessage> safeToAck;
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
@@ -48,14 +48,13 @@ public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
*
* @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged.
*/
- SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) {
+ SolaceCheckpointMark(List<BytesXMLMessage> safeToAck) {
this.safeToAck = safeToAck;
}
@Override
public void finalizeCheckpoint() {
- BytesXMLMessage msg;
- while ((msg = safeToAck.poll()) != null) {
+ for (BytesXMLMessage msg : safeToAck) {
try {
msg.ackMessage();
} catch (IllegalStateException e) {
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
index dc84e0a0701..7c756169ef3 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
@@ -27,7 +27,6 @@ import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -42,6 +41,7 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -60,12 +60,6 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private @Nullable BytesXMLMessage solaceOriginalRecord;
private @Nullable T solaceMappedRecord;
- /**
- * Queue to place advanced messages before {@link #getCheckpointMark()} is
called. CAUTION:
- * Accessed by both reader and checkpointing threads.
- */
- private final Queue<BytesXMLMessage> safeToAckMessages = new
ConcurrentLinkedQueue<>();
-
/**
* Queue for messages that were ingested in the {@link #advance()} method,
but not sent yet to a
* {@link SolaceCheckpointMark}.
@@ -136,8 +130,6 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
@Override
public boolean advance() {
- finalizeReadyMessages();
-
BytesXMLMessage receivedXmlMessage;
try {
receivedXmlMessage = getSessionService().getReceiver().receive();
@@ -158,27 +150,9 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
@Override
public void close() {
- finalizeReadyMessages();
sessionServiceCache.invalidate(readerUuid);
}
- public void finalizeReadyMessages() {
- BytesXMLMessage msg;
- while ((msg = safeToAckMessages.poll()) != null) {
- try {
- msg.ackMessage();
- } catch (IllegalStateException e) {
- LOG.error(
- "SolaceIO.Read: failed to acknowledge the message with
applicationMessageId={}, ackMessageId={}. Returning the message to queue to
retry.",
- msg.getApplicationMessageId(),
- msg.getAckMessageId(),
- e);
- safeToAckMessages.add(msg); // In case the error was transient, might
succeed later
- break; // Commit is only best effort
- }
- }
- }
-
@Override
public Instant getWatermark() {
// should be only used by a test receiver
@@ -190,9 +164,10 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
- safeToAckMessages.addAll(receivedMessages);
+
+ ImmutableList<BytesXMLMessage> bytesXMLMessages = ImmutableList.copyOf(receivedMessages);
receivedMessages.clear();
- return new SolaceCheckpointMark(safeToAckMessages);
+ return new SolaceCheckpointMark(bytesXMLMessages);
}
@Override
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
index a1f80932edd..c17ec3e128d 100644
---
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java
@@ -458,13 +458,13 @@ public class SolaceIOReadTest {
// mark all consumed messages as ready to be acknowledged
CheckpointMark checkpointMark = reader.getCheckpointMark();
- // consume 1 more message. This will call #ackMsg() on messages that were
ready to be acked.
+ // consume 1 more message.
reader.advance();
- assertEquals(4, countAckMessages.get());
+ assertEquals(0, countAckMessages.get());
// consume 1 more message. No change in the acknowledged messages.
reader.advance();
- assertEquals(4, countAckMessages.get());
+ assertEquals(0, countAckMessages.get());
// acknowledge from the first checkpoint
checkpointMark.finalizeCheckpoint();
@@ -473,6 +473,73 @@ public class SolaceIOReadTest {
assertEquals(4, countAckMessages.get());
}
+ @Test
+ public void testLateCheckpointOverlappingFlushingOfNextBundle() throws Exception {
+ AtomicInteger countConsumedMessages = new AtomicInteger(0);
+ AtomicInteger countAckMessages = new AtomicInteger(0);
+
+ // Broker that creates input data
+ SerializableFunction<Integer, BytesXMLMessage> recordFn =
+ index -> {
+ List<BytesXMLMessage> messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ messages.add(
+ SolaceDataUtils.getBytesXmlMessage(
+ "payload_test" + i, "45" + i, (num) ->
countAckMessages.incrementAndGet()));
+ }
+ countConsumedMessages.incrementAndGet();
+ return getOrNull(index, messages);
+ };
+
+ SessionServiceFactory fakeSessionServiceFactory =
+ MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build();
+
+ Read<Record> spec =
+ getDefaultRead()
+ .withSessionServiceFactory(fakeSessionServiceFactory)
+ .withMaxNumConnections(4);
+
+ UnboundedSolaceSource<Record> initialSource = getSource(spec, pipeline);
+
+ UnboundedReader<Record> reader =
+ initialSource.createReader(PipelineOptionsFactory.create(), null);
+
+ // start the reader and move to the first record
+ assertTrue(reader.start());
+
+ // consume 3 messages (NB: #start() already consumed the first message)
+ for (int i = 0; i < 3; i++) {
+ assertTrue(String.format("Failed at %d-th message", i), reader.advance());
+ }
+
+ // #advance() was called, but the messages were not ready to be acknowledged.
+ assertEquals(0, countAckMessages.get());
+
+ // mark all consumed messages as ready to be acknowledged
+ CheckpointMark checkpointMark = reader.getCheckpointMark();
+
+ // data is flushed
+
+ // consume 1 more message.
+ reader.advance();
+ assertEquals(0, countAckMessages.get());
+
+ // consume 1 more message. No change in the acknowledged messages.
+ reader.advance();
+ assertEquals(0, countAckMessages.get());
+
+ CheckpointMark checkpointMark2 = reader.getCheckpointMark();
+ // data is prepared for flushing that will be rejected
+
+ // acknowledge from the first checkpoint may arrive late
+ checkpointMark.finalizeCheckpoint();
+
+ assertEquals(4, countAckMessages.get());
+
+ checkpointMark2.finalizeCheckpoint();
+ assertEquals(6, countAckMessages.get());
+ }
+
@Test
public void testCheckpointMarkSafety() throws Exception {