This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 05488a0  Issue 1067: Problems with Partitioned Topics which name 
contains -partition-N (#2342)
05488a0 is described below

commit 05488a0be6e27292344a093f694d2a5f48886601
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Fri Aug 10 19:40:44 2018 -0700

    Issue 1067: Problems with Partitioned Topics which name contains 
-partition-N (#2342)
    
    * Issue 1067: Problems with Partitioned Topics which name contains 
-partition-N
    
     ### Motivation
    
    Fixes #1067.
    
    Someone accidentally created a partitioned topic in one of our cluster with 
a name which contains -partition-2.
    This raised all sorts of issues, it would seem that only one partition was 
created for this topic, but metadata exists saying that it has 10 partitions.
    
     ### Changes
    
    Disallow creating a partitioned topic contains '-partition-' in the name.
    
    * only validate partitioned topic name when create/delete/update 
partitioned topics
    
    * Changed to use a CONSTANT
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  9 +++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 42 +++++++++----
 .../pulsar/broker/admin/AdminResourceTest.java     | 69 ++++++++++++++++++++++
 .../org/apache/pulsar/common/naming/TopicName.java |  2 +-
 4 files changed, 108 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index be1b7dd..de7dd0b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -254,6 +254,15 @@ public abstract class AdminResource extends 
PulsarWebResource {
         this.topicName = TopicName.get(domain(), namespaceName, topic);
     }
 
+    protected void validatePartitionedTopicName(String tenant, String 
namespace, String encodedTopic) {
+        // first, it has to be a validate topic name
+        validateTopicName(tenant, namespace, encodedTopic);
+        // second, "-partition-" is not allowed
+        if (encodedTopic.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
+            throw new RestException(Status.PRECONDITION_FAILED, "Partitioned 
Topic Name should not contain '-partition-'");
+        }
+    }
+
     @Deprecated
     protected void validateTopicName(String property, String cluster, String 
namespace, String encodedTopic) {
         String topic = Codec.decode(encodedTopic);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6a246b8..771a057 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -124,12 +124,15 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     @PUT
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Create a partitioned topic.", notes = "It needs to 
be called before creating a producer on a partitioned topic.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 409, message = "Partitioned topic already 
exist") })
+    @ApiResponses(value = {
+        @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 409, message = "Partitioned topic already exist"),
+        @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+    })
     public void createPartitionedTopic(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic, int 
numPartitions,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(tenant, namespace, encodedTopic);
+        validatePartitionedTopicName(tenant, namespace, encodedTopic);
         internalCreatePartitionedTopic(numPartitions, authoritative);
     }
 
@@ -151,18 +154,25 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     @POST
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Increment partitons of an existing partitioned 
topic.", notes = "It only increments partitions of existing non-global 
partitioned-topic")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 409, message = "Partitioned topic does not 
exist") })
+    @ApiResponses(value = {
+        @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 409, message = "Partitioned topic does not exist"),
+        @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+    })
     public void updatePartitionedTopic(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic, int 
numPartitions) {
-        validateTopicName(tenant, namespace, encodedTopic);
+        validatePartitionedTopicName(tenant, namespace, encodedTopic);
         internalUpdatePartitionedTopic(numPartitions);
     }
 
     @GET
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Get partitioned topic metadata.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission") })
+    @ApiResponses(value = {
+        @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 409, message = "Partitioned topic does not exist"),
+        @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+    })
     public PartitionedTopicMetadata 
getPartitionedMetadata(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
@@ -173,13 +183,16 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/partitions")
     @ApiOperation(value = "Delete a partitioned topic.", notes = "It will also 
delete all the partitions of the topic if it exists.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 404, message = "Partitioned topic does not 
exist") })
+    @ApiResponses(value = {
+        @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 404, message = "Partitioned topic does not exist"),
+        @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+    })
     public void deletePartitionedTopic(@PathParam("tenant") String tenant, 
@PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(tenant, namespace, encodedTopic);
+        validatePartitionedTopicName(tenant, namespace, encodedTopic);
         internalDeletePartitionedTopic(authoritative, force);
     }
 
@@ -259,12 +272,15 @@ public class PersistentTopics extends 
PersistentTopicsBase {
     @GET
     @Path("{tenant}/{namespace}/{topic}/partitioned-stats")
     @ApiOperation(value = "Get the stats for the partitioned topic.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 404, message = "Topic does not exist") })
+    @ApiResponses(value = {
+        @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 404, message = "Topic does not exist"),
+        @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
+    })
     public PartitionedTopicStats getPartitionedStats(@PathParam("tenant") 
String tenant,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(tenant, namespace, encodedTopic);
+        validatePartitionedTopicName(tenant, namespace, encodedTopic);
         return internalGetPartitionedStats(authoritative);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
new file mode 100644
index 0000000..7ad60c2
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import javax.ws.rs.core.Response.Status;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.util.Codec;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link AdminResource}.
+ */
+public class AdminResourceTest {
+
+    private static AdminResource mockResource() {
+        return new AdminResource() {
+
+            @Override
+            protected String domain() {
+                return "persistent";
+            }
+        };
+    }
+
+    @Test
+    public void testValidatePartitionedTopicNameSuccess() {
+        String tenant = "test-tenant";
+        String namespace = "test-namespace";
+        String topic = Codec.encode("test-topic");
+
+        AdminResource resource = mockResource();
+        resource.validatePartitionedTopicName(tenant, namespace, topic);
+    }
+
+    @Test
+    public void testValidatePartitionedTopicNameInvalid() {
+        String tenant = "test-tenant";
+        String namespace = "test-namespace";
+        String topic = Codec.encode("test-topic-partition-0");
+
+        AdminResource resource = mockResource();
+        try {
+            resource.validatePartitionedTopicName(tenant, namespace, topic);
+            fail("Should fail validation on invalid partitioned topic");
+        } catch (RestException re) {
+            assertEquals(Status.PRECONDITION_FAILED.getStatusCode(), 
re.getResponse().getStatus());
+        }
+    }
+
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index ef45598..3d43271 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -44,7 +44,7 @@ public class TopicName implements ServiceUnitId {
     public static final String PUBLIC_TENANT = "public";
     public static final String DEFAULT_NAMESPACE = "default";
 
-    private static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
+    public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
 
     private final String completeTopicName;
 

Reply via email to