This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f199ec0248 [feat] PIP-468: DagWatchClient auto-reconnects on broker disconnect (#25687)
8f199ec0248 is described below

commit 8f199ec024840dd65538cf7282f47ab0dcde1257
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 07:28:14 2026 -0700

    [feat] PIP-468: DagWatchClient auto-reconnects on broker disconnect (#25687)
---
 .../client/api/v5/V5DagWatchAutoReconnectTest.java | 207 +++++++++++++++++++++
 .../pulsar/client/impl/v5/DagWatchClient.java      | 146 ++++++++++++---
 2 files changed, 322 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
new file mode 100644
index 00000000000..c5e294c35ce
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DagWatchAutoReconnectTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code DagWatchClient}'s auto-reconnect path. The DAG watch holds
+ * the producer / consumer's view of the scalable topic layout — without
+ * reconnect, a transient broker disconnect silently strands the client on stale
+ * layout and never delivers another split / merge update.
+ *
+ * <p>These tests force-close the watch's underlying channel and assert that
+ * producers and consumers continue to operate end-to-end without application
+ * intervention.
+ */
+public class V5DagWatchAutoReconnectTest extends V5ClientBaseTest {
+
+    /**
+     * Producer's DAG watch channel is force-closed mid-life. Sends made after the
+     * close must still succeed: the cached layout keeps existing segments
+     * reachable, and the reconnect re-establishes the watch so subsequent layout
+     * changes would still be observed.
+     */
+    @Test
+    public void testProducerSurvivesDagWatchConnectionDrop() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dag-reconnect-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int firstN = 10;
+        Set<String> firstSent = new HashSet<>();
+        for (int i = 0; i < firstN; i++) {
+            String v = "first-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            firstSent.add(v);
+        }
+        assertEquals(drain(consumer, firstN), firstSent,
+                "first batch must arrive before the disconnect");
+
+        // Force-close the DAG watch channel. The cnx layer fires connectionClosed()
+        // on the DagWatchClient, which schedules a reconnect.
+        forceCloseDagWatchOnProducer(producer);
+
+        // Send a second batch immediately. Existing segments are still reachable
+        // through the per-segment v4 producers (their own connections are unaffected),
+        // so this proves the producer keeps working through the reconnect window.
+        int secondN = 10;
+        Set<String> secondSent = new HashSet<>();
+        for (int i = 0; i < secondN; i++) {
+            String v = "second-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            secondSent.add(v);
+        }
+        assertEquals(drain(consumer, secondN), secondSent,
+                "producer must keep sending after DAG watch reconnect kicks 
in");
+    }
+
+    /**
+     * After a force-close, the DAG watch must observe a fresh broker connection
+     * (i.e., its internal {@code cnx} field is re-populated). Asserts the
+     * reconnect path actually fires rather than silently staying disconnected.
+     */
+    @Test
+    public void testDagWatchReattachesAfterDisconnect() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Object dagWatch = getDagWatchOnProducer(producer);
+        Object originalCnx = readField(dagWatch, "cnx");
+        assertNotNull(originalCnx, "DAG watch must have an initial 
connection");
+
+        forceCloseDagWatchOnProducer(producer);
+
+        // Wait for the reconnect path to land a fresh ClientCnx on the DagWatchClient.
+        // Backoff starts at 100ms; allow a generous window for CI.
+        Awaitility.await().atMost(Duration.ofSeconds(15))
+                .until(() -> {
+                    Object current = readField(dagWatch, "cnx");
+                    return current != null && current != originalCnx;
+                });
+    }
+
+    /**
+     * A consumer's DAG watch channel is force-closed mid-life. Like the producer
+     * test, this asserts the consumer continues to deliver messages produced
+     * after the disconnect.
+     */
+    @Test
+    public void testConsumerSurvivesDagWatchConnectionDrop() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dag-reconnect-consumer-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int firstN = 10;
+        for (int i = 0; i < firstN; i++) {
+            producer.newMessage().key("k-" + i).value("first-" + i).send();
+        }
+        assertEquals(drain(consumer, firstN).size(), firstN,
+                "first batch must arrive before disconnect");
+
+        forceCloseDagWatchOnConsumer(consumer);
+
+        int secondN = 10;
+        for (int i = 0; i < secondN; i++) {
+            producer.newMessage().key("k-" + i).value("second-" + i).send();
+        }
+        Set<String> got = drain(consumer, secondN);
+        assertEquals(got.size(), secondN,
+                "consumer must keep receiving after DAG watch reconnect kicks 
in");
+    }
+
+    // --- Helpers ---
+
+    private Set<String> drain(QueueConsumer<String> consumer, int expected) 
throws Exception {
+        Set<String> received = new HashSet<>();
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (received.size() < expected && System.currentTimeMillis() < 
deadline) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+                consumer.acknowledge(msg.id());
+            }
+        }
+        return received;
+    }
+
+    private static Object getDagWatchOnProducer(Producer<?> producer) throws 
Exception {
+        Field f = producer.getClass().getDeclaredField("dagWatch");
+        f.setAccessible(true);
+        Object watch = f.get(producer);
+        assertNotNull(watch, "expected dagWatch on producer");
+        return watch;
+    }
+
+    private static void forceCloseDagWatchOnProducer(Producer<?> producer) 
throws Exception {
+        Object watch = getDagWatchOnProducer(producer);
+        Method m = 
watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+        m.setAccessible(true);
+        m.invoke(watch);
+    }
+
+    private static void forceCloseDagWatchOnConsumer(QueueConsumer<?> 
consumer) throws Exception {
+        Field f = consumer.getClass().getDeclaredField("dagWatch");
+        f.setAccessible(true);
+        Object watch = f.get(consumer);
+        assertNotNull(watch, "expected dagWatch on consumer");
+        Method m = 
watch.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+        m.setAccessible(true);
+        m.invoke(watch);
+    }
+
+    private static Object readField(Object target, String name) throws 
Exception {
+        Field f = target.getClass().getDeclaredField(name);
+        f.setAccessible(true);
+        return f.get(target);
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
index b13828e7561..2041dab9b57 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.impl.v5;
 
 import io.github.merlimat.slog.Logger;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,6 +32,7 @@ import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
 
 /**
  * Client-side manager for a DAG watch session on a scalable topic.
@@ -39,6 +42,12 @@ import org.apache.pulsar.common.protocol.Commands;
  *
  * <p>Maintains the current {@link ClientSegmentLayout} and notifies a listener
  * when it changes.
+ *
+ * <p>Reconnects automatically on transient broker disconnects. The initial-create
+ * future ({@link #start}) surfaces failures up front so a producer / consumer
+ * {@code create()} fails fast when the broker is unreachable. After the first
+ * layout has arrived, subsequent disconnects schedule a reconnection with
+ * exponential backoff so long-lived producers / consumers survive network blips.
  */
 final class DagWatchClient implements DagWatchSession, AutoCloseable {
 
@@ -52,6 +61,7 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
     private final long sessionId;
     private final AtomicReference<ClientSegmentLayout> currentLayout = new 
AtomicReference<>();
     private final CompletableFuture<ClientSegmentLayout> initialLayoutFuture = 
new CompletableFuture<>();
+    private final Backoff reconnectBackoff;
     private volatile LayoutChangeListener listener;
     private volatile ClientCnx cnx;
     private volatile boolean closed = false;
@@ -60,6 +70,10 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
         this.v4Client = v4Client;
         this.topicName = topicName;
         this.sessionId = SESSION_ID_GENERATOR.incrementAndGet();
+        this.reconnectBackoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofSeconds(30))
+                .build();
         this.log = LOG.with().attr("topic", topicName).attr("sessionId", 
sessionId).build();
     }
 
