This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f2e4612f6f [fix][client] PIP-475: make async producer survive regular-to-scalable migration (#25882)
9f2e4612f6f is described below

commit 9f2e4612f6f3f0e691cde3ec1842b76467a1d5cf
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 23:54:12 2026 -0700

    [fix][client] PIP-475: make async producer survive regular-to-scalable migration (#25882)
---
 .../client/api/v5/V5MigrationEndToEndTest.java     | 56 +++++++++++++
 .../client/impl/v5/ScalableTopicProducer.java      | 93 +++++++++++++---------
 2 files changed, 112 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
index 6eaa2cfb406..47cd614053a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
@@ -96,6 +96,62 @@ public class V5MigrationEndToEndTest extends 
V5ClientBaseTest {
         assertEquals(received, sent, "every pre- and post-migration message 
must be consumable");
     }
 
+    @Test
+    public void testV5AsyncProducerSurvivesMigration() throws Exception {
+        // Same as the producer-survives-migration case, but driving the 
*async* producer API
+        // (producer.async()...send()). Async sends issued right after 
migration must ride
+        // through the synthetic→real-DAG transition — the per-segment 
producer for a
+        // just-terminated partition fails, and the send must retry onto an 
active child
+        // rather than fail the user's future.
+        String topic = baseName("e2e-async");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic("persistent://" + topic)
+                .create();
+
+        java.util.List<java.util.concurrent.CompletableFuture<MessageId>> 
sends =
+                new java.util.ArrayList<>();
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 20; i++) {
+            String v = "pre-" + i;
+            sends.add(producer.async().newMessage().key("k-" + 
i).value(v).send());
+            sent.add(v);
+        }
+
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        // Issue the post-migration batch via async sends without awaiting in 
between, so the
+        // retry-across-transition path is exercised.
+        for (int i = 0; i < 20; i++) {
+            String v = "post-" + i;
+            sends.add(producer.async().newMessage().key("k-" + 
i).value(v).send());
+            sent.add(v);
+        }
+        // Every async send must eventually complete (none fail across the 
migration boundary).
+        java.util.concurrent.CompletableFuture
+                .allOf(sends.toArray(new 
java.util.concurrent.CompletableFuture[0]))
+                .get(60, java.util.concurrent.TimeUnit.SECONDS);
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic("persistent://" + topic)
+                .subscriptionName("e2e-async-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < 40; i++) {
+            org.apache.pulsar.client.api.v5.Message<String> m = 
consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(m, "expected 40 messages, missing after " + 
received.size());
+            received.add(m.value());
+            consumer.acknowledge(m.id());
+        }
+        assertEquals(received, sent,
+                "every async pre- and post-migration message must be 
consumable");
+    }
+
     @Test
     public void testV4ProducerLockedOutAfterMigration() throws Exception {
         // After migration the old topic is terminated, so a legacy v4 
producer can no longer
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 5c7a2633e47..749767d0c2b 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -293,38 +293,56 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
         }
         final long routedSegmentId = segmentId;
 
-        appendToDispatchChain(routedSegmentId, producer -> {
-            var ackFuture = buildV4Message(producer, key, value, properties,
-                    eventTime, sequenceId, deliverAfter, deliverAt, 
replicationClusters, txn)
-                    .sendAsync();
-            ackFuture.whenComplete((v4MsgId, ex) -> {
-                if (ex == null) {
-                    userFuture.complete(new MessageIdV5(v4MsgId, 
routedSegmentId));
-                    return;
-                }
-                Throwable cause = ex instanceof 
java.util.concurrent.CompletionException
-                        ? ex.getCause() : ex;
-                boolean segmentSealed = cause
-                        instanceof 
org.apache.pulsar.client.api.PulsarClientException
-                                .TopicTerminatedException
-                        || cause instanceof 
org.apache.pulsar.client.api.PulsarClientException
-                                .AlreadyClosedException;
-                if (segmentSealed && attempt < 3) {
-                    log.info().attr("segmentId", routedSegmentId)
-                            .attr("attempt", attempt + 1).log("Segment sealed, 
retrying");
-                    segmentProducers.remove(routedSegmentId);
-                    dispatchChains.remove(routedSegmentId);
-                    CompletableFuture.delayedExecutor(
-                                    100L * (attempt + 1),
-                                    java.util.concurrent.TimeUnit.MILLISECONDS)
-                            .execute(() -> dispatchSendAttempt(userFuture, 
key, value, properties,
-                                    eventTime, sequenceId, deliverAfter, 
deliverAt,
-                                    replicationClusters, txn, attempt + 1));
-                } else {
-                    userFuture.completeExceptionally(ex);
-                }
-            });
-        }, userFuture);
+        // Re-dispatch this message on the next attempt. Used when the target 
segment is gone
+        // — sealed by a split/merge or terminated by a regular-to-scalable 
migration — and
+        // the DAG watch is expected to refresh the layout shortly so 
routeMessage lands on an
+        // active child.
+        Runnable retry = () -> {
+            segmentProducers.remove(routedSegmentId);
+            dispatchChains.remove(routedSegmentId);
+            CompletableFuture.delayedExecutor(
+                            Math.min(100L * (attempt + 1), 
SEND_RETRY_MAX_BACKOFF_MS),
+                            java.util.concurrent.TimeUnit.MILLISECONDS)
+                    .execute(() -> dispatchSendAttempt(userFuture, key, value, 
properties,
+                            eventTime, sequenceId, deliverAfter, deliverAt,
+                            replicationClusters, txn, attempt + 1));
+        };
+
+        appendToDispatchChain(routedSegmentId,
+                producer -> {
+                    var ackFuture = buildV4Message(producer, key, value, 
properties,
+                            eventTime, sequenceId, deliverAfter, deliverAt, 
replicationClusters, txn)
+                            .sendAsync();
+                    ackFuture.whenComplete((v4MsgId, ex) -> {
+                        if (ex == null) {
+                            userFuture.complete(new MessageIdV5(v4MsgId, 
routedSegmentId));
+                        } else {
+                            // Failure from the v4 send (e.g. the segment 
sealed mid-flight).
+                            handleAsyncSegmentFailure(userFuture, 
routedSegmentId, attempt, ex, retry);
+                        }
+                    });
+                },
+                // Failure while (re)creating the per-segment producer — e.g. 
the partition was
+                // terminated by a migration between routing and creation.
+                createEx -> handleAsyncSegmentFailure(userFuture, 
routedSegmentId, attempt, createEx, retry));
+    }
+
+    /**
+     * Decide whether an async send failure should be retried. If the target 
segment is gone
+     * (a split/merge seal or a migration termination) and the retry budget 
isn't exhausted,
+     * run {@code retry}; otherwise fail the user-visible future. Covers both 
the v4 send
+     * failure and the per-segment producer-creation failure.
+     */
+    private void handleAsyncSegmentFailure(CompletableFuture<MessageIdV5> 
userFuture, long segmentId,
+                                           int attempt, Throwable ex, Runnable 
retry) {
+        Throwable cause = ex instanceof 
java.util.concurrent.CompletionException ? ex.getCause() : ex;
+        if (isSegmentGoneError(cause) && attempt < SEND_RETRY_MAX_ATTEMPTS) {
+            log.info().attr("segmentId", segmentId).attr("attempt", attempt + 
1)
+                    .log("Target segment gone, retrying async send after 
layout update");
+            retry.run();
+        } else {
+            userFuture.completeExceptionally(ex);
+        }
     }
 
     /**
@@ -332,12 +350,13 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
      * segment-producer-creation future; subsequent links complete as soon as
      * their {@code dispatchOp} returns (which calls v4 {@code sendAsync} — a
      * fast queue insert), so dispatch order strictly mirrors call order.
-     * If the chain itself fails (e.g., segment producer creation failed), the
-     * user-visible future is failed too.
+     * If the chain itself fails (e.g., segment producer creation failed),
+     * {@code onCreateFailure} is invoked so the caller can retry (when the 
segment
+     * is merely gone) or fail the user-visible future.
      */
     private void appendToDispatchChain(long segmentId,
                                        
Consumer<org.apache.pulsar.client.api.Producer<T>> dispatchOp,
-                                       CompletableFuture<MessageIdV5> 
userFuture) {
+                                       Consumer<Throwable> onCreateFailure) {
         synchronized (dispatchLock) {
             var prev = dispatchChains.computeIfAbsent(segmentId,
                     id -> getOrCreateSegmentProducerAsync(id));
@@ -345,9 +364,9 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
                 dispatchOp.accept(producer);
                 return producer;
             });
-            // If the chain link itself faults (creation failure), surface it.
+            // If the chain link itself faults (creation failure), hand it to 
the caller.
             next.exceptionally(ex -> {
-                userFuture.completeExceptionally(ex);
+                onCreateFailure.accept(ex);
                 return null;
             });
             dispatchChains.put(segmentId, next);

Reply via email to