This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e7e340c Fix partitioned topic update for admin v2 api. (#1376) e7e340c is described below commit e7e340c479f88f2696a4d655776d0af4f601a3ab Author: cckellogg <cckell...@gmail.com> AuthorDate: Tue Mar 13 14:26:28 2018 -0700 Fix partitioned topic update for admin v2 api. (#1376) --- .../apache/pulsar/broker/admin/AdminResource.java | 15 +++++++- .../apache/pulsar/broker/admin/ZkAdminPaths.java | 43 ++++++++++++++++++++++ .../broker/admin/impl/PersistentTopicsBase.java | 17 ++++----- 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 1e77640..31d13c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -20,11 +20,11 @@ package org.apache.pulsar.broker.admin; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT; import java.net.MalformedURLException; import java.net.URI; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -454,4 +454,17 @@ public abstract class AdminResource extends PulsarWebResource { throw new RestException(e); } } + + protected boolean isNamespaceReplicated(NamespaceName namespaceName) { + try { + final Policies policies = policiesCache().get(ZkAdminPaths.namespacePoliciesPath(namespaceName)) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist")); + return policies.replication_clusters.size() > 1; + } catch (RestException re) { + throw re; + } catch (Exception e) { + log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e); + throw new RestException(e); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java new file mode 100644 index 0000000..c60422d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +public class ZkAdminPaths { + + public static final String POLICIES = "policies"; + public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics"; + + public static String partitionedTopicPath(TopicName name) { + return adminPath(PARTITIONED_TOPIC_PATH_ZNODE, + name.getNamespace(), name.getDomain().value(), name.getEncodedLocalName()); + } + + public static String namespacePoliciesPath(NamespaceName name) { + return adminPath(POLICIES, name.toString()); + } + + private static String adminPath(String... parts) { + return "/admin/" + String.join("/", parts); + } + + private ZkAdminPaths() {} +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9386df5..fed1da6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -24,11 +24,7 @@ import static org.apache.pulsar.common.util.Codec.decode; import java.io.IOException; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -54,6 +50,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.admin.ZkAdminPaths; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -89,6 +86,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -335,8 +333,7 @@ public class PersistentTopicsBase extends AdminResource { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1"); } try { - String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), - topicName.getEncodedLocalName()); + String path = ZkAdminPaths.partitionedTopicPath(topicName); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); // we wait for the data to be synced in all quorums and the observers @@ -364,7 +361,8 @@ public class PersistentTopicsBase extends AdminResource { */ protected void internalUpdatePartitionedTopic(int numPartitions) { validateAdminAccessOnProperty(topicName.getProperty()); - if (topicName.isGlobal()) { + + if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) { log.error("[{}] Update partitioned-topic is forbidden on global namespace {}", clientAppId(), topicName); throw new RestException(Status.FORBIDDEN, "Update forbidden on global namespace"); @@ -1164,8 +1162,7 @@ public class PersistentTopicsBase extends AdminResource { } private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions) { - String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getProperty(), topicName.getCluster(), topicName.getNamespacePortion(), - domain(), topicName.getEncodedLocalName()); + final String path = ZkAdminPaths.partitionedTopicPath(topicName); CompletableFuture<Void> updatePartition = new CompletableFuture<>(); createSubscriptions(topicName, numPartitions).thenAccept(res -> { -- To stop receiving notification emails like this one, please contact mme...@apache.org.