This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 951a426dd2a [test][broker] PIP-475: end-to-end migration tests +
transition fixes (#25878)
951a426dd2a is described below
commit 951a426dd2a1ecc645c069afb666b82176dd88ed
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 13:05:56 2026 -0700
[test][broker] PIP-475: end-to-end migration tests + transition fixes
(#25878)
---
.../pulsar/broker/admin/v2/ScalableTopics.java | 22 ++--
.../broker/service/scalable/DagWatchSession.java | 16 ++-
.../client/api/v5/V5MigrationEndToEndTest.java | 117 +++++++++++++++++++++
.../client/impl/v5/ScalableTopicProducer.java | 86 +++++++++++----
.../topics/TestScalableTopicMigration.java | 105 ++++++++++++++++++
.../src/test/resources/pulsar-messaging.xml | 1 +
6 files changed, 314 insertions(+), 33 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index f76d9a11ff2..8c310c803ef 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -339,12 +339,22 @@ public class ScalableTopics extends AdminResource {
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
- CompletableFuture<? extends TopicStats> statsFuture =
- partitions > 0
- ?
admin.topics().getPartitionedStatsAsync(persistentBase.toString(), false)
- :
admin.topics().getStatsAsync(persistentBase.toString());
- return statsFuture.thenAccept(stats -> {
- long legacy = countLegacyConnections(stats);
+ // For a partitioned topic, inspect per-partition stats rather than
the aggregate:
+ // aggregation merges publishers by producer name into fresh stat
objects that drop
+ // per-connection metadata, which would hide the V5-managed marker and
make every
+ // V5 connection look like a legacy v4 one.
+ final CompletableFuture<Long> legacyCount = partitions > 0
+ ?
admin.topics().getPartitionedStatsAsync(persistentBase.toString(), true)
+ .thenApply(stats -> {
+ long count = 0;
+ for (TopicStats partitionStats :
stats.getPartitions().values()) {
+ count +=
countLegacyConnections(partitionStats);
+ }
+ return count;
+ })
+ : admin.topics().getStatsAsync(persistentBase.toString())
+ .thenApply(ScalableTopics::countLegacyConnections);
+ return legacyCount.thenAccept(legacy -> {
if (legacy > 0) {
throw new RestException(Response.Status.CONFLICT,
legacy + " legacy v4 client connection(s) still
attached to " + persistentBase
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index 350125438e7..fe033b23336 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -63,8 +63,11 @@ public class DagWatchSession implements
ScalableTopicResources.MetadataPathListe
private final BrokerService brokerService;
private final String metadataPath;
- /** Canonical {@code topic://...} identity returned to the client
regardless of the
- * input form ({@code topic://}, {@code persistent://}, or short-form). */
+ /** Canonical {@code topic://...} identity, regardless of the input form
({@code topic://},
+ * {@code persistent://}, or short-form). Used both as the {@code
resolved_topic_name}
+ * reported to the client and as the parent when computing {@code
segment://} URIs for a
+ * real DAG (those require the {@code topic://} domain). */
+ private final TopicName scalableTopicName;
private final String resolvedTopicName;
private volatile boolean closed = false;
@@ -79,7 +82,8 @@ public class DagWatchSession implements
ScalableTopicResources.MetadataPathListe
this.resources = resources;
this.brokerService = brokerService;
this.metadataPath = resources.topicPath(topicName);
- this.resolvedTopicName = topicName.toScalableTopic().toString();
+ this.scalableTopicName = topicName.toScalableTopic();
+ this.resolvedTopicName = scalableTopicName.toString();
this.log = LOG.with().attr("topic", topicName).attr("sessionId",
sessionId).build();
}
@@ -341,9 +345,11 @@ public class DagWatchSession implements
ScalableTopicResources.MetadataPathListe
Map<Long, String> result = new LinkedHashMap<>();
CompletableFuture<?>[] futures =
layout.getActiveSegments().values().stream()
.map(segment -> {
- // Resolve which broker owns this segment's underlying
segment:// topic
+ // Resolve which broker owns this segment's underlying
segment:// topic.
+ // SegmentTopicName.fromParent requires the topic://
domain, so use the
+ // canonical scalable name (the session's input may be
persistent://).
TopicName segTn =
org.apache.pulsar.common.scalable.SegmentTopicName.fromParent(
- topicName, segment.hashRange(),
segment.segmentId());
+ scalableTopicName, segment.hashRange(),
segment.segmentId());
var lookupOptions =
org.apache.pulsar.broker.namespace.LookupOptions.builder()
.readOnly(false).authoritative(false).build();
return brokerService.getPulsar().getNamespaceService()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
new file mode 100644
index 00000000000..6eaa2cfb406
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MigrationEndToEndTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.expectThrows;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end tests for the PIP-475 regular-to-scalable migration, exercising the full
+ * operator timeline against a live broker: V5 clients operate on a regular topic via the
+ * synthetic layout, the operator migrates, and the same clients transparently transition
+ * to the real DAG with no data loss across the migration boundary.
+ */
+public class V5MigrationEndToEndTest extends V5ClientBaseTest {
+
+    /**
+     * Builds a topic name under {@code getNamespace()}: {@code suffix} followed by an
+     * 8-character random tag, so topics created by different tests never collide.
+     */
+    private String baseName(String suffix) {
+        return getNamespace() + "/" + suffix + "-" + UUID.randomUUID().toString().substring(0, 8);
+    }
+
+    @Test
+    public void testV5ProducerSurvivesMigrationAndAllDataIsConsumable() throws Exception {
+        // Timeline:
+        //  1. A 2-partition regular topic exists.
+        //  2. A V5 producer publishes batch #1 through the synthetic layout (mod-N routing
+        //     to the legacy segments == the partitions).
+        //  3. The operator migrates the topic (only the V5 producer is attached, and it
+        //     carries the V5-managed marker, so the precheck passes without --force).
+        //  4. The same V5 producer publishes batch #2 — it has transparently transitioned to
+        //     the real DAG and now range-routes to the new active child segments.
+        //  5. A V5 queue consumer reading EARLIEST drains everything: batch #1 from the sealed
+        //     legacy parents, batch #2 from the active children.
+        String topic = baseName("e2e");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic("persistent://" + topic)
+                .create();
+
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 20; i++) {
+            String v = "pre-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Migrate. The attached V5 producer is marked, so no legacy connections are seen.
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        // The producer transparently follows the layout change to the real DAG.
+        for (int i = 0; i < 20; i++) {
+            String v = "post-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic("persistent://" + topic)
+                .subscriptionName("e2e-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < 40; i++) {
+            org.apache.pulsar.client.api.v5.Message<String> m = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(m, "expected 40 messages, missing after " + received.size());
+            received.add(m.value());
+            consumer.acknowledge(m.id());
+        }
+        assertEquals(received, sent, "every pre- and post-migration message must be consumable");
+    }
+
+    @Test
+    public void testV4ProducerLockedOutAfterMigration() throws Exception {
+        // After migration the old topic is terminated, so a legacy v4 producer can no longer
+        // write to it: either creating the producer or sending through it fails.
+        String topic = baseName("lockout");
+        admin.topics().createNonPartitionedTopic("persistent://" + topic);
+
+        admin.scalableTopics().migrateToScalable(topic, false);
+
+        // A v4 producer either fails to create on, or fails to send to, the now-terminated
+        // (and scalable-shadowed) persistent:// topic.
+        expectThrows(org.apache.pulsar.client.api.PulsarClientException.class, () -> {
+            org.apache.pulsar.client.api.Producer<byte[]> v4Producer = null;
+            try {
+                v4Producer = pulsarClient.newProducer()
+                        .topic("persistent://" + topic)
+                        .create();
+                v4Producer.send("blocked".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            } finally {
+                // Release the producer if creation succeeded. closeAsync() is used instead of
+                // close() so that a close failure can never stand in for the expected
+                // send failure and make expectThrows pass spuriously.
+                if (v4Producer != null) {
+                    v4Producer.closeAsync();
+                }
+            }
+        });
+    }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index dd1540feee7..5c7a2633e47 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -47,6 +47,13 @@ import
org.apache.pulsar.common.scalable.ScalableTopicConstants;
final class ScalableTopicProducer<T> implements Producer<T>,
DagWatchClient.LayoutChangeListener {
private static final Logger LOG = Logger.get(ScalableTopicProducer.class);
+
+ /** Max attempts for a send when the target segment is gone (split/merge
seal or migration
+ * termination), giving the DAG watch time to deliver the new layout
before giving up. */
+ private static final int SEND_RETRY_MAX_ATTEMPTS = 10;
+ /** Cap on the per-attempt backoff while waiting for the new layout. */
+ private static final long SEND_RETRY_MAX_BACKOFF_MS = 500L;
+
private final Logger log;
private final PulsarClientV5 client;
@@ -181,38 +188,73 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
java.util.List<String> replicationClusters,
org.apache.pulsar.client.api.v5.Transaction txn) throws
PulsarClientException {
- for (int attempt = 0; attempt < 3; attempt++) {
+ PulsarClientException lastError = null;
+ for (int attempt = 0; attempt < SEND_RETRY_MAX_ATTEMPTS; attempt++) {
long segmentId = routeMessage(key);
- var producer = getOrCreateSegmentProducer(segmentId);
-
try {
+ var producer = getOrCreateSegmentProducer(segmentId);
var v4MsgId = buildV4Message(producer, key, value, properties,
eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn)
.send();
return new MessageIdV5(v4MsgId, segmentId);
- } catch
(org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException
- |
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException e) {
- // The segment was sealed (split/merge). We may observe this
either as
- // TopicTerminated (broker reply to a still-open producer) or
AlreadyClosed
- // (the v4 producer noticed first and shut itself down).
Either way, drop
- // the stale per-segment producer and retry — the DAG watch
will deliver
- // the new layout shortly, and routeMessage on the next
attempt will land
- // on an active child.
- log.info().attr("segmentId", segmentId)
- .attr("attempt", attempt + 1)
- .log("Segment sealed, waiting for layout update");
- segmentProducers.remove(segmentId);
- try {
- Thread.sleep(100L * (attempt + 1));
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException("Interrupted while waiting
for layout update", ie);
+ } catch (PulsarClientException e) {
+ // Thrown while (re)creating the per-segment producer —
already a V5 exception
+ // (it may wrap a v4 TopicTerminated/AlreadyClosed cause).
+ if (!isSegmentGoneError(e)) {
+ throw e;
}
+ lastError = e;
} catch (org.apache.pulsar.client.api.PulsarClientException e) {
- throw new PulsarClientException(e.getMessage(), e);
+ // Thrown by the v4 producer's send().
+ if (!isSegmentGoneError(e)) {
+ throw new PulsarClientException(e.getMessage(), e);
+ }
+ lastError = new PulsarClientException(e.getMessage(), e);
+ }
+ // The target segment is gone: sealed by a split/merge, or
terminated by a
+ // regular-to-scalable migration. Drop the stale per-segment
producer and wait
+ // for the DAG watch to deliver the new layout; routeMessage on
the next attempt
+ // lands on an active child.
+ log.info().attr("segmentId", segmentId).attr("attempt", attempt +
1)
+ .log("Target segment gone, waiting for layout update");
+ segmentProducers.remove(segmentId);
+ try {
+ Thread.sleep(Math.min(100L * (attempt + 1),
SEND_RETRY_MAX_BACKOFF_MS));
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Interrupted while waiting for
layout update", ie);
+ }
+ }
+ throw lastError != null ? lastError
+ : new PulsarClientException("Failed to send after segment
termination retries");
+ }
+
+ /**
+ * True if {@code t} (or one of its causes) signals that the target
segment is gone —
+ * sealed by a split/merge or terminated by a regular-to-scalable
migration — so the send
+ * should be retried once the new layout arrives. Handles both the v4
exceptions thrown by
+ * {@code send()} and the V5-wrapped exceptions thrown while (re)creating
the per-segment
+ * producer on a now-terminated topic.
+ */
+ private static boolean isSegmentGoneError(Throwable t) {
+ // Walk the whole cause chain: the v4 signal may sit under one or more V5 wrappers.
+ for (Throwable cause = t; cause != null; cause = cause.getCause()) {
+ if (cause instanceof
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException) {
+ return true;
+ }
+ // NOTE(review): a v4 AlreadyClosedException may presumably also be raised when the
+ // producer or client was closed by the application rather than by a segment seal; in
+ // that case the caller retries with backoff before failing — confirm this is intended.
+ if (cause instanceof
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException) {
+ return true;
+ }
+ // The per-segment producer-creation path can surface the broker's
terminated /
+ // already-closed error as a plain (untyped) PulsarClientException
whose message
+ // carries the server-side class name; match on that too.
+ String msg = cause.getMessage();
+ if (msg != null
+ && (msg.contains("TopicTerminated") ||
msg.contains("already terminated")
+ || msg.contains("AlreadyClosed"))) {
+ return true;
}
}
- throw new PulsarClientException("Failed to send after segment
termination retries");
+ // Nothing in the chain marks the segment as gone: the send loop rethrows such errors
+ // immediately instead of retrying.
+ return false;
}
/**
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestScalableTopicMigration.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestScalableTopicMigration.java
new file mode 100644
index 00000000000..b3e7d5d5249
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestScalableTopicMigration.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.topics;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.function.Supplier;
+import lombok.CustomLog;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for the PIP-475 regular-to-scalable topic migration,
exercising the
+ * {@code pulsar-admin scalable-topics migrate} command against a real
multi-broker cluster
+ * (real metadata store, BookKeeper, and cross-broker bundle ownership).
+ *
+ * <p>The V5-client transparent transition across the migration boundary is
covered by the
+ * in-process broker test {@code V5MigrationEndToEndTest}; this test focuses
on what the
+ * dockerized cluster adds: that the migration command, its CLI wiring, the
resulting scalable
+ * metadata, and the post-migration termination of the old topic all behave in
a real
+ * deployment.
+ */
+@CustomLog
+public class TestScalableTopicMigration extends PulsarTestSuite {
+
+ // Number of brokers in the dockerized cluster; also drives the partition count below.
+ private final int numBrokers = 2;
+
+ // NOTE(review): presumably overrides a PulsarTestSuite setup hook; consider adding
+ // @Override once confirmed. Delegates with an empty string argument.
+ public void setupCluster() throws Exception {
+ this.setupCluster("");
+ }
+
+ // Customizes the cluster spec: numBrokers brokers and container logs enabled.
+ // NOTE(review): presumably a base-class hook invoked during cluster setup — confirm.
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ specBuilder.numBrokers(numBrokers);
+ specBuilder.enableContainerLog(true);
+ return specBuilder;
+ }
+
+ @Test(dataProvider = "ServiceUrls", timeOut = 300_000)
+ public void testMigrateRegularTopicToScalable(Supplier<String> serviceUrl)
throws Exception {
+ final String nsName = "mig-" + randomName(6);
+ final String namespace = "public/" + nsName;
+ final String shortTopic = namespace + "/regular";
+ final String topic = "persistent://" + shortTopic;
+ // Two partitions per broker, so partitions are presumably spread over both brokers.
+ final int numPartitions = numBrokers * 2;
+
+ pulsarCluster.createNamespace(nsName);
+ pulsarCluster.createPartitionedTopic(topic, numPartitions);
+
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
+ // Seed data on the regular partitioned topic (lands across the
partitions).
+ try (Producer<byte[]> producer =
client.newProducer().topic(topic).create()) {
+ for (int i = 0; i < 50; i++) {
+ producer.newMessage().key("k-" + i).value(("v-" +
i).getBytes(UTF_8)).send();
+ }
+ }
+
+ // Migrate via the admin CLI.
+ ContainerExecResult migrate =
pulsarCluster.runAdminCommandOnAnyBroker(
+ "scalable-topics", "migrate", topic);
+ assertEquals(migrate.getExitCode(), 0L, "migrate failed: " +
migrate.getStderr());
+
+ // The topic is now scalable: get-metadata returns its segment DAG.
+ ContainerExecResult metadata =
pulsarCluster.runAdminCommandOnAnyBroker(
+ "scalable-topics", "get-metadata", shortTopic);
+ assertEquals(metadata.getExitCode(), 0L, "get-metadata failed: " +
metadata.getStderr());
+ assertTrue(metadata.getStdout().contains("segmentId"),
+ "scalable metadata should list segments, got: " +
metadata.getStdout());
+
+ // v4 lockout: the old partitions are terminated, so a legacy v4
producer can no
+ // longer write to the topic.
+ // fail() throws AssertionError, which the PulsarClientException catch below does not
+ // swallow, so a successful send still fails the test.
+ try (Producer<byte[]> blocked =
client.newProducer().topic(topic).create()) {
+ blocked.send("blocked".getBytes(UTF_8));
+ fail("v4 produce to a migrated (terminated) topic must fail");
+ } catch (PulsarClientException expected) {
+ log.info().exceptionMessage(expected)
+ .log("v4 producer correctly locked out after
migration");
+ }
+ }
+ }
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml
b/tests/integration/src/test/resources/pulsar-messaging.xml
index a34670267dc..e3fd40e9978 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -30,6 +30,7 @@
<class
name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest"
/>
<class
name="org.apache.pulsar.tests.integration.messaging.MessagingSmokeTest" />
<class name="org.apache.pulsar.tests.integration.admin.AdminTest"
/>
+ <class
name="org.apache.pulsar.tests.integration.topics.TestScalableTopicMigration" />
<class
name="org.apache.pulsar.tests.integration.oxia.OxiaSmokeTest" />
</classes>