@@ -70,39 +84,55 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
      * @return a future that completes with the initial layout
      */
     CompletableFuture<ClientSegmentLayout> start() {
-        // Get any broker connection and send the lookup command
         v4Client.getConnection(topicName.toString())
-                .thenAccept(cnx -> {
-                    this.cnx = cnx;
-                    if (!cnx.isSupportsScalableTopics()) {
-                        initialLayoutFuture.completeExceptionally(
-                                new 
PulsarClientException.FeatureNotSupportedException(
-                                        "Broker does not support scalable 
topics",
-                                        
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
-                        return;
-                    }
-                    // Register this session to receive updates
-                    cnx.registerDagWatchSession(sessionId, this);
-
-                    // Send the lookup command
-                    cnx.ctx().writeAndFlush(
-                            Commands.newScalableTopicLookup(sessionId, 
topicName.toString()))
-                            .addListener(writeFuture -> {
-                                if (!writeFuture.isSuccess()) {
-                                    cnx.removeDagWatchSession(sessionId);
-                                    initialLayoutFuture.completeExceptionally(
-                                            new 
PulsarClientException(writeFuture.cause()));
-                                }
-                            });
-                })
+                .thenAccept(this::attach)
                 .exceptionally(ex -> {
                     initialLayoutFuture.completeExceptionally(ex);
                     return null;
                 });
-
         return initialLayoutFuture;
     }
 
+    /**
+     * Wire {@code newCnx} to this session and send a ScalableTopicLookup. Used by
+     * both {@link #start} (first connect) and {@link #reconnect} (after disconnect).
+     */
+    private void attach(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        if (!newCnx.isSupportsScalableTopics()) {
+            PulsarClientException ex = new 
PulsarClientException.FeatureNotSupportedException(
+                    "Broker does not support scalable topics",
+                    
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics);
+            if (!initialLayoutFuture.isDone()) {
+                initialLayoutFuture.completeExceptionally(ex);
+            } else {
+                log.warn().exceptionMessage(ex)
+                        .log("Reconnect target broker doesn't support scalable 
topics");
+                scheduleReconnect();
+            }
+            return;
+        }
+        this.cnx = newCnx;
+        newCnx.registerDagWatchSession(sessionId, this);
+        newCnx.ctx().writeAndFlush(
+                        Commands.newScalableTopicLookup(sessionId, 
topicName.toString()))
+                .addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        newCnx.removeDagWatchSession(sessionId);
+                        if (!initialLayoutFuture.isDone()) {
+                            initialLayoutFuture.completeExceptionally(
+                                    new 
PulsarClientException(writeFuture.cause()));
+                        } else {
+                            log.warn().exceptionMessage(writeFuture.cause())
+                                    .log("DAG watch reconnect write failed; 
will retry");
+                            scheduleReconnect();
+                        }
+                    }
+                });
+    }
+
     /**
      * Called when the broker pushes a ScalableTopicUpdate for this session.
      * This is invoked from the Netty I/O thread.
@@ -121,6 +151,10 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
                 .attr("activeSegmentCount", newLayout.activeSegments().size())
                 .log("Layout updated");
 
+        // Reset the reconnect backoff: the broker confirmed the session is live and
+        // our local state is consistent.
+        reconnectBackoff.reset();
+
         // Complete the initial layout future if this is the first update
         initialLayoutFuture.complete(newLayout);
 
@@ -138,18 +172,56 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
     public void onError(ServerError error, String message) {
         log.error().attr("error", error).attr("message", message)
                 .log("DAG watch session error");
-        initialLayoutFuture.completeExceptionally(
-                new PulsarClientException(
-                        "Scalable topic lookup failed: " + error + " - " + 
message));
+        if (!initialLayoutFuture.isDone()) {
+            initialLayoutFuture.completeExceptionally(
+                    new PulsarClientException(
+                            "Scalable topic lookup failed: " + error + " - " + 
message));
+        }
+        // After the initial layout has arrived, broker-side errors on this session
+        // (e.g., metadata unavailable) are transient — a reconnect typically clears
+        // them. The connection-closed path will pick this up; no extra work here.
     }
 
     @Override
     public void connectionClosed() {
         log.warn("DAG watch session connection closed");
         cnx = null;
-        initialLayoutFuture.completeExceptionally(
-                new PulsarClientException("Connection closed while waiting for 
scalable topic layout"));
-        // TODO: implement automatic reconnection with backoff
+        if (closed) {
+            return;
+        }
+        if (!initialLayoutFuture.isDone()) {
+            // Initial lookup never completed — surface the failure rather than
+            // retrying silently behind the caller of producer / consumer create().
+            initialLayoutFuture.completeExceptionally(
+                    new PulsarClientException(
+                            "Connection closed while waiting for scalable 
topic layout"));
+            return;
+        }
+        scheduleReconnect();
+    }
+
+    private void scheduleReconnect() {
+        if (closed) {
+            return;
+        }
+        long delayMs = reconnectBackoff.next().toMillis();
+        log.info().attr("delayMs", delayMs).log("Scheduling DAG watch 
reconnect");
+        v4Client.timer().newTimeout(timeout -> reconnect(),
+                delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    private void reconnect() {
+        if (closed) {
+            return;
+        }
+        v4Client.getConnection(topicName.toString())
+                .thenAccept(this::attach)
+                .exceptionally(ex -> {
+                    log.warn().exceptionMessage(ex)
+                            .log("DAG watch reconnect failed; will retry");
+                    scheduleReconnect();
+                    return null;
+                });
     }
 
     ClientSegmentLayout currentLayout() {
@@ -182,6 +254,18 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
         }
     }
 
+    /**
+     * Test hook: forcibly close the underlying broker channel to simulate a network
+     * drop. The cnx layer will fire {@link #connectionClosed()} which triggers the
+     * automatic reconnect path. Reached via reflection from cross-module tests.
+     */
+    void forceCloseConnectionForTesting() {
+        ClientCnx c = cnx;
+        if (c != null) {
+            c.ctx().channel().close();
+        }
+    }
+
     interface LayoutChangeListener {
         void onLayoutChange(ClientSegmentLayout newLayout, ClientSegmentLayout 
oldLayout);
     }

Reply via email to