This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e340c  Fix partitioned topic update for admin v2 api. (#1376)
e7e340c is described below

commit e7e340c479f88f2696a4d655776d0af4f601a3ab
Author: cckellogg <cckell...@gmail.com>
AuthorDate: Tue Mar 13 14:26:28 2018 -0700

    Fix partitioned topic update for admin v2 api. (#1376)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 15 +++++++-
 .../apache/pulsar/broker/admin/ZkAdminPaths.java   | 43 ++++++++++++++++++++++
 .../broker/admin/impl/PersistentTopicsBase.java    | 17 ++++-----
 3 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1e77640..31d13c5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -20,11 +20,11 @@ package org.apache.pulsar.broker.admin;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
@@ -454,4 +454,17 @@ public abstract class AdminResource extends PulsarWebResource {
             throw new RestException(e);
         }
     }
+
+    protected boolean isNamespaceReplicated(NamespaceName namespaceName) {
+        try {
+            final Policies policies = policiesCache().get(ZkAdminPaths.namespacePoliciesPath(namespaceName))
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+            return policies.replication_clusters.size() > 1;
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception e) {
+            log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
new file mode 100644
index 0000000..c60422d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class ZkAdminPaths {
+
+    public static final String POLICIES = "policies";
+    public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
+
+    public static String partitionedTopicPath(TopicName name) {
+        return adminPath(PARTITIONED_TOPIC_PATH_ZNODE,
+                name.getNamespace(), name.getDomain().value(), name.getEncodedLocalName());
+    }
+
+    public static String namespacePoliciesPath(NamespaceName name) {
+        return adminPath(POLICIES, name.toString());
+    }
+
+    private static String adminPath(String... parts) {
+        return "/admin/" + String.join("/", parts);
+    }
+
+    private ZkAdminPaths() {}
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9386df5..fed1da6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -24,11 +24,7 @@ import static org.apache.pulsar.common.util.Codec.decode;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +50,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -89,6 +86,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -335,8 +333,7 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
         }
         try {
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
-                    topicName.getEncodedLocalName());
+            String path = ZkAdminPaths.partitionedTopicPath(topicName);
             byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
             zkCreateOptimistic(path, data);
             // we wait for the data to be synced in all quorums and the observers
@@ -364,7 +361,8 @@ public class PersistentTopicsBase extends AdminResource {
      */
     protected void internalUpdatePartitionedTopic(int numPartitions) {
         validateAdminAccessOnProperty(topicName.getProperty());
-        if (topicName.isGlobal()) {
+
+        if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
             log.error("[{}] Update partitioned-topic is forbidden on global namespace {}", clientAppId(),
                     topicName);
             throw new RestException(Status.FORBIDDEN, "Update forbidden on global namespace");
@@ -1164,8 +1162,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions) {
-        String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getProperty(), topicName.getCluster(), topicName.getNamespacePortion(),
-                domain(), topicName.getEncodedLocalName());
+        final String path = ZkAdminPaths.partitionedTopicPath(topicName);
 
         CompletableFuture<Void> updatePartition = new CompletableFuture<>();
         createSubscriptions(topicName, numPartitions).thenAccept(res -> {

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to