This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f2e4612f6f [fix][client] PIP-475: make async producer survive regular-to-scalable migration (#25882)
9f2e4612f6f is described below
commit 9f2e4612f6f3f0e691cde3ec1842b76467a1d5cf
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 23:54:12 2026 -0700
[fix][client] PIP-475: make async producer survive regular-to-scalable migration (#25882)
---
.../client/api/v5/V5MigrationEndToEndTest.java | 56 +++++++++++++
.../client/impl/v5/ScalableTopicProducer.java | 93 +++++++++++++---------
2 files changed, 112 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
index 6eaa2cfb406..47cd614053a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
@@ -96,6 +96,62 @@ public class V5MigrationEndToEndTest extends
V5ClientBaseTest {
assertEquals(received, sent, "every pre- and post-migration message
must be consumable");
}
+    @Test
+    public void testV5AsyncProducerSurvivesMigration() throws Exception {
+        // Async counterpart of the producer-survives-migration case: messages go through
+        // producer.async()...send(). Sends issued right after migrateToScalable() must ride through
+        // the synthetic -> real-DAG switch: the per-segment producer of a just-terminated partition
+        // fails, and the send is expected to be retried onto an active child segment instead of
+        // failing the caller's future.
+        final String topic = baseName("e2e-async");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic("persistent://" + topic)
+                .create();
+
+        final java.util.List<java.util.concurrent.CompletableFuture<MessageId>> pending =
+                new java.util.ArrayList<>();
+        final Set<String> expected = new HashSet<>();
+        for (int n = 0; n < 20; n++) {
+            final String payload = "pre-" + n;
+            pending.add(producer.async().newMessage().key("k-" + n).value(payload).send());
+            expected.add(payload);
+        }
+
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        // Fire the post-migration batch back-to-back (no awaiting between sends) so the
+        // retry-across-transition path actually gets exercised.
+        for (int n = 0; n < 20; n++) {
+            final String payload = "post-" + n;
+            pending.add(producer.async().newMessage().key("k-" + n).value(payload).send());
+            expected.add(payload);
+        }
+        // Every async send has to complete successfully; none may fail across the migration boundary.
+        java.util.concurrent.CompletableFuture
+                .allOf(pending.toArray(new java.util.concurrent.CompletableFuture<?>[0]))
+                .get(60, java.util.concurrent.TimeUnit.SECONDS);
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic("persistent://" + topic)
+                .subscriptionName("e2e-async-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        final Set<String> consumed = new HashSet<>();
+        for (int n = 0; n < 40; n++) {
+            org.apache.pulsar.client.api.v5.Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "expected 40 messages, missing after " + consumed.size());
+            consumed.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(consumed, expected,
+                "every async pre- and post-migration message must be consumable");
+    }
+
@Test
public void testV4ProducerLockedOutAfterMigration() throws Exception {
// After migration the old topic is terminated, so a legacy v4
producer can no longer
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 5c7a2633e47..749767d0c2b 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -293,38 +293,56 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
}
final long routedSegmentId = segmentId;
- appendToDispatchChain(routedSegmentId, producer -> {
- var ackFuture = buildV4Message(producer, key, value, properties,
- eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn)
- .sendAsync();
- ackFuture.whenComplete((v4MsgId, ex) -> {
- if (ex == null) {
- userFuture.complete(new MessageIdV5(v4MsgId,
routedSegmentId));
- return;
- }
- Throwable cause = ex instanceof
java.util.concurrent.CompletionException
- ? ex.getCause() : ex;
- boolean segmentSealed = cause
- instanceof
org.apache.pulsar.client.api.PulsarClientException
- .TopicTerminatedException
- || cause instanceof
org.apache.pulsar.client.api.PulsarClientException
- .AlreadyClosedException;
- if (segmentSealed && attempt < 3) {
- log.info().attr("segmentId", routedSegmentId)
- .attr("attempt", attempt + 1).log("Segment sealed,
retrying");
- segmentProducers.remove(routedSegmentId);
- dispatchChains.remove(routedSegmentId);
- CompletableFuture.delayedExecutor(
- 100L * (attempt + 1),
- java.util.concurrent.TimeUnit.MILLISECONDS)
- .execute(() -> dispatchSendAttempt(userFuture,
key, value, properties,
- eventTime, sequenceId, deliverAfter,
deliverAt,
- replicationClusters, txn, attempt + 1));
- } else {
- userFuture.completeExceptionally(ex);
- }
- });
- }, userFuture);
+ // Re-dispatch this message on the next attempt. Used when the target
segment is gone
+ // — sealed by a split/merge or terminated by a regular-to-scalable
migration — and
+ // the DAG watch is expected to refresh the layout shortly so
routeMessage lands on an
+ // active child.
+ Runnable retry = () -> {
+ segmentProducers.remove(routedSegmentId);
+ dispatchChains.remove(routedSegmentId);
+ CompletableFuture.delayedExecutor(
+ Math.min(100L * (attempt + 1),
SEND_RETRY_MAX_BACKOFF_MS),
+ java.util.concurrent.TimeUnit.MILLISECONDS)
+ .execute(() -> dispatchSendAttempt(userFuture, key, value,
properties,
+ eventTime, sequenceId, deliverAfter, deliverAt,
+ replicationClusters, txn, attempt + 1));
+ };
+
+ appendToDispatchChain(routedSegmentId,
+ producer -> {
+ var ackFuture = buildV4Message(producer, key, value,
properties,
+ eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn)
+ .sendAsync();
+ ackFuture.whenComplete((v4MsgId, ex) -> {
+ if (ex == null) {
+ userFuture.complete(new MessageIdV5(v4MsgId,
routedSegmentId));
+ } else {
+ // Failure from the v4 send (e.g. the segment
sealed mid-flight).
+ handleAsyncSegmentFailure(userFuture,
routedSegmentId, attempt, ex, retry);
+ }
+ });
+ },
+ // Failure while (re)creating the per-segment producer — e.g.
the partition was
+ // terminated by a migration between routing and creation.
+ createEx -> handleAsyncSegmentFailure(userFuture,
routedSegmentId, attempt, createEx, retry));
+ }
+
+    /**
+     * Decide whether an async send failure should be retried. If the target segment is gone
+     * (a split/merge seal or a migration termination) and the retry budget isn't exhausted,
+     * run {@code retry}; otherwise fail the user-visible future. Covers both the v4 send
+     * failure and the per-segment producer-creation failure.
+     *
+     * <p>Failures from the producer-creation path are delivered through {@code exceptionally}
+     * on a chained stage and can therefore arrive wrapped in a {@code CompletionException},
+     * while v4 send failures may arrive unwrapped. The user future is always failed with the
+     * unwrapped cause so callers see the same exception shape whichever path failed.
+     *
+     * @param userFuture the future returned to the caller of the async send
+     * @param segmentId  the segment the message was routed to (used for logging)
+     * @param attempt    the number of the attempt that just failed; a retry runs as {@code attempt + 1}
+     * @param ex         the failure, possibly wrapped in a {@code CompletionException}
+     * @param retry      re-dispatches the message; invoked only when the failure is retryable
+     */
+    private void handleAsyncSegmentFailure(CompletableFuture<MessageIdV5> userFuture, long segmentId,
+                                           int attempt, Throwable ex, Runnable retry) {
+        // Unwrap one CompletionException layer. Keep ex itself when it carries no cause, because
+        // completeExceptionally(null) would throw a NullPointerException.
+        Throwable cause = ex instanceof java.util.concurrent.CompletionException && ex.getCause() != null
+                ? ex.getCause() : ex;
+        if (isSegmentGoneError(cause) && attempt < SEND_RETRY_MAX_ATTEMPTS) {
+            log.info().attr("segmentId", segmentId).attr("attempt", attempt + 1)
+                    .log("Target segment gone, retrying async send after layout update");
+            retry.run();
+        } else {
+            // Not retryable, or out of attempts: surface the root cause, not the wrapper.
+            userFuture.completeExceptionally(cause);
+        }
     }
/**
@@ -332,12 +350,13 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
* segment-producer-creation future; subsequent links complete as soon as
* their {@code dispatchOp} returns (which calls v4 {@code sendAsync} — a
* fast queue insert), so dispatch order strictly mirrors call order.
- * If the chain itself fails (e.g., segment producer creation failed), the
- * user-visible future is failed too.
+ * If the chain itself fails (e.g., segment producer creation failed),
+ * {@code onCreateFailure} is invoked so the caller can retry (when the
segment
+ * is merely gone) or fail the user-visible future.
*/
private void appendToDispatchChain(long segmentId,
Consumer<org.apache.pulsar.client.api.Producer<T>> dispatchOp,
- CompletableFuture<MessageIdV5>
userFuture) {
+ Consumer<Throwable> onCreateFailure) {
synchronized (dispatchLock) {
var prev = dispatchChains.computeIfAbsent(segmentId,
id -> getOrCreateSegmentProducerAsync(id));
@@ -345,9 +364,9 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
dispatchOp.accept(producer);
return producer;
});
- // If the chain link itself faults (creation failure), surface it.
+ // If the chain link itself faults (creation failure), hand it to
the caller.
next.exceptionally(ex -> {
- userFuture.completeExceptionally(ex);
+ onCreateFailure.accept(ex);
return null;
});
dispatchChains.put(segmentId, next);