This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4f1dc52 Fix lookup problem with partions in a non-persistent topics (#1251) 4f1dc52 is described below commit 4f1dc52c7eeb77222bf1b6c39074c1df624e15d3 Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Feb 19 10:43:45 2018 -0800 Fix lookup problem with partions in a non-persistent topics (#1251) --- .../broker/admin/impl/PersistentTopicsBase.java | 4 +- .../pulsar/client/api/NonPersistentTopicTest.java | 50 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index b459011..5d83f8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1094,8 +1094,8 @@ public class PersistentTopicsBase extends AdminResource { throw ex; } - String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(), - "persistent", dn.getEncodedLocalName()); + String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getNamespace(), dn.getDomain().toString(), + dn.getEncodedLocalName()); // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can // serve/redirect request else fail partitioned-metadata-request so, client fails while creating diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index cebce57..5f45bc4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -186,6 +186,56 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { } + @Test(dataProvider = "subscriptionType") + public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws Exception { + log.info("-- Starting {} test --", methodName); + + final int numPartitions = 5; + final String topic = "non-persistent://my-property/use/my-ns/partitioned-topic"; + admin.nonPersistentTopics().createPartitionedTopic(topic, numPartitions); + + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + BROKER_PORT) + .statsInterval(0, TimeUnit.SECONDS).build(); + Consumer consumer = client.newConsumer().topic(topic).subscriptionName("subscriber-1").subscriptionType(type) + .subscribe(); + + Producer producer = client.newProducer().topic(topic).create(); + + // Ensure all partitions exist + for (int i = 0; i < numPartitions; i++) { + DestinationName partition = DestinationName.get(topic).getPartition(i); + assertNotNull(pulsar.getBrokerService().getTopicReference(partition.toString())); + } + + int totalProduceMsg = 500; + for (int i = 0; i < totalProduceMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + Thread.sleep(10); + } + + Message msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < totalProduceMsg; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + consumer.acknowledge(msg); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } else { + break; + } + } + assertEquals(messageSet.size(), totalProduceMsg); + + producer.close(); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + client.close(); + } + /** * It verifies that broker doesn't dispatch messages if consumer runs out of permits * filled out with messages -- To stop receiving notification emails like this one, please contact mme...@apache.org.