This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d4055b5  [Broker] Add verification when terminating non-persistent 
topic (#11903)
d4055b5 is described below

commit d4055b5877c7935b999314eafc85b80b5e219c23
Author: 包子 <wudixiaolong...@icloud.com>
AuthorDate: Tue Sep 7 00:20:19 2021 +0800

    [Broker] Add verification when terminating non-persistent topic (#11903)
    
    * [Broker] Add verification when terminating non-persistent
    
    * [Broker] add unit test
    
    * code format
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 15 ++++++++++
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  3 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  3 +-
 .../pulsar/broker/admin/AdminResourceTest.java     | 35 ++++++++++++++++++++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 29 ++++++++++++++++++
 5 files changed, 83 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index de13b54..cf96e92 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -241,6 +241,13 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
     }
 
+    protected void validatePersistentTopicName(String property, String 
namespace, String encodedTopic) {
+        validateTopicName(property, namespace, encodedTopic);
+        if (topicName.getDomain() != TopicDomain.persistent) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "Need to provide a 
persistent topic name");
+        }
+    }
+
     protected void validatePartitionedTopicName(String tenant, String 
namespace, String encodedTopic) {
         // first, it has to be a validate topic name
         validateTopicName(tenant, namespace, encodedTopic);
@@ -278,6 +285,14 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
     }
 
+    @Deprecated
+    protected void validatePersistentTopicName(String property, String 
cluster, String namespace, String encodedTopic) {
+        validateTopicName(property, cluster, namespace, encodedTopic);
+        if (topicName.getDomain() != TopicDomain.persistent) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "Need to provide a 
persistent topic name");
+        }
+    }
+
     protected Policies getNamespacePolicies(NamespaceName namespaceName) {
         try {
             final String namespace = namespaceName.toString();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fee8707..32f5ab3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -673,11 +673,12 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 405, message = "Operation not allowed on 
non-persistent topic"),
+            @ApiResponse(code = 406, message = "Need to provide a persistent 
topic name"),
             @ApiResponse(code = 404, message = "Topic does not exist") })
     public MessageId terminate(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
+        validatePersistentTopicName(property, cluster, namespace, 
encodedTopic);
         return internalTerminate(authoritative);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 4930915..1e26907 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2442,6 +2442,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 405, message = "Termination of a partitioned 
topic is not allowed"),
+            @ApiResponse(code = 406, message = "Need to provide a persistent 
topic name"),
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration")})
@@ -2454,7 +2455,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(tenant, namespace, encodedTopic);
+        validatePersistentTopicName(tenant, namespace, encodedTopic);
         return internalTerminate(authoritative);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
index 982c0b2..e047d31 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
@@ -57,6 +57,41 @@ public class AdminResourceTest extends BrokerTestBase {
         };
     }
 
+    private static AdminResource mockNonPersistentResource() {
+        return new AdminResource() {
+
+            @Override
+            protected String domain() {
+                return "non-persistent";
+            }
+        };
+    }
+
+    @Test
+    public void testValidatePersistentTopicNameSuccess() {
+        String tenant = "test-tenant";
+        String namespace = "test-namespace";
+        String topic = Codec.encode("test-topic");
+
+        AdminResource resource = mockResource();
+        resource.validatePersistentTopicName(tenant, namespace, topic);
+    }
+
+    @Test
+    public void testValidatePersistentTopicNameInvalid() {
+        String tenant = "test-tenant";
+        String namespace = "test-namespace";
+        String topic = Codec.encode("test-topic");
+
+        AdminResource nPResource = mockNonPersistentResource();
+        try {
+            nPResource.validatePersistentTopicName(tenant, namespace, topic);
+            fail("Should fail validation on non-persistent topic");
+        } catch (RestException e) {
+            assertEquals(Status.NOT_ACCEPTABLE.getStatusCode(), 
e.getResponse().getStatus());
+        }
+    }
+
     @Test
     public void testValidatePartitionedTopicNameSuccess() {
         String tenant = "test-tenant";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 182d2cc..9871b7d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -333,6 +333,35 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testTerminate() {
+        String testLocalTopicName = "topic-not-found";
+
+        // 1) Create the nonPartitionTopic topic
+        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
testLocalTopicName, true);
+
+        // 2) Create a subscription
+        AsyncResponse response  = mock(AsyncResponse.class);
+        persistentTopics.createSubscription(response, testTenant, 
testNamespace, testLocalTopicName, "test", true,
+                (MessageIdImpl) MessageId.earliest, false);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+
+        // 3) Assert terminate persistent topic
+        MessageId messageId = persistentTopics.terminate(testTenant, 
testNamespace, testLocalTopicName, true);
+        Assert.assertEquals(messageId, new MessageIdImpl(3, -1, -1));
+
+        // 4) Assert terminate non-persistent topic
+        String nonPersistentTopicName = "non-persistent-topic";
+        try {
+            nonPersistentTopic.terminate(testTenant, testNamespace, 
nonPersistentTopicName, true);
+            Assert.fail("Should fail validation on non-persistent topic");
+        } catch (RestException e) {
+            
Assert.assertEquals(Response.Status.NOT_ACCEPTABLE.getStatusCode(), 
e.getResponse().getStatus());
+        }
+    }
+
+    @Test
     public void testNonPartitionedTopics() {
         final String nonPartitionTopic = "non-partitioned-topic";
         AsyncResponse response = mock(AsyncResponse.class);

Reply via email to