This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6d86b0aa02345e4057aff86c1d970fd2e08848cf Author: lipenghui <[email protected]> AuthorDate: Mon May 16 21:04:27 2022 +0800 [improve][admin] add topic name and sub name for NotFound error message (#15606) To fix https://issues.apache.org/jira/browse/PULSAR-20 (cherry picked from commit f89698c11af3429ee69c3a15a8ef729650b6a705) --- .../apache/pulsar/broker/admin/AdminResource.java | 8 ++ .../broker/admin/impl/PersistentTopicsBase.java | 109 +++++++++++++-------- .../apache/pulsar/broker/admin/AdminApi2Test.java | 2 + .../broker/admin/AdminApiSubscriptionTest.java | 12 +-- .../apache/pulsar/broker/admin/AdminApiTest.java | 31 +++++- .../pulsar/broker/admin/PersistentTopicsTest.java | 3 +- .../org/apache/pulsar/broker/admin/TopicsTest.java | 4 +- 7 files changed, 119 insertions(+), 50 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 6b964fafe7e..08ca6c7f23e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -839,4 +839,12 @@ public abstract class AdminResource extends PulsarWebResource { protected static String getTopicNotFoundErrorMessage(String topic) { return String.format("Topic %s not found", topic); } + + protected static String getPartitionedTopicNotFoundErrorMessage(String topic) { + return String.format("Partitioned Topic %s not found", topic); + } + + protected static String getSubNotFoundErrorMessage(String topic, String subscription) { + return String.format("Subscription %s not found for topic %s", subscription, topic); + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 956679df95b..196586fec96 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -591,7 +591,7 @@ public class PersistentTopicsBase extends AdminResource { return pulsar().getNamespaceService().checkTopicExists(topicName) .thenAccept(exist -> { if (!exist) { - throw new RestException(Status.NOT_FOUND, "Topic not exist"); + throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); } }); } @@ -664,7 +664,8 @@ public class PersistentTopicsBase extends AdminResource { } else if (realCause instanceof MetadataStoreException.NotFoundException) { log.warn("Namespace policies of {} not found", topicName.getNamespaceObject()); asyncResponse.resume(new RestException( - new RestException(Status.NOT_FOUND, "Partitioned topic does not exist"))); + new RestException(Status.NOT_FOUND, + getPartitionedTopicNotFoundErrorMessage(topicName.toString())))); } else if (realCause instanceof PulsarAdminException) { asyncResponse.resume(new RestException((PulsarAdminException) realCause)); } else if (realCause instanceof MetadataStoreException.BadVersionException) { @@ -1080,7 +1081,7 @@ public class PersistentTopicsBase extends AdminResource { if (t instanceof TopicBusyException) { throw new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions"); } else if (isManagedLedgerNotFoundException(e)) { - throw new RestException(Status.NOT_FOUND, "Topic not found"); + throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); } else { throw new RestException(t); } @@ -1306,7 +1307,8 @@ public class PersistentTopicsBase extends AdminResource { if (exception != null) { Throwable t = exception.getCause(); if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); } else { log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, t); asyncResponse.resume(new RestException(t)); @@ -1378,7 +1380,8 @@ public class PersistentTopicsBase extends AdminResource { future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)).thenAccept(partitionMetadata -> { if (partitionMetadata.partitions == 0) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getPartitionedTopicNotFoundErrorMessage(topicName.toString()))); return; } PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata); @@ -1452,7 +1455,8 @@ public class PersistentTopicsBase extends AdminResource { future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) .thenAccept(partitionMetadata -> { if (partitionMetadata.partitions == 0) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getPartitionedTopicNotFoundErrorMessage(topicName.toString()))); return; } @@ -1540,7 +1544,8 @@ public class PersistentTopicsBase extends AdminResource { if (exception != null) { Throwable t = exception.getCause(); if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return null; } else if (t instanceof PreconditionFailedException) { asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, @@ -1586,7 +1591,8 @@ public class PersistentTopicsBase extends AdminResource { Topic topic = getTopicReference(topicName); Subscription sub = topic.getSubscription(subName); if (sub == null) { - throw new RestException(Status.NOT_FOUND, "Subscription not found"); + throw new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName)); } return sub.delete(); }).thenRun(() -> { @@ -1701,7 +1707,8 @@ public class PersistentTopicsBase extends AdminResource { if (exception != null) { Throwable t = exception.getCause(); if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return null; } else { log.error("[{}] Failed to delete subscription forcefully {} {}", @@ -1747,7 +1754,8 @@ public class PersistentTopicsBase extends AdminResource { Topic topic = getTopicReference(topicName); Subscription sub = topic.getSubscription(subName); if (sub == null) { - throw new RestException(Status.NOT_FOUND, "Subscription not found"); + throw new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName)); } return sub.deleteForcefully(); }).thenRun(() -> { @@ -1805,7 +1813,8 @@ public class PersistentTopicsBase extends AdminResource { Throwable t = exception.getCause(); if (t instanceof NotFoundException) { asyncResponse.resume( - new RestException(Status.NOT_FOUND, "Subscription not found")); + new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); } else { log.error("[{}] Failed to skip all messages {} {}", clientAppId(), topicName, subName, t); @@ -1853,14 +1862,16 @@ public class PersistentTopicsBase extends AdminResource { PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); if (repl == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return CompletableFuture.completedFuture(null); } return repl.clearBacklog().whenComplete(biConsumer); } else { PersistentSubscription sub = topic.getSubscription(subName); if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return CompletableFuture.completedFuture(null); } return sub.clearBacklog().whenComplete(biConsumer); @@ -1896,7 +1907,8 @@ public class PersistentTopicsBase extends AdminResource { return getTopicReferenceAsync(topicName).thenCompose(t -> { PersistentTopic topic = (PersistentTopic) t; if (topic == null) { - throw new RestException(new RestException(Status.NOT_FOUND, "Topic not found")); + throw new RestException(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); } if (subName.startsWith(topic.getReplicatorPrefix())) { String remoteCluster = PersistentReplicator.getRemoteCluster(subName); @@ -1916,7 +1928,8 @@ public class PersistentTopicsBase extends AdminResource { PersistentSubscription sub = topic.getSubscription(subName); if (sub == null) { return FutureUtil.failedFuture( - new RestException(Status.NOT_FOUND, "Subscription not found")); + new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); } return sub.skipMessages(numMessages).thenAccept(unused -> { log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, @@ -2017,7 +2030,7 @@ public class PersistentTopicsBase extends AdminResource { .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> { if (t == null) { resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.NOT_FOUND, - "Topic not found")); + getTopicNotFoundErrorMessage(topicName.toString()))); return; } if (!(t instanceof PersistentTopic)) { @@ -2163,7 +2176,7 @@ public class PersistentTopicsBase extends AdminResource { .thenCompose(topic -> { Subscription sub = topic.getSubscription(subName); if (sub == null) { - throw new RestException(Status.NOT_FOUND, "Subscription not found"); + throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); } return sub.resetCursor(timestamp); }) @@ -2524,12 +2537,14 @@ public class PersistentTopicsBase extends AdminResource { .thenCompose(ignore -> getTopicReferenceAsync(topicName)) .thenAccept(topic -> { if (topic == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); return; } PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName); if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return; } CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>(); @@ -3094,7 +3109,7 @@ public class PersistentTopicsBase extends AdminResource { messageId.getEntryId()); if (topic == null) { asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Topic not found")); + getTopicNotFoundErrorMessage(topicName.toString()))); return; } ManagedLedgerImpl managedLedger = @@ -3492,7 +3507,7 @@ public class PersistentTopicsBase extends AdminResource { .thenCompose(__ -> checkTopicExistsAsync(topicName)) .thenCompose(exist -> { if (!exist) { - throw new RestException(Status.NOT_FOUND, "Topic not found"); + throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); } else { return getPartitionedTopicMetadataAsync(topicName, false, false) .thenCompose(metadata -> { @@ -3624,7 +3639,8 @@ public class PersistentTopicsBase extends AdminResource { if (exception != null) { Throwable t = exception.getCause(); if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); } else { log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t); asyncResponse.resume(new RestException(t)); @@ -3700,7 +3716,8 @@ public class PersistentTopicsBase extends AdminResource { Throwable t = exception.getCause(); if (t instanceof NotFoundException) { asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Subscription not found")); + getSubNotFoundErrorMessage(topicName.toString(), + subName))); return null; } else { log.error("[{}] Failed to expire messages up " @@ -3743,7 +3760,8 @@ public class PersistentTopicsBase extends AdminResource { final CompletableFuture<Void> resultFuture = new CompletableFuture<>(); getTopicReferenceAsync(topicName).thenAccept(t -> { if (t == null) { - resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found")); + resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); return; } if (!(t instanceof PersistentTopic)) { @@ -3768,7 +3786,8 @@ public class PersistentTopicsBase extends AdminResource { PersistentSubscription sub = topic.getSubscription(subName); if (sub == null) { resultFuture.completeExceptionally( - new RestException(Status.NOT_FOUND, "Subscription not found")); + new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return; } issued = sub.expireMessages(expireTimeInSeconds); @@ -3852,14 +3871,15 @@ public class PersistentTopicsBase extends AdminResource { return getTopicReferenceAsync(topicName).thenAccept(t -> { PersistentTopic topic = (PersistentTopic) t; if (topic == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); return; } try { PersistentSubscription sub = topic.getSubscription(subName); if (sub == null) { asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Subscription not found")); + getSubNotFoundErrorMessage(topicName.toString(), subName))); return; } CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>(); @@ -4188,7 +4208,7 @@ public class PersistentTopicsBase extends AdminResource { private RestException topicNotFoundReason(TopicName topicName) { if (!topicName.isPartitioned()) { - return new RestException(Status.NOT_FOUND, "Topic not found"); + return new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); } PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata( @@ -4201,12 +4221,13 @@ public class PersistentTopicsBase extends AdminResource { } else if (!internalGetList(Optional.empty()).contains(topicName.toString())) { return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created"); } - return new RestException(Status.NOT_FOUND, "Partitioned Topic not found"); + return new RestException(Status.NOT_FOUND, getPartitionedTopicNotFoundErrorMessage(topicName.toString())); } private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) { if (!topicName.isPartitioned()) { - return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, "Topic not found")); + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); } return getPartitionedTopicMetadataAsync( @@ -4220,7 +4241,8 @@ public class PersistentTopicsBase extends AdminResource { } else if (!internalGetList(Optional.empty()).contains(topicName.toString())) { throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created"); } - throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found"); + throw new RestException(Status.NOT_FOUND, + getPartitionedTopicNotFoundErrorMessage(topicName.toString())); }); } @@ -4242,7 +4264,7 @@ public class PersistentTopicsBase extends AdminResource { return checkNotNull(sub); } catch (Exception e) { - throw new RestException(Status.NOT_FOUND, "Subscription not found"); + throw new RestException(Status.NOT_FOUND, getSubNotFoundErrorMessage(topicName.toString(), subName)); } } @@ -4520,7 +4542,8 @@ public class PersistentTopicsBase extends AdminResource { .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenAccept(topic -> { if (topic == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); return; } if (!(topic instanceof PersistentTopic)) { @@ -4979,13 +5002,15 @@ public class PersistentTopicsBase extends AdminResource { .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenAccept(topic -> { if (topic == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); return; } Subscription sub = topic.getSubscription(subName); if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return; } @@ -5116,15 +5141,17 @@ public class PersistentTopicsBase extends AdminResource { Topic topic = getTopicReference(topicName); if (topic == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found")); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); return; } - Subscription sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); - return; - } + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); + return; + } if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) { Map res = Maps.newHashMap(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 3cb67eada5e..8399074886d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -585,6 +585,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { admin.topics().resetCursor(topicName + "invalid", "my-sub", messageId); fail("It should have failed due to invalid topic name"); } catch (PulsarAdminException.NotFoundException e) { + assertTrue(e.getMessage().contains(topicName)); // Ok } @@ -593,6 +594,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { admin.topics().resetCursor(topicName, "invalid-sub", messageId); fail("It should have failed due to invalid subscription name"); } catch (PulsarAdminException.NotFoundException e) { + assertTrue(e.getMessage().contains("invalid-sub")); // Ok } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java index 521ef4df1c8..411565521d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java @@ -72,18 +72,18 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { @Test public void testExpireMessageWithNonExistTopicAndNonExistSub() { String uuid = UUID.randomUUID().toString(); - String topic = "test-expire-messages-non-exist-topic-" + uuid; + String topic = "persistent://public/default/test-expire-messages-non-exist-topic-" + uuid; String subscriptionName = "test-expire-messages-non-exist-sub-" + uuid; PulsarAdminException exception = expectThrows(PulsarAdminException.class, () -> admin.topics().expireMessages(topic, subscriptionName, 1)); assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode()); - assertEquals(exception.getMessage(), "Topic not found"); + assertEquals(exception.getMessage(), String.format("Topic %s not found", topic)); exception = expectThrows(PulsarAdminException.class, () -> admin.topics().expireMessagesForAllSubscriptions(topic, 1)); assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode()); - assertEquals(exception.getMessage(), "Topic not found"); + assertEquals(exception.getMessage(), String.format("Topic %s not found", topic)); } @Test @@ -100,18 +100,18 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest { @Test public void tesSkipMessageWithNonExistTopicAndNotExistSub() { String uuid = UUID.randomUUID().toString(); - String topic = "test-skip-messages-non-exist-topic-" + uuid; + String topic = "persistent://public/default/test-skip-messages-non-exist-topic-" + uuid; String subscriptionName = "test-skip-messages-non-exist-sub-" + uuid; PulsarAdminException exception = expectThrows(PulsarAdminException.class, () -> admin.topics().skipMessages(topic, subscriptionName, 1)); assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode()); - assertEquals(exception.getMessage(), "Topic not found"); + assertEquals(exception.getMessage(), String.format("Topic %s not found", topic)); exception = expectThrows(PulsarAdminException.class, () -> admin.topics().skipAllMessages(topic, subscriptionName)); assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode()); - assertEquals(exception.getMessage(), "Topic not found"); + assertEquals(exception.getMessage(), String.format("Topic %s not found", topic)); } @DataProvider(name = "partitioned") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 1c6e469935d..b2ebd2c1f6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -892,6 +892,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { try { admin.topics().skipAllMessages(persistentTopicName, subName); } catch (NotFoundException e) { + assertTrue(e.getMessage().contains(subName)); } admin.topics().delete(persistentTopicName); @@ -900,6 +901,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { admin.topics().delete(persistentTopicName); fail("Should have received 404"); } catch (NotFoundException e) { + assertTrue(e.getMessage().contains(persistentTopicName)); } assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList()); @@ -1100,6 +1102,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { admin.topics().deletePartitionedTopic(anotherTopic); fail("Should have failed as the partitioned topic was not created"); } catch (NotFoundException nfe) { + assertTrue(nfe.getMessage().contains(anotherTopic)); } admin.topics().deletePartitionedTopic(partitionedTopicName); @@ -1933,7 +1936,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { admin.topics().getStats("persistent://prop-xyz/ns1/ghostTopic"); fail("The topic doesn't exist"); } catch (NotFoundException e) { - // OK + assertTrue(e.getMessage().contains("persistent://prop-xyz/ns1/ghostTopic")); } } @@ -2023,6 +2026,20 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { topicName = "persistent://prop-xyz/ns1/" + topicName; + try { + admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis()); + } catch (PulsarAdminException.NotFoundException e) { + assertTrue(e.getMessage().contains(topicName)); + } + + admin.topics().createNonPartitionedTopic(topicName); + + try { + admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis()); + } catch (PulsarAdminException.NotFoundException e) { + assertTrue(e.getMessage().contains("my-sub")); + } + // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-sub").startMessageIdInclusive() @@ -2215,8 +2232,20 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10)); topicName = "persistent://prop-xyz/ns1/" + topicName; + try { + admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis()); + } catch (PulsarAdminException.NotFoundException e) { + assertTrue(e.getMessage().contains(topicName)); + } + admin.topics().createPartitionedTopic(topicName, 4); + try { + admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis()); + } catch (PulsarAdminException.NotFoundException e) { + assertTrue(e.getMessage().contains("my-sub")); + } + // create consumer and subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-sub").startMessageIdInclusive() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index bd86a666c0f..44de487113d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -172,7 +172,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode()); - Assert.assertEquals(errorCaptor.getValue().getMessage(), "Topic not found"); + Assert.assertEquals(errorCaptor.getValue().getMessage(), String.format("Topic %s not found", + "persistent://my-tenant/my-namespace/topic-not-found")); // 2) Confirm that the partitioned topic does not exist response = mock(AsyncResponse.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java index 35e205f2dc9..3327b83b6ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java @@ -383,7 +383,9 @@ public class TopicsTest extends MockedPulsarServiceBaseTest { topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages); ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class); verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture()); - Assert.assertTrue(responseCaptor.getValue().getMessage().contains("Topic not exist")); + System.out.println(responseCaptor.getValue().getMessage()); + Assert.assertTrue(responseCaptor.getValue().getMessage() + .contains(String.format("Topic %s not found", topicName))); } @Test
