This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 4356253a5e8 Revert "SolaceIO.Read: handle occasional cases when finalizeCheckpoint is not executed (#32962)" (#33259) 4356253a5e8 is described below commit 4356253a5e8124bf39152a9ead9ad26ef7267750 Author: Bartosz Zablocki <bzablo...@google.com> AuthorDate: Mon Dec 2 18:18:42 2024 +0000 Revert "SolaceIO.Read: handle occasional cases when finalizeCheckpoint is not executed (#32962)" (#33259) This reverts commit e279e55f47683bcee1c59c0b7aea2b21741fadb3. --- .../broker/BasicAuthJcsmpSessionService.java | 18 ++- .../beam/sdk/io/solace/broker/MessageReceiver.java | 7 ++ .../beam/sdk/io/solace/broker/SessionService.java | 7 ++ .../io/solace/broker/SolaceMessageReceiver.java | 17 ++- .../sdk/io/solace/read/SolaceCheckpointMark.java | 46 ++++--- .../sdk/io/solace/read/UnboundedSolaceReader.java | 135 ++++++--------------- .../sdk/io/solace/MockEmptySessionService.java | 5 + .../beam/sdk/io/solace/MockSessionService.java | 10 ++ .../beam/sdk/io/solace/SolaceIOReadTest.java | 20 ++- 9 files changed, 127 insertions(+), 138 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index d4c9a3ec621..b2196dbf106 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -102,7 +102,10 @@ public abstract class BasicAuthJcsmpSessionService extends SessionService { if (messageReceiver != null) { messageReceiver.close(); } - if (jcsmpSession != null) { + if (messageProducer != null) { + messageProducer.close(); + } + if (!isClosed()) { checkStateNotNull(jcsmpSession).closeSession(); } return 0; @@ -116,9 +119,8 @@ public abstract class BasicAuthJcsmpSessionService extends SessionService { this.messageReceiver = retryCallableManager.retryCallable( this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); - this.messageReceiver.start(); } - return checkStateNotNull(this.messageReceiver); + return this.messageReceiver; } @Override @@ -136,10 +138,15 @@ public abstract class BasicAuthJcsmpSessionService extends SessionService { return publishedResultsQueue; } + @Override + public boolean isClosed() { + return jcsmpSession == null || jcsmpSession.isClosed(); + } + private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode) throws JCSMPException, IOException { - if (jcsmpSession == null) { + if (isClosed()) { connectWriteSession(submissionMode); } @@ -158,6 +165,9 @@ public abstract class BasicAuthJcsmpSessionService extends SessionService { } private MessageReceiver createFlowReceiver() throws JCSMPException, IOException { + if (isClosed()) { + connectSession(); + } Queue queue = JCSMPFactory.onlyInstance() diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java index 017a6326067..95f989bd1be 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java @@ -35,6 +35,13 @@ public interface MessageReceiver { */ void start(); + /** + * Returns {@literal true} if the message receiver is closed, {@literal false} otherwise. + * + * <p>A message receiver is closed when it is no longer able to receive messages. + */ + boolean isClosed(); + /** * Receives a message from the broker. * diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java index 6dcd0b65261..84a876a9d0b 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -120,6 +120,13 @@ public abstract class SessionService implements Serializable { /** Gracefully closes the connection to the service. */ public abstract void close(); + /** + * Checks whether the connection to the service is currently closed. This method is called when an + * `UnboundedSolaceReader` is starting to read messages - a session will be created if this + * returns true. + */ + public abstract boolean isClosed(); + /** * Returns a MessageReceiver object for receiving messages from Solace. If it is the first time * this method is used, the receiver is created from the session instance, otherwise it returns diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java index d74f3cae89f..d548d2049a5 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java @@ -24,8 +24,12 @@ import com.solacesystems.jcsmp.StaleSessionException; import java.io.IOException; import org.apache.beam.sdk.io.solace.RetryCallableManager; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SolaceMessageReceiver implements MessageReceiver { + private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class); + public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100; private final FlowReceiver flowReceiver; private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); @@ -48,14 +52,19 @@ public class SolaceMessageReceiver implements MessageReceiver { ImmutableSet.of(JCSMPException.class)); } + @Override + public boolean isClosed() { + return flowReceiver == null || flowReceiver.isClosed(); + } + @Override public BytesXMLMessage receive() throws IOException { try { return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS); } catch (StaleSessionException e) { + LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver."); startFlowReceiver(); - throw new IOException( - "SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.", e); + throw new IOException(e); } catch (JCSMPException e) { throw new IOException(e); } @@ -63,6 +72,8 @@ public class SolaceMessageReceiver implements MessageReceiver { @Override public void close() { - flowReceiver.close(); + if (!isClosed()) { + this.flowReceiver.close(); + } } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index a913fd6133e..77f6eed8f62 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -18,16 +18,17 @@ package org.apache.beam.sdk.io.solace.read; import com.solacesystems.jcsmp.BytesXMLMessage; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; -import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be @@ -37,8 +38,10 @@ import org.slf4j.LoggerFactory; @Internal @VisibleForTesting public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { - private static final Logger LOG = LoggerFactory.getLogger(SolaceCheckpointMark.class); - private transient Queue<BytesXMLMessage> safeToAck; + private transient AtomicBoolean activeReader; + // BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry + // these messages here. We relay on Solace's retry mechanism. + private transient ArrayDeque<BytesXMLMessage> ackQueue; @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} @@ -46,24 +49,25 @@ public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { /** * Creates a new {@link SolaceCheckpointMark}. * - * @param safeToAck - a queue of {@link BytesXMLMessage} to be acknowledged. + * @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The + * reader creating the messages has to be active to acknowledge the messages. + * @param ackQueue {@link List} of {@link BytesXMLMessage} to be acknowledged. */ - SolaceCheckpointMark(Queue<BytesXMLMessage> safeToAck) { - this.safeToAck = safeToAck; + SolaceCheckpointMark(AtomicBoolean activeReader, List<BytesXMLMessage> ackQueue) { + this.activeReader = activeReader; + this.ackQueue = new ArrayDeque<>(ackQueue); } @Override public void finalizeCheckpoint() { - BytesXMLMessage msg; - while ((msg = safeToAck.poll()) != null) { - try { + if (activeReader == null || !activeReader.get() || ackQueue == null) { + return; + } + + while (!ackQueue.isEmpty()) { + BytesXMLMessage msg = ackQueue.poll(); + if (msg != null) { msg.ackMessage(); - } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: cannot acknowledge the message with applicationMessageId={}, ackMessageId={}. It will not be retried.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); } } } @@ -80,11 +84,15 @@ public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { return false; } SolaceCheckpointMark that = (SolaceCheckpointMark) o; - return Objects.equals(safeToAck, that.safeToAck); + // Needed to convert to ArrayList because ArrayDeque.equals checks only for reference, not + // content. + ArrayList<BytesXMLMessage> ackList = new ArrayList<>(ackQueue); + ArrayList<BytesXMLMessage> thatAckList = new ArrayList<>(that.ackQueue); + return Objects.equals(activeReader, that.activeReader) && Objects.equals(ackList, thatAckList); } @Override public int hashCode() { - return Objects.hash(safeToAck); + return Objects.hash(activeReader, ackQueue); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index dc84e0a0701..a421970370d 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -22,26 +22,17 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr import com.solacesystems.jcsmp.BytesXMLMessage; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.solace.broker.SempClient; import org.apache.beam.sdk.io.solace.broker.SessionService; -import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -55,92 +46,48 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> { private final UnboundedSolaceSource<T> currentSource; private final WatermarkPolicy<T> watermarkPolicy; private final SempClient sempClient; - private final UUID readerUuid; - private final SessionServiceFactory sessionServiceFactory; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; + private @Nullable SessionService sessionService; + AtomicBoolean active = new AtomicBoolean(true); /** - * Queue to place advanced messages before {@link #getCheckpointMark()} is called. CAUTION: - * Accessed by both reader and checkpointing threads. + * Queue to place advanced messages before {@link #getCheckpointMark()} be called non-concurrent + * queue, should only be accessed by the reader thread A given {@link UnboundedReader} object will + * only be accessed by a single thread at once. */ - private final Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>(); - - /** - * Queue for messages that were ingested in the {@link #advance()} method, but not sent yet to a - * {@link SolaceCheckpointMark}. - */ - private final Queue<BytesXMLMessage> receivedMessages = new ArrayDeque<>(); - - private static final Cache<UUID, SessionService> sessionServiceCache; - private static final ScheduledExecutorService cleanUpThread = Executors.newScheduledThreadPool(1); - - static { - Duration cacheExpirationTimeout = Duration.ofMinutes(1); - sessionServiceCache = - CacheBuilder.newBuilder() - .expireAfterAccess(cacheExpirationTimeout) - .removalListener( - (RemovalNotification<UUID, SessionService> notification) -> { - LOG.info( - "SolaceIO.Read: Closing session for the reader with uuid {} as it has been idle for over {}.", - notification.getKey(), - cacheExpirationTimeout); - SessionService sessionService = notification.getValue(); - if (sessionService != null) { - sessionService.close(); - } - }) - .build(); - - startCleanUpThread(); - } - - @SuppressWarnings("FutureReturnValueIgnored") - private static void startCleanUpThread() { - cleanUpThread.scheduleAtFixedRate(sessionServiceCache::cleanUp, 1, 1, TimeUnit.MINUTES); - } + private final java.util.Queue<BytesXMLMessage> elementsToCheckpoint = new ArrayDeque<>(); public UnboundedSolaceReader(UnboundedSolaceSource<T> currentSource) { this.currentSource = currentSource; this.watermarkPolicy = WatermarkPolicy.create( currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold()); - this.sessionServiceFactory = currentSource.getSessionServiceFactory(); + this.sessionService = currentSource.getSessionServiceFactory().create(); this.sempClient = currentSource.getSempClientFactory().create(); - this.readerUuid = UUID.randomUUID(); - } - - private SessionService getSessionService() { - try { - return sessionServiceCache.get( - readerUuid, - () -> { - LOG.info("SolaceIO.Read: creating a new session for reader with uuid {}.", readerUuid); - SessionService sessionService = sessionServiceFactory.create(); - sessionService.connect(); - sessionService.getReceiver().start(); - return sessionService; - }); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } } @Override public boolean start() { - // Create and initialize SessionService with Receiver - getSessionService(); + populateSession(); + checkNotNull(sessionService).getReceiver().start(); return advance(); } + public void populateSession() { + if (sessionService == null) { + sessionService = getCurrentSource().getSessionServiceFactory().create(); + } + if (sessionService.isClosed()) { + checkNotNull(sessionService).connect(); + } + } + @Override public boolean advance() { - finalizeReadyMessages(); - BytesXMLMessage receivedXmlMessage; try { - receivedXmlMessage = getSessionService().getReceiver().receive(); + receivedXmlMessage = checkNotNull(sessionService).getReceiver().receive(); } catch (IOException e) { LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e); return false; @@ -149,40 +96,23 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> { if (receivedXmlMessage == null) { return false; } + elementsToCheckpoint.add(receivedXmlMessage); solaceOriginalRecord = receivedXmlMessage; solaceMappedRecord = getCurrentSource().getParseFn().apply(receivedXmlMessage); - receivedMessages.add(receivedXmlMessage); - + watermarkPolicy.update(solaceMappedRecord); return true; } @Override public void close() { - finalizeReadyMessages(); - sessionServiceCache.invalidate(readerUuid); - } - - public void finalizeReadyMessages() { - BytesXMLMessage msg; - while ((msg = safeToAckMessages.poll()) != null) { - try { - msg.ackMessage(); - } catch (IllegalStateException e) { - LOG.error( - "SolaceIO.Read: failed to acknowledge the message with applicationMessageId={}, ackMessageId={}. Returning the message to queue to retry.", - msg.getApplicationMessageId(), - msg.getAckMessageId(), - e); - safeToAckMessages.add(msg); // In case the error was transient, might succeed later - break; // Commit is only best effort - } - } + active.set(false); + checkNotNull(sessionService).close(); } @Override public Instant getWatermark() { // should be only used by a test receiver - if (getSessionService().getReceiver().isEOF()) { + if (checkNotNull(sessionService).getReceiver().isEOF()) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } return watermarkPolicy.getWatermark(); @@ -190,9 +120,14 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> { @Override public UnboundedSource.CheckpointMark getCheckpointMark() { - safeToAckMessages.addAll(receivedMessages); - receivedMessages.clear(); - return new SolaceCheckpointMark(safeToAckMessages); + List<BytesXMLMessage> ackQueue = new ArrayList<>(); + while (!elementsToCheckpoint.isEmpty()) { + BytesXMLMessage msg = elementsToCheckpoint.poll(); + if (msg != null) { + ackQueue.add(msg); + } + } + return new SolaceCheckpointMark(active, ackQueue); } @Override diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java index 7631d32f63c..38b4953a598 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java @@ -40,6 +40,11 @@ public abstract class MockEmptySessionService extends SessionService { throw new UnsupportedOperationException(exceptionMessage); } + @Override + public boolean isClosed() { + throw new UnsupportedOperationException(exceptionMessage); + } + @Override public MessageReceiver getReceiver() { throw new UnsupportedOperationException(exceptionMessage); diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java index 6d28bcefc84..bd52dee7ea8 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -77,6 +77,11 @@ public abstract class MockSessionService extends SessionService { @Override public void close() {} + @Override + public boolean isClosed() { + return false; + } + @Override public MessageReceiver getReceiver() { if (messageReceiver == null) { @@ -126,6 +131,11 @@ public abstract class MockSessionService extends SessionService { @Override public void start() {} + @Override + public boolean isClosed() { + return false; + } + @Override public BytesXMLMessage receive() throws IOException { return getRecordFn.apply(counter.getAndIncrement()); diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index a1f80932edd..c718c55e1b4 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -447,29 +447,25 @@ public class SolaceIOReadTest { // start the reader and move to the first record assertTrue(reader.start()); - // consume 3 messages (NB: #start() already consumed the first message) + // consume 3 messages (NB: start already consumed the first message) for (int i = 0; i < 3; i++) { assertTrue(String.format("Failed at %d-th message", i), reader.advance()); } - // #advance() was called, but the messages were not ready to be acknowledged. - assertEquals(0, countAckMessages.get()); - - // mark all consumed messages as ready to be acknowledged + // create checkpoint but don't finalize yet CheckpointMark checkpointMark = reader.getCheckpointMark(); - // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked. + // consume 2 more messages reader.advance(); - assertEquals(4, countAckMessages.get()); - - // consume 1 more message. No change in the acknowledged messages. reader.advance(); - assertEquals(4, countAckMessages.get()); + + // check if messages are still not acknowledged + assertEquals(0, countAckMessages.get()); // acknowledge from the first checkpoint checkpointMark.finalizeCheckpoint(); - // No change in the acknowledged messages, because they were acknowledged in the #advance() - // method. + + // only messages from the first checkpoint are acknowledged assertEquals(4, countAckMessages.get()); }