This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8f199ec0248 [feat] PIP-468: DagWatchClient auto-reconnects on broker
disconnect (#25687)
8f199ec0248 is described below
commit 8f199ec024840dd65538cf7282f47ab0dcde1257
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 07:28:14 2026 -0700
[feat] PIP-468: DagWatchClient auto-reconnects on broker disconnect (#25687)
---
.../client/api/v5/V5DagWatchAutoReconnectTest.java | 207 +++++++++++++++++++++
.../pulsar/client/impl/v5/DagWatchClient.java | 146 ++++++++++++---
2 files changed, 322 insertions(+), 31 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
new file mode 100644
index 00000000000..c5e294c35ce
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code DagWatchClient}'s auto-reconnect path. The DAG watch
holds
+ * the producer / consumer's view of the scalable topic layout — without
+ * reconnect, a transient broker disconnect silently strands the client on
stale
+ * layout and never delivers another split / merge update.
+ *
+ * <p>These tests force-close the watch's underlying channel and assert that
+ * producers and consumers continue to operate end-to-end without application
+ * intervention.
+ */
+public class V5DagWatchAutoReconnectTest extends V5ClientBaseTest {
+
+    /** Upper bound on how long {@link #drain} waits for the expected messages. */
+    private static final Duration DRAIN_TIMEOUT = Duration.ofSeconds(30);
+
+    /**
+     * Producer's DAG watch channel is force-closed mid-life. Sends made after the
+     * close must still succeed: the cached layout keeps existing segments
+     * reachable, and the reconnect re-establishes the watch so subsequent layout
+     * changes would still be observed.
+     */
+    @Test
+    public void testProducerSurvivesDagWatchConnectionDrop() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dag-reconnect-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> firstSent = sendBatch(producer, "first-", 10);
+        assertEquals(drain(consumer, firstSent.size()), firstSent,
+                "first batch must arrive before the disconnect");
+
+        // Force-close the DAG watch channel. The cnx layer fires connectionClosed()
+        // on the DagWatchClient, which schedules a reconnect.
+        forceCloseDagWatch(producer, "producer");
+
+        // Send a second batch immediately. Existing segments are still reachable
+        // through the per-segment v4 producers (their own connections are unaffected),
+        // so this proves the producer keeps working through the reconnect window.
+        Set<String> secondSent = sendBatch(producer, "second-", 10);
+        assertEquals(drain(consumer, secondSent.size()), secondSent,
+                "producer must keep sending after DAG watch reconnect kicks in");
+    }
+
+    /**
+     * After a force-close, the DAG watch must observe a fresh broker connection
+     * (i.e., its internal {@code cnx} field is re-populated). Asserts the
+     * reconnect path actually fires rather than silently staying disconnected.
+     */
+    @Test
+    public void testDagWatchReattachesAfterDisconnect() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Object dagWatch = getDagWatch(producer, "producer");
+        Object originalCnx = readField(dagWatch, "cnx");
+        assertNotNull(originalCnx, "DAG watch must have an initial connection");
+
+        forceCloseDagWatch(producer, "producer");
+
+        // Wait for the reconnect path to land a fresh ClientCnx on the DagWatchClient.
+        // Backoff starts at 100ms; allow a generous window for CI.
+        Awaitility.await().atMost(Duration.ofSeconds(15))
+                .until(() -> {
+                    Object current = readField(dagWatch, "cnx");
+                    return current != null && current != originalCnx;
+                });
+    }
+
+    /**
+     * A consumer's DAG watch channel is force-closed mid-life. Like the producer
+     * test, this asserts the consumer continues to deliver messages produced
+     * after the disconnect.
+     */
+    @Test
+    public void testConsumerSurvivesDagWatchConnectionDrop() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dag-reconnect-consumer-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> firstSent = sendBatch(producer, "first-", 10);
+        assertEquals(drain(consumer, firstSent.size()), firstSent,
+                "first batch must arrive before disconnect");
+
+        forceCloseDagWatch(consumer, "consumer");
+
+        // Compare contents, not just counts: a redelivered first-batch value must not
+        // be able to stand in for a lost second-batch message.
+        Set<String> secondSent = sendBatch(producer, "second-", 10);
+        assertEquals(drain(consumer, secondSent.size()), secondSent,
+                "consumer must keep receiving after DAG watch reconnect kicks in");
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Sends {@code count} messages keyed {@code k-<i>} with values {@code <prefix><i>}
+     * and returns the set of values sent.
+     */
+    private static Set<String> sendBatch(Producer<String> producer, String prefix, int count)
+            throws Exception {
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < count; i++) {
+            String value = prefix + i;
+            producer.newMessage().key("k-" + i).value(value).send();
+            sent.add(value);
+        }
+        return sent;
+    }
+
+    /**
+     * Receives and acknowledges messages until {@code expected} distinct values have
+     * been collected or {@link #DRAIN_TIMEOUT} elapses, then returns what arrived.
+     * The deadline is measured with {@link System#nanoTime()} so wall-clock
+     * adjustments cannot stretch or cut the wait.
+     */
+    private Set<String> drain(QueueConsumer<String> consumer, int expected) throws Exception {
+        Set<String> received = new HashSet<>();
+        long deadline = System.nanoTime() + DRAIN_TIMEOUT.toNanos();
+        while (received.size() < expected && System.nanoTime() - deadline < 0) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+                consumer.acknowledge(msg.id());
+            }
+        }
+        return received;
+    }
+
+    /** Reads the private {@code dagWatch} field of a v5 producer or consumer. */
+    private static Object getDagWatch(Object owner, String role) throws Exception {
+        Object watch = readField(owner, "dagWatch");
+        assertNotNull(watch, "expected dagWatch on " + role);
+        return watch;
+    }
+
+    /** Invokes the package-private {@code forceCloseConnectionForTesting} hook on the owner's DAG watch. */
+    private static void forceCloseDagWatch(Object owner, String role) throws Exception {
+        Object watch = getDagWatch(owner, role);
+        Method m = watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+        m.setAccessible(true);
+        m.invoke(watch);
+    }
+
+    /** Reflectively reads a field declared directly on {@code target}'s runtime class. */
+    private static Object readField(Object target, String name) throws Exception {
+        Field f = target.getClass().getDeclaredField(name);
+        f.setAccessible(true);
+        return f.get(target);
+    }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index b13828e7561..2041dab9b57 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.client.impl.v5;
import io.github.merlimat.slog.Logger;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,6 +32,7 @@ import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
/**
* Client-side manager for a DAG watch session on a scalable topic.
@@ -39,6 +42,12 @@ import org.apache.pulsar.common.protocol.Commands;
*
* <p>Maintains the current {@link ClientSegmentLayout} and notifies a listener
* when it changes.
+ *
+ * <p>Reconnects automatically on transient broker disconnects. The
initial-create
+ * future ({@link #start}) surfaces failures up front so a producer / consumer
+ * {@code create()} fails fast when the broker is unreachable. After the first
+ * layout has arrived, subsequent disconnects schedule a reconnection with
+ * exponential backoff so long-lived producers / consumers survive network
blips.
*/
final class DagWatchClient implements DagWatchSession, AutoCloseable {
@@ -52,6 +61,7 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
private final long sessionId;
private final AtomicReference<ClientSegmentLayout> currentLayout = new
AtomicReference<>();
+    // Returned by start(): completed with the first layout update, or exceptionally
+    // if the initial connect / lookup fails before any layout arrives.
private final CompletableFuture<ClientSegmentLayout> initialLayoutFuture =
new CompletableFuture<>();
+    // Delays between reconnect attempts (100ms initial, 30s cap; built in the
+    // constructor). Reset whenever a layout update is received.
+ private final Backoff reconnectBackoff;
private volatile LayoutChangeListener listener;
+    // Connection the session is currently attached to; set to null when that
+    // connection is reported closed.
private volatile ClientCnx cnx;
+    // Once true, attach(), scheduleReconnect() and reconnect() return without doing work.
private volatile boolean closed = false;
@@ -60,6 +70,10 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
this.v4Client = v4Client;
this.topicName = topicName;
this.sessionId = SESSION_ID_GENERATOR.incrementAndGet();
+ this.reconnectBackoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(30))
+ .build();
this.log = LOG.with().attr("topic", topicName).attr("sessionId",
sessionId).build();
}
@@ -70,39 +84,55 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
* @return a future that completes with the initial layout
*/
CompletableFuture<ClientSegmentLayout> start() {
- // Get any broker connection and send the lookup command
+        // Get any broker connection and hand it to attach(), which registers this
+        // session and sends the ScalableTopicLookup. The exceptionally() stage below
+        // fails the initial-layout future both when no connection can be obtained and
+        // when attach() itself throws.
v4Client.getConnection(topicName.toString())
- .thenAccept(cnx -> {
- this.cnx = cnx;
- if (!cnx.isSupportsScalableTopics()) {
- initialLayoutFuture.completeExceptionally(
- new
PulsarClientException.FeatureNotSupportedException(
- "Broker does not support scalable
topics",
-
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
- return;
- }
- // Register this session to receive updates
- cnx.registerDagWatchSession(sessionId, this);
-
- // Send the lookup command
- cnx.ctx().writeAndFlush(
- Commands.newScalableTopicLookup(sessionId,
topicName.toString()))
- .addListener(writeFuture -> {
- if (!writeFuture.isSuccess()) {
- cnx.removeDagWatchSession(sessionId);
- initialLayoutFuture.completeExceptionally(
- new
PulsarClientException(writeFuture.cause()));
- }
- });
- })
+ .thenAccept(this::attach)
.exceptionally(ex -> {
initialLayoutFuture.completeExceptionally(ex);
return null;
});
-
return initialLayoutFuture;
}
+    /**
+     * Wire {@code newCnx} to this session and send a ScalableTopicLookup. Used by
+     * both {@link #start} (first connect) and {@link #reconnect} (after disconnect).
+     *
+     * <p>Before the first layout has arrived, any failure here completes the
+     * initial-layout future exceptionally so the owning {@code create()} fails fast.
+     * Afterwards, failures are logged and a backoff reconnect is scheduled instead.
+     */
+    private void attach(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        if (!newCnx.isSupportsScalableTopics()) {
+            PulsarClientException ex = new PulsarClientException.FeatureNotSupportedException(
+                    "Broker does not support scalable topics",
+                    PulsarClientException.FailedFeatureCheck.SupportsScalableTopics);
+            if (!initialLayoutFuture.isDone()) {
+                initialLayoutFuture.completeExceptionally(ex);
+            } else {
+                log.warn().exceptionMessage(ex)
+                        .log("Reconnect target broker doesn't support scalable topics");
+                scheduleReconnect();
+            }
+            return;
+        }
+        this.cnx = newCnx;
+        newCnx.registerDagWatchSession(sessionId, this);
+        newCnx.ctx().writeAndFlush(
+                        Commands.newScalableTopicLookup(sessionId, topicName.toString()))
+                .addListener(writeFuture -> {
+                    if (writeFuture.isSuccess()) {
+                        return;
+                    }
+                    newCnx.removeDagWatchSession(sessionId);
+                    // The session is no longer registered on newCnx, so stop advertising it
+                    // as the live connection. Only clear the field if no concurrent
+                    // reconnect has already replaced it.
+                    if (cnx == newCnx) {
+                        cnx = null;
+                    }
+                    if (!initialLayoutFuture.isDone()) {
+                        initialLayoutFuture.completeExceptionally(
+                                new PulsarClientException(writeFuture.cause()));
+                    } else {
+                        log.warn().exceptionMessage(writeFuture.cause())
+                                .log("DAG watch reconnect write failed; will retry");
+                        scheduleReconnect();
+                    }
+                });
+    }
+
/**
* Called when the broker pushes a ScalableTopicUpdate for this session.
* This is invoked from the Netty I/O thread.
@@ -121,6 +151,10 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
.attr("activeSegmentCount", newLayout.activeSegments().size())
.log("Layout updated");
+ // Reset the reconnect backoff: the broker confirmed the session is
live and
+ // our local state is consistent.
+ reconnectBackoff.reset();
+
// Complete the initial layout future if this is the first update
initialLayoutFuture.complete(newLayout);
@@ -138,18 +172,56 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
public void onError(ServerError error, String message) {
log.error().attr("error", error).attr("message", message)
.log("DAG watch session error");
- initialLayoutFuture.completeExceptionally(
- new PulsarClientException(
- "Scalable topic lookup failed: " + error + " - " +
message));
+        if (!initialLayoutFuture.isDone()) {
+            // The initial lookup was rejected: surface it to the producer / consumer create().
+            initialLayoutFuture.completeExceptionally(
+                    new PulsarClientException(
+                            "Scalable topic lookup failed: " + error + " - " + message));
+            return;
+        }
+        if (closed) {
+            return;
+        }
+        // After the initial layout has arrived, broker-side errors on this session
+        // (e.g., metadata unavailable) are usually transient. Nothing on this path closes
+        // the connection, so connectionClosed() cannot be relied on to retry. Without
+        // this, a failed re-lookup would leave the session stranded on a stale layout.
+        // Detach from the current connection and reconnect with backoff.
+        ClientCnx failedCnx = cnx;
+        cnx = null;
+        if (failedCnx != null) {
+            failedCnx.removeDagWatchSession(sessionId);
+        }
+        scheduleReconnect();
}
+    /**
+     * Invoked by the connection layer when the broker connection carrying this
+     * session closes. Fails the pending initial lookup, or schedules a backoff
+     * reconnect once a layout has been delivered.
+     */
@Override
public void connectionClosed() {
log.warn("DAG watch session connection closed");
cnx = null;
- initialLayoutFuture.completeExceptionally(
- new PulsarClientException("Connection closed while waiting for
scalable topic layout"));
- // TODO: implement automatic reconnection with backoff
+        if (closed) {
+            return;
+        }
+        if (initialLayoutFuture.isDone()) {
+            // A layout has already been delivered, so this is a mid-life drop:
+            // keep the session alive by retrying with backoff.
+            scheduleReconnect();
+            return;
+        }
+        // The initial lookup never completed: fail the pending create() instead of
+        // retrying silently behind its caller.
+        initialLayoutFuture.completeExceptionally(new PulsarClientException(
+                "Connection closed while waiting for scalable topic layout"));
+    }
+
+    /**
+     * Schedules {@link #reconnect()} on the client timer after the next backoff
+     * delay. Does nothing once this client has been closed.
+     */
+    private void scheduleReconnect() {
+        if (!closed) {
+            long delayMillis = reconnectBackoff.next().toMillis();
+            log.info().attr("delayMs", delayMillis).log("Scheduling DAG watch reconnect");
+            v4Client.timer().newTimeout(ignored -> reconnect(), delayMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /** Logs a failed reconnect attempt and schedules the next one. */
+    private Void onReconnectFailure(Throwable ex) {
+        log.warn().exceptionMessage(ex)
+                .log("DAG watch reconnect failed; will retry");
+        scheduleReconnect();
+        return null;
+    }
+
+    /**
+     * Timer callback: obtains a broker connection for the topic and re-attaches the
+     * session to it. If no connection can be obtained, or {@link #attach} throws,
+     * another attempt is scheduled with backoff.
+     */
+    private void reconnect() {
+        if (closed) {
+            return;
+        }
+        CompletableFuture<Void> attached = v4Client.getConnection(topicName.toString())
+                .thenAccept(this::attach);
+        attached.exceptionally(this::onReconnectFailure);
}
ClientSegmentLayout currentLayout() {
@@ -182,6 +254,18 @@ final class DagWatchClient implements DagWatchSession,
AutoCloseable {
}
}
+    /**
+     * Test-only hook that simulates a network drop by closing the channel of the
+     * current broker connection, if there is one. The cnx layer then fires
+     * {@link #connectionClosed()}, exercising the automatic reconnect path.
+     * Invoked reflectively from tests in other modules.
+     */
+    void forceCloseConnectionForTesting() {
+        ClientCnx current = this.cnx;
+        if (current == null) {
+            return;
+        }
+        current.ctx().channel().close();
+    }
+
+
interface LayoutChangeListener {
+        /**
+         * Receives a segment-layout change for the watched scalable topic.
+         *
+         * <p>NOTE(review): onLayout runs on the Netty I/O thread; if this listener is
+         * invoked from there, implementations should not block — confirm at the call site.
+         *
+         * @param newLayout the layout that was just applied
+         * @param oldLayout the layout it replaces (presumably {@code null} on the first
+         *     update — TODO confirm against onLayout)
+         */
void onLayoutChange(ClientSegmentLayout newLayout, ClientSegmentLayout
oldLayout);
}