This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f0429c1cd8 [fix][broker] Add topic consistency check (#24118)
7f0429c1cd8 is described below
commit 7f0429c1cd81fbde6e0213a84e42f1ea55b5f7d2
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Mar 26 18:01:20 2025 +0800
[fix][broker] Add topic consistency check (#24118)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../pulsar/broker/namespace/NamespaceService.java | 35 +++--
.../pulsar/broker/service/BrokerService.java | 150 +++++++++++----------
.../apache/pulsar/broker/admin/AdminApiTest.java | 6 +-
.../pulsar/broker/admin/TopicAutoCreationTest.java | 20 +--
.../broker/protocol/PulsarClientBasedHandler.java | 7 +-
.../broker/service/ExclusiveProducerTest.java | 16 ++-
.../pulsar/broker/service/PersistentTopicTest.java | 8 +-
.../pulsar/broker/service/ReplicatorTest.java | 6 +-
.../nonpersistent/NonPersistentTopicTest.java | 20 ++-
.../service/persistent/PersistentTopicTest.java | 35 +----
.../pulsar/client/api/ConsumerCreationTest.java | 127 +++++++++++++++++
.../pulsar/client/api/ProducerCreationTest.java | 73 ++++++++++
.../pulsar/client/impl/LookupServiceTest.java | 82 +++++++++++
.../pulsar/client/api/PulsarClientException.java | 4 +-
14 files changed, 433 insertions(+), 156 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index b2ee299bb03..8f5cef1bdff 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1412,17 +1412,26 @@ public class NamespaceService implements AutoCloseable {
* Check topic exists( partitioned or non-partitioned ).
*/
public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName
topic) {
- return pulsar.getBrokerService()
-
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
- .thenCompose(metadata -> {
- if (metadata.partitions > 0) {
- return CompletableFuture.completedFuture(
-
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
- }
- return checkNonPartitionedTopicExists(topic)
- .thenApply(b -> b ?
TopicExistsInfo.newNonPartitionedTopicExists()
- : TopicExistsInfo.newTopicNotExists());
- });
+ // For non-persistent/persistent partitioned topic, which has metadata.
+ return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+ topic.isPartitioned() ?
TopicName.get(topic.getPartitionedTopicName()) : topic)
+ .thenCompose(metadata -> {
+ if (metadata.partitions > 0) {
+ if (!topic.isPartitioned()) {
+ return CompletableFuture.completedFuture(
+
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
+ } else {
+ if (topic.getPartitionIndex() <
metadata.partitions) {
+ return CompletableFuture.completedFuture(
+
TopicExistsInfo.newNonPartitionedTopicExists());
+ }
+ }
+ }
+ // Direct query the single topic.
+ return checkNonPartitionedTopicExists(topic).thenApply(
+ b -> b ?
TopicExistsInfo.newNonPartitionedTopicExists() :
+ TopicExistsInfo.newTopicNotExists());
+ });
}
/***
@@ -1443,12 +1452,12 @@ public class NamespaceService implements AutoCloseable {
*/
public CompletableFuture<Boolean>
checkNonPersistentNonPartitionedTopicExists(String topic) {
TopicName topicName = TopicName.get(topic);
- // "non-partitioned & non-persistent" topics only exist on the owner
broker.
+ // "non-partitioned & non-persistent" topics only exist on the cache
of the owner broker.
return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned
-> {
// The current broker is the owner.
if (isOwned) {
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture =
pulsar.getBrokerService()
- .getTopic(topic, false);
+ .getTopics().get(topic);
if (nonPersistentTopicFuture != null) {
return
nonPersistentTopicFuture.thenApply(Optional::isPresent);
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 107bb01ffa0..dd3486496e9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1061,6 +1061,70 @@ public class BrokerService implements Closeable {
return getTopic(TopicName.get(topic), createIfMissing, properties);
}
+ /**
+ * Validates that the topic is consistent with its partition metadata.
+ *
+ * This method ensures the topic (partitioned or non-partitioned) correctly
+ * matches the actual partitions in the metadata. Inconsistencies typically
+ * indicate configuration issues or metadata synchronization problems.
+ *
+ * This validation is particularly important in geo-replicated
environments where
+ * topic metadata may not be fully synchronized across all regions,
potentially
+ * leading to access errors if not properly handled.
+ *
+ * @param topicName The topic name to validate
+ * @return CompletableFuture that completes normally if validation passes,
or
+ * completes exceptionally with NotAllowedException if validation fails
+ */
+ private CompletableFuture<Void> validateTopicConsistency(TopicName
topicName) {
+ if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
+ // Skip validation for heartbeat namespace.
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName baseTopicName =
+ topicName.isPartitioned() ?
TopicName.get(topicName.getPartitionedTopicName()) : topicName;
+ return fetchPartitionedTopicMetadataAsync(baseTopicName)
+ .thenCompose(metadata -> {
+ if (topicName.isPartitioned()) {
+ if (metadata.partitions == 0) {
+ // Edge case: When a complete partitioned topic
name is provided but metadata shows 0
+ // partitions.
+ // This indicates that the partitioned topic
metadata doesn't exist.
+ //
+ // Resolution options:
+ // 1. Creates the partitioned topic via admin API.
+ // 2. Uses the base topic name and then rely on
auto-creation the partitioned topic if
+ // enabled.
+ return FutureUtil.failedFuture(
+ new
BrokerServiceException.NotAllowedException(
+ "Partition metadata not found for
the partitioned topic: " + topicName));
+ }
+ if (topicName.getPartitionIndex() >=
metadata.partitions) {
+ final String errorMsg =
+ String.format(
+ "Illegal topic partition name %s
with max allowed "
+ + "%d partitions",
topicName,
+ metadata.partitions);
+ log.warn(errorMsg);
+ return FutureUtil.failedFuture(
+ new
BrokerServiceException.NotAllowedException(errorMsg));
+ }
+ } else if (metadata.partitions > 0) {
+ // Edge case: Non-partitioned topic name was provided,
but metadata indicates this is
+ // actually a partitioned
+ // topic (partitions > 0).
+ //
+ // Resolution: Must use the complete partitioned topic
name('topic-name-partition-N').
+ //
+ // This ensures proper routing to the specific
partition and prevents ambiguity in topic
+ // addressing.
+ return FutureUtil.failedFuture(new
BrokerServiceException.NotAllowedException(
+ "Found partitioned metadata for
non-partitioned topic: " + topicName));
+ }
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+
/**
* Retrieves or creates a topic based on the specified parameters.
* 0. If disable PersistentTopics or NonPersistentTopics, it will return a
failed future with NotAllowedException.
@@ -1107,30 +1171,9 @@ public class BrokerService implements Closeable {
throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies =
optionalTopicPolicies.orElse(null);
- if (topicName.isPartitioned()) {
- final TopicName topicNameEntity =
TopicName.get(topicName.getPartitionedTopicName());
- return
fetchPartitionedTopicMetadataAsync(topicNameEntity)
- .thenCompose((metadata) -> {
- // Allow creating non-partitioned
persistent topic that name includes
- // `partition`
- if (metadata.partitions == 0
- ||
topicName.getPartitionIndex() < metadata.partitions) {
- return
topics.computeIfAbsent(topicName.toString(), (tpName) ->
-
loadOrCreatePersistentTopic(tpName,
- createIfMissing,
properties, topicPolicies));
- } else {
- final String errorMsg =
- String.format("Illegal
topic partition name %s with max allowed "
- + "%d partitions",
topicName, metadata.partitions);
- log.warn(errorMsg);
- return FutureUtil.failedFuture(
- new
BrokerServiceException.NotAllowedException(errorMsg));
- }
- });
- } else {
- return
topics.computeIfAbsent(topicName.toString(), (tpName) ->
- loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies));
- }
+ return topics.computeIfAbsent(topicName.toString(),
+ (tpName) ->
loadOrCreatePersistentTopic(tpName, createIfMissing, properties,
+ topicPolicies));
});
});
} else {
@@ -1144,29 +1187,10 @@ public class BrokerService implements Closeable {
if (!topics.containsKey(topicName.toString())) {
topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.BEFORE);
}
- if (topicName.isPartitioned()) {
- final TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
- return
this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata)
-> {
- if (topicName.getPartitionIndex() <
metadata.partitions) {
- return
topics.computeIfAbsent(topicName.toString(), (name) -> {
- topicEventsDispatcher
- .notify(topicName.toString(),
TopicEvent.CREATE, EventStage.BEFORE);
-
- CompletableFuture<Optional<Topic>> res =
createNonPersistentTopic(name);
-
- CompletableFuture<Optional<Topic>> eventFuture
= topicEventsDispatcher
- .notifyOnCompletion(res,
topicName.toString(), TopicEvent.CREATE);
- topicEventsDispatcher
- .notifyOnCompletion(eventFuture,
topicName.toString(), TopicEvent.LOAD);
- return res;
- });
- }
- topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.FAILURE);
- return
CompletableFuture.completedFuture(Optional.empty());
- });
- } else if (createIfMissing) {
+ if (topicName.isPartitioned() || createIfMissing) {
return topics.computeIfAbsent(topicName.toString(), (name)
-> {
- topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.CREATE, EventStage.BEFORE);
+ topicEventsDispatcher
+ .notify(topicName.toString(),
TopicEvent.CREATE, EventStage.BEFORE);
CompletableFuture<Optional<Topic>> res =
createNonPersistentTopic(name);
@@ -1176,14 +1200,13 @@ public class BrokerService implements Closeable {
.notifyOnCompletion(eventFuture,
topicName.toString(), TopicEvent.LOAD);
return res;
});
- } else {
- CompletableFuture<Optional<Topic>> topicFuture =
topics.get(topicName.toString());
- if (topicFuture == null) {
- topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.FAILURE);
- topicFuture =
CompletableFuture.completedFuture(Optional.empty());
- }
- return topicFuture;
}
+ CompletableFuture<Optional<Topic>> topicFuture =
topics.get(topicName.toString());
+ if (topicFuture == null) {
+ topicEventsDispatcher.notify(topicName.toString(),
TopicEvent.LOAD, EventStage.FAILURE);
+ topicFuture =
CompletableFuture.completedFuture(Optional.empty());
+ }
+ return topicFuture;
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic",
topicName, e);
@@ -1357,8 +1380,9 @@ public class BrokerService implements Closeable {
topicFuture.completeExceptionally(e);
return topicFuture;
}
- CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
- isOwner.thenRun(() -> {
+ checkTopicNsOwnership(topic)
+ .thenCompose((__) ->
validateTopicConsistency(TopicName.get(topic)))
+ .thenRun(() -> {
nonPersistentTopic.initialize()
.thenCompose(__ -> nonPersistentTopic.checkReplication())
.thenRun(() -> {
@@ -1375,17 +1399,7 @@ public class BrokerService implements Closeable {
return null;
});
}).exceptionally(e -> {
- log.warn("CheckTopicNsOwnership fail when
createNonPersistentTopic! {}", topic, e.getCause());
- // CheckTopicNsOwnership fail dont create nonPersistentTopic, when
topic do lookup will find the correct
- // broker. When client get non-persistent-partitioned topic
- // metadata will the non-persistent-topic will be created.
- // so we should add checkTopicNsOwnership logic otherwise the
topic will be created
- // if it dont own by this broker,we should return success
- // otherwise it will keep retrying getPartitionedTopicMetadata
- topicFuture.complete(Optional.of(nonPersistentTopic));
- // after get metadata return success, we should delete this topic
from this broker, because this topic not
- // owner by this broker and it don't initialize and
checkReplication
- pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
+
topicFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
@@ -1772,8 +1786,8 @@ public class BrokerService implements Closeable {
: CompletableFuture.completedFuture(null);
CompletableFuture<Void> isTopicAlreadyMigrated =
checkTopicAlreadyMigrated(topicName);
-
- maxTopicsCheck.thenCompose(__ -> isTopicAlreadyMigrated)
+ maxTopicsCheck.thenCompose(partitionedTopicMetadata ->
validateTopicConsistency(topicName))
+ .thenCompose(__ -> isTopicAlreadyMigrated)
.thenCompose(__ -> getManagedLedgerConfig(topicName,
topicPolicies))
.thenAccept(managedLedgerConfig -> {
if (isBrokerEntryMetadataEnabled() ||
isBrokerPayloadProcessorEnabled()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index cea43cc9345..e9ca122bba1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2912,10 +2912,8 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test
public void testPersistentTopicsExpireMessagesInvalidPartitionIndex()
throws Exception {
- // Force to create a topic
-
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2",
0);
- assertEquals(admin.topics().getList("prop-xyz/ns1"),
- List.of("persistent://prop-xyz/ns1/ds2-partition-2"));
+ // Create a topic
+ admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/ds2",
3);
// create consumer and subscription
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index c90ad15242c..e06abd972c1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import java.net.InetSocketAddress;
import java.util.List;
@@ -39,6 +40,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -101,16 +103,14 @@ public class TopicAutoCreationTest extends
ProducerConsumerBase {
final String partition = "persistent://" + namespaceName +
"/test-partitioned-topi-auto-creation-partition-0";
- producer = pulsarClient.newProducer()
- .topic(partition)
- .create();
-
- partitionedTopics =
admin.topics().getPartitionedTopicList(namespaceName);
- topics = admin.topics().getList(namespaceName);
- assertEquals(partitionedTopics.size(), 0);
- assertEquals(topics.size(), 1);
-
- producer.close();
+ // The Pulsar doesn't automatically create the metadata for the single
partition, so the producer creation
+ // will fail.
+ assertThrows(NotAllowedException.class, () -> {
+ @Cleanup
+ Producer<byte[]> ignored = pulsarClient.newProducer()
+ .topic(partition)
+ .create();
+ });
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
index ed9881a8cad..3d24fe3ce38 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/PulsarClientBasedHandler.java
@@ -79,9 +79,11 @@ public class PulsarClientBasedHandler implements
ProtocolHandler {
@Override
public void start(BrokerService service) {
+ @Cleanup
+ PulsarAdmin admin = null;
try {
final var port =
service.getPulsar().getListenPortHTTP().orElseThrow();
- @Cleanup final var admin =
PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + port).build();
+ admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:" +
port).build();
try {
admin.clusters().createCluster(cluster, ClusterData.builder()
.serviceUrl(service.getPulsar().getWebServiceAddress())
@@ -103,6 +105,7 @@ public class PulsarClientBasedHandler implements
ProtocolHandler {
throw new RuntimeException(e);
}
try {
+ admin.topics().createPartitionedTopic(topic, partitions);
final var port = service.getListenPort().orElseThrow();
client = PulsarClient.builder().serviceUrl("pulsar://localhost:" +
port).build();
readers = new ArrayList<>();
@@ -122,7 +125,7 @@ public class PulsarClientBasedHandler implements
ProtocolHandler {
});
}
});
- } catch (PulsarClientException e) {
+ } catch (PulsarClientException | PulsarAdminException e) {
throw new RuntimeException(e);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 5f95e557b8c..33a34d3fff4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -19,20 +19,21 @@
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import io.netty.util.HashedWheelTimer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import io.netty.util.HashedWheelTimer;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -316,6 +317,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
public void topicDeleted(String ignored, boolean partitioned) throws
Exception {
String topic = newTopic("persistent", partitioned);
+ @Cleanup
Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.accessMode(ProducerAccessMode.Exclusive)
@@ -329,8 +331,14 @@ public class ExclusiveProducerTest extends BrokerTestBase {
admin.topics().delete(topic, true);
}
- // The producer should be able to publish again on the topic
- p1.send("msg-2");
+ if (!partitioned) {
+ // The producer should be able to publish again on the topic
+ p1.send("msg-2");
+ } else {
+ // The partitioned topic is deleted, the producer should not be
able to publish again on the topic.
+ // Partitioned metadata is required to publish messages to the
topic.
+ assertThrows(TimeoutException.class, () -> p1.send("msg-2"));
+ }
}
@Test(dataProvider = "topics")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 92b767104f6..0863e1ec5c4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -127,7 +127,6 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
@@ -141,7 +140,7 @@ import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
+import
org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore.OperationType;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@@ -1464,8 +1463,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();
// create topic
-
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
- .createPartitionedTopic(TopicName.get(successTopicName), new
PartitionedTopicMetadata(2));
PersistentTopic topic = (PersistentTopic)
brokerService.getOrCreateTopic(successTopicName).get();
Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
@@ -1477,8 +1474,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertFalse((boolean) isClosingOrDeletingField.get(topic));
metadataStore.failConditional(new MetadataStoreException("injected
error"), (op, path) ->
- op == FaultInjectionMetadataStore.OperationType.PUT &&
-
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"));
+ op == OperationType.EXISTS &&
path.equals("/admin/flags/policies-readonly"));
try {
topic.delete().get();
fail();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d1d7358f346..60932e09116 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -71,7 +71,7 @@ import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
+import
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
@@ -1208,7 +1208,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
if (!isPartitionedTopic) {
fail("Topic creation should not fail without any partitioned
topic");
}
- assertTrue(e.getCause() instanceof NamingException);
+ assertTrue(e.getCause() instanceof NotAllowedException);
}
// non-persistent topic test
@@ -1221,7 +1221,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
if (!isPartitionedTopic) {
fail("Topic creation should not fail without any partitioned
topic");
}
- assertTrue(e.getCause() instanceof NamingException);
+ assertTrue(e.getCause() instanceof NotAllowedException);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index e0d6a432bda..a654b477b45 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.UUID;
@@ -43,11 +47,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
-
@Test(groups = "broker")
public class NonPersistentTopicTest extends BrokerTestBase {
@@ -113,19 +112,16 @@ public class NonPersistentTopicTest extends
BrokerTestBase {
}
@Test
- public void testCreateNonExistentPartitions() throws PulsarAdminException,
PulsarClientException {
+ public void testCreateNonExistentPartitions() throws PulsarAdminException {
final String topicName =
"non-persistent://prop/ns-abc/testCreateNonExistentPartitions";
admin.topics().createPartitionedTopic(topicName, 4);
TopicName partition = TopicName.get(topicName).getPartition(4);
- try {
+ assertThrows(PulsarClientException.NotAllowedException.class, () -> {
@Cleanup
- Producer<byte[]> producer = pulsarClient.newProducer()
+ Producer<byte[]> ignored = pulsarClient.newProducer()
.topic(partition.toString())
.create();
- fail("unexpected behaviour");
- } catch (PulsarClientException.TopicDoesNotExistException ignored) {
-
- }
+ });
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 3e74ab5e1ac..12b9b0568b7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -36,6 +34,8 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -578,37 +578,6 @@ public class PersistentTopicTest extends BrokerTestBase {
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
}
- @Test
- public void testCompatibilityWithPartitionKeyword() throws
PulsarAdminException, PulsarClientException {
- final String topicName =
"persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword";
- TopicName topicNameEntity = TopicName.get(topicName);
- String partition2 = topicNameEntity.getPartition(2).toString();
- // Create a non-partitioned topic with -partition- keyword
- Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(partition2)
- .create();
- List<String> topics = admin.topics().getList("prop/ns-abc");
- // Close previous producer to simulate reconnect
- producer.close();
- // Disable auto topic creation
- conf.setAllowAutoTopicCreation(false);
- // Check the topic exist in the list.
- Assert.assertTrue(topics.contains(partition2));
- // Check this topic has no partition metadata.
- Assert.assertThrows(PulsarAdminException.NotFoundException.class,
- () -> admin.topics().getPartitionedTopicMetadata(topicName));
- // Reconnect to the broker and expect successful because the topic has
existed in the broker.
- producer = pulsarClient.newProducer()
- .topic(partition2)
- .create();
- producer.close();
- // Check the topic exist in the list again.
- Assert.assertTrue(topics.contains(partition2));
- // Check this topic has no partition metadata again.
- Assert.assertThrows(PulsarAdminException.NotFoundException.class,
- () -> admin.topics().getPartitionedTopicMetadata(topicName));
- }
-
@Test
public void testDeleteTopicFail() throws Exception {
final String fullyTopicName = "persistent://prop/ns-abc/" + "tp_"
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
new file mode 100644
index 00000000000..a81dbe02b34
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertThrows;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ConsumerCreationTest extends ProducerConsumerBase {
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @DataProvider(name = "topicDomainProvider")
+ public Object[][] topicDomainProvider() {
+ return new Object[][]{
+ {TopicDomain.persistent},
+ {TopicDomain.non_persistent}
+ };
+ }
+
+ @Test(dataProvider = "topicDomainProvider")
+ public void testCreateConsumerWhenTopicTypeMismatch(TopicDomain domain)
+ throws PulsarAdminException, PulsarClientException {
+ String nonPartitionedTopic =
+ TopicName.get(domain.value(), "public", "default",
+
"testCreateConsumerWhenTopicTypeMismatch-nonPartitionedTopic")
+ .toString();
+ admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+
+ // Topic type is non-partitioned, Trying to create consumer on
partitioned topic.
+ assertThrows(NotAllowedException.class, () -> {
+ @Cleanup
+ Consumer<byte[]> ignored =
+
pulsarClient.newConsumer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString())
+ .subscriptionName("my-sub").subscribe();
+ });
+
+ // Topic type is partitioned, Trying to create consumer on
non-partitioned topic.
+ String partitionedTopic = TopicName.get(domain.value(), "public",
"default",
+
"testCreateConsumerWhenTopicTypeMismatch-partitionedTopic")
+ .toString();
+ admin.topics().createPartitionedTopic(partitionedTopic, 3);
+
+ // Works fine because the lookup can help our to find the correct
topic.
+ {
+ @Cleanup
+ Consumer<byte[]> ignored =
+
pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(2).toString())
+ .subscriptionName("my-sub").subscribe();
+ }
+
+ // Partition index is out of range.
+ assertThrows(NotAllowedException.class, () -> {
+ @Cleanup
+ Consumer<byte[]> ignored =
+
pulsarClient.newConsumer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
+ .subscriptionName("my-sub").subscribe();
+ });
+ }
+
+ @Test(dataProvider = "topicDomainProvider")
+ public void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain
domain)
+ throws PulsarAdminException, PulsarClientException {
+ testCreateConsumerWhenSinglePartitionIsDeleted(domain, false);
+ testCreateConsumerWhenSinglePartitionIsDeleted(domain, true);
+ }
+
+ private void testCreateConsumerWhenSinglePartitionIsDeleted(TopicDomain
domain, boolean allowAutoTopicCreation)
+ throws PulsarAdminException, PulsarClientException {
+ conf.setAllowAutoTopicCreation(allowAutoTopicCreation);
+
+ String partitionedTopic = TopicName.get(domain.value(), "public",
"default",
+ "testCreateConsumerWhenSinglePartitionIsDeleted-" +
allowAutoTopicCreation)
+ .toString();
+ admin.topics().createPartitionedTopic(partitionedTopic, 3);
+
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
+
+ // Non-persistent topic only have the metadata, and no partition, so
it works fine.
+ if (allowAutoTopicCreation ||
domain.equals(TopicDomain.non_persistent)) {
+ @Cleanup
+ Consumer<byte[]> ignored =
+
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
+ } else {
+
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
+ @Cleanup
+ Consumer<byte[]> ignored =
+
pulsarClient.newConsumer().topic(partitionedTopic).subscriptionName("my-sub").subscribe();
+ });
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
index d5734588288..cd75383a487 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.client.api;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
+import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException.NotAllowedException;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -191,4 +194,74 @@ public class ProducerCreationTest extends
ProducerConsumerBase {
Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
}
+
+ @Test(dataProvider = "topicDomainProvider")
+ public void testCreateProducerWhenTopicTypeMismatch(TopicDomain domain)
+ throws PulsarAdminException, PulsarClientException {
+ String nonPartitionedTopic =
+ TopicName.get(domain.value(), "public", "default",
+
"testCreateProducerWhenTopicTypeMismatch-nonPartitionedTopic")
+ .toString();
+ admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+
+ // Topic type is non-partitioned, trying to create producer on the
complete partitioned topic.
+ // Should throw NotAllowedException.
+ assertThrows(NotAllowedException.class, () -> {
+ @Cleanup
+ Producer<byte[]> ignored =
+
pulsarClient.newProducer().topic(TopicName.get(nonPartitionedTopic).getPartition(2).toString())
+ .create();
+ });
+
+ // Topic type is partitioned, trying to create producer on the base
partitioned topic.
+ String partitionedTopic = TopicName.get(domain.value(), "public",
"default",
+
"testCreateProducerWhenTopicTypeMismatch-partitionedTopic")
+ .toString();
+ admin.topics().createPartitionedTopic(partitionedTopic, 3);
+
+ // Works fine because the lookup can help our to find all the topics.
+ {
+ @Cleanup
+ Producer<byte[]> ignored =
+
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartitionedTopicName())
+ .create();
+ }
+
+ // Partition index is out of range.
+ assertThrows(NotAllowedException.class, () -> {
+ @Cleanup
+ Producer<byte[]> ignored =
+
pulsarClient.newProducer().topic(TopicName.get(partitionedTopic).getPartition(100).toString())
+ .create();
+ });
+ }
+
+ @Test(dataProvider = "topicDomainProvider")
+ public void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain
domain)
+ throws PulsarAdminException, PulsarClientException {
+ testCreateProducerWhenSinglePartitionIsDeleted(domain, false);
+ testCreateProducerWhenSinglePartitionIsDeleted(domain, true);
+ }
+
+ private void testCreateProducerWhenSinglePartitionIsDeleted(TopicDomain
domain, boolean allowAutoTopicCreation)
+ throws PulsarAdminException, PulsarClientException {
+ conf.setAllowAutoTopicCreation(allowAutoTopicCreation);
+
+ String partitionedTopic = TopicName.get(domain.value(), "public",
"default",
+ "testCreateProducerWhenSinglePartitionIsDeleted-" +
allowAutoTopicCreation)
+ .toString();
+ admin.topics().createPartitionedTopic(partitionedTopic, 3);
+
admin.topics().delete(TopicName.get(partitionedTopic).getPartition(1).toString());
+
+ // Non-persistent topic only have the metadata, and no partition, so
it works fine.
+ if (allowAutoTopicCreation || domain == TopicDomain.non_persistent) {
+ @Cleanup
+ Producer<byte[]> ignored =
pulsarClient.newProducer().topic(partitionedTopic).create();
+ } else {
+
assertThrows(PulsarClientException.TopicDoesNotExistException.class, () -> {
+ @Cleanup
+ Producer<byte[]> ignored =
pulsarClient.newProducer().topic(partitionedTopic).create();
+ });
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
index 59cb7ae03d0..c4ef53b292b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java
@@ -19,13 +19,19 @@
package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import java.util.Collection;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.AfterClass;
@@ -125,4 +131,80 @@ public class LookupServiceTest extends
ProducerConsumerBase {
admin.topics().delete(nonPartitionedTopic, false);
}
+ @Test(dataProvider = "isUsingHttpLookup")
+ public void testGetPartitionedTopicMetadataByPulsarClient(boolean
isUsingHttpLookup) throws PulsarAdminException {
+ LookupService lookupService = getLookupService(isUsingHttpLookup);
+
+ // metadataAutoCreationEnabled is true.
+ assertThat(lookupService.getPartitionedTopicMetadata(
+
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")),
true))
+ .succeedsWithin(3, TimeUnit.SECONDS)
+ .matches(n -> n.partitions == 0);
+
+ // metadataAutoCreationEnabled is true.
+ // Allow the get the metadata of single partition topic, because the
auto-creation is enabled.
+ // But the producer/consumer is unavailable because the topic doesn't
have the metadata.
+ assertThat(lookupService.getPartitionedTopicMetadata(
+
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") +
"-partition-10"),
+ true))
+ .succeedsWithin(3, TimeUnit.SECONDS)
+ .matches(n -> n.partitions == 0);
+
+ Class<? extends Throwable> expectedExceptionClass =
+ isUsingHttpLookup ?
PulsarClientException.NotFoundException.class :
+ PulsarClientException.TopicDoesNotExistException.class;
+ // metadataAutoCreationEnabled is false.
+ assertThat(lookupService.getPartitionedTopicMetadata(
+
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp")),
false))
+ .failsWithin(3, TimeUnit.SECONDS)
+ .withThrowableThat()
+ .withCauseInstanceOf(expectedExceptionClass);
+
+ // metadataAutoCreationEnabled is false.
+ assertThat(lookupService.getPartitionedTopicMetadata(
+
TopicName.get(BrokerTestUtil.newUniqueName("persistent://public/default/tp") +
"-partition-10"),
+ false))
+ .failsWithin(3, TimeUnit.SECONDS)
+ .withThrowableThat()
+ .withCauseInstanceOf(expectedExceptionClass);
+
+ // Verify the topic exists, and the metadataAutoCreationEnabled is
false.
+ String nonPartitionedTopic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(nonPartitionedTopic),
false))
+ .succeedsWithin(3, TimeUnit.SECONDS)
+ .matches(n -> n.partitions == 0);
+
+ String partitionedTopic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ String partitionedTopicWithPartitionIndex = partitionedTopic +
"-partition-10";
+ admin.topics().createPartitionedTopic(partitionedTopic, 20);
+
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopic),
false))
+ .succeedsWithin(3, TimeUnit.SECONDS)
+ .matches(n -> n.partitions == 20);
+
assertThat(lookupService.getPartitionedTopicMetadata(TopicName.get(partitionedTopicWithPartitionIndex),
false))
+ .succeedsWithin(3, TimeUnit.SECONDS)
+ .matches(n -> n.partitions == 0);
+ }
+
+ @Test
+ public void testGetPartitionedTopicMedataByAdmin() throws
PulsarAdminException {
+ String nonPartitionedTopic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ String partitionedTopic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ String partitionedTopicWithPartitionIndex = partitionedTopic +
"-partition-10";
+ // No topic, so throw the NotFound.
+ // BTW: The admin api doesn't allow to creat the metadata of topic
default.
+ assertThrows(PulsarAdminException.NotFoundException.class, () ->
admin.topics()
+ .getPartitionedTopicMetadata(nonPartitionedTopic));
+ assertThrows(PulsarAdminException.NotFoundException.class, () ->
admin.topics()
+ .getPartitionedTopicMetadata(partitionedTopic));
+ assertThrows(PulsarAdminException.NotFoundException.class,
+ () ->
admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex));
+
+ admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+
assertEquals(admin.topics().getPartitionedTopicMetadata(nonPartitionedTopic).partitions,
0);
+
+ admin.topics().createPartitionedTopic(partitionedTopic, 20);
+
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopic).partitions,
20);
+
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicWithPartitionIndex).partitions,
0);
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b2c9b2b697b..32cec950a69 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -961,7 +961,9 @@ public class PulsarClientException extends IOException {
public static Throwable wrap(Throwable t, String msg) {
msg += "\n" + t.getMessage();
// wrap an exception with new message info
- if (t instanceof TimeoutException) {
+ if (t instanceof TopicDoesNotExistException) {
+ return new TopicDoesNotExistException(msg);
+ } else if (t instanceof TimeoutException) {
return new TimeoutException(msg);
} else if (t instanceof InvalidConfigurationException) {
return new InvalidConfigurationException(msg);