This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2a1078a5cbd4ff2e48c89ecea9e55b6a8f8229b1 Author: Aloys <lofterzh...@gmail.com> AuthorDate: Fri Jun 18 14:09:06 2021 +0800 fix non-persistent topic get partitioned metadata error on discovery (#10806) Fixes #10443 fix non-persistent topic get partitioned metadata error if using discovery (cherry picked from commit 859922942759aaa539fe7b0951a614bb75c71ea8) --- .../discovery/service/BrokerDiscoveryProvider.java | 2 +- .../discovery/service/BaseDiscoveryTestSetup.java | 8 ++++++++ .../discovery/service/DiscoveryServiceTest.java | 21 +++++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java index 414d2ce..41374f2 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java @@ -108,7 +108,7 @@ public class BrokerDiscoveryProvider implements Closeable { try { checkAuthorization(service, topicName, role, authenticationData); final String path = path(PARTITIONED_TOPIC_PATH_ZNODE, - topicName.getNamespaceObject().toString(), "persistent", topicName.getEncodedLocalName()); + topicName.getNamespaceObject().toString(), topicName.getDomain().value(), topicName.getEncodedLocalName()); // gets the number of partitions from the zk cache globalZkCache .getDataAsync(path, diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java index 3c7b3ff..074c8fb 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java @@ -32,9 +32,11 @@ import org.apache.pulsar.discovery.service.server.ServiceConfig; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.Code; public class BaseDiscoveryTestSetup { @@ -85,4 +87,10 @@ public class BaseDiscoveryTestSetup { } }; + protected void simulateStoreErrorForNonPersistentTopic(String string, Code sessionexpired) { + mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> { + return op == MockZooKeeper.Op.GET + && path.equals("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2"); + }); + } } diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java index b30c66d..6498ee8 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java @@ -122,6 +122,27 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { } } + @Test + public void testGetPartitionsMetadataForNonPersistentTopic() throws Exception { + TopicName topic1 = TopicName.get("non-persistent://test/local/ns/my-topic-1"); + + PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null) + .get(); + assertEquals(m.partitions, 0); + + // Simulate ZK error + simulateStoreErrorForNonPersistentTopic("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2", Code.SESSIONEXPIRED); + TopicName topic2 = TopicName.get("non-persistent://test/local/ns/my-topic-2"); + CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider() + .getPartitionedTopicMetadata(service, topic2, "role", null); + try { + future.get(); + fail("Partition metadata lookup should have failed"); + } catch (ExecutionException e) { + assertEquals(e.getCause().getClass(), MetadataStoreException.class); + } + } + /** * It verifies: client connects to Discovery-service and receives discovery response successfully. *