merlimat closed pull request #1269: Issue #1237: support builder for topicsConsumer URL: https://github.com/apache/incubator-pulsar/pull/1269
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index a2cb6d579..2aa386e71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -33,12 +33,9 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; @@ -82,12 +79,13 @@ public void testDifferentTopicsNameSubscribe() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. 
Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); try { - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); fail("subscribe for topics from different namespace should fail."); } catch (IllegalArgumentException e) { // expected for have different namespace @@ -103,18 +101,21 @@ public void testGetConsumersAndGetTopics() throws Exception { final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key; - List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); + List<String> topicNames = Lists.newArrayList(topicName1, topicName2); admin.properties().createProperty("prop", new PropertyAdmin()); admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. 
Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .topic(topicName3) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); List<String> topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics(); @@ -148,20 +149,24 @@ public void testSyncProducerAndConsumer() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect - Producer producer1 = pulsarClient.createProducer(topicName1); - Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); - Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); // 2. 
Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); // 3. producer publish messages @@ -205,20 +210,24 @@ public void testAsyncConsumer() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect - Producer producer1 = pulsarClient.createProducer(topicName1); - Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); - Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); // 2. 
Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); // Asynchronously produce messages @@ -280,20 +289,24 @@ public void testConsumerUnackedRedelivery() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect - Producer producer1 = pulsarClient.createProducer(topicName1); - Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); - Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); // 2. 
Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); // 3. producer publish messages @@ -416,20 +429,24 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect - Producer producer1 = pulsarClient.createProducer(topicName1); - Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); - Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); // 2. 
Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); // 3. producer publish messages @@ -519,4 +536,68 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { producer3.close(); } + + @Test(timeOut = testTimeout) + public void testTopicsNameSubscribeWithBuilderFail() throws Exception { + String key = "TopicsNameSubscribeWithBuilder"; + final String subscriptionName = "my-ex-subscription-" + key; + + final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; + final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; + final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key; + List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); + + admin.properties().createProperty("prop", new PropertyAdmin()); + admin.persistentTopics().createPartitionedTopic(topicName2, 2); + admin.persistentTopics().createPartitionedTopic(topicName3, 3); + + // test failing builder with empty topics + try { + Consumer consumer1 = pulsarClient.newConsumer() + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + fail("subscribe1 with no topicName should fail."); + } catch (PulsarClientException e) { + // expected + } + + try { + Consumer consumer2 = pulsarClient.newConsumer() + .topic() + .subscriptionName(subscriptionName) + 
.subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + fail("subscribe2 with no topicName should fail."); + } catch (IllegalArgumentException e) { + // expected + } + + try { + Consumer consumer3 = pulsarClient.newConsumer() + .topics(null) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + fail("subscribe3 with no topicName should fail."); + } catch (IllegalArgumentException e) { + // expected + } + + try { + Consumer consumer4 = pulsarClient.newConsumer() + .topics(Lists.newArrayList()) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + fail("subscribe4 with no topicName should fail."); + } catch (IllegalArgumentException e) { + // expected + } + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index a2c3c8133..be97ab696 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.Serializable; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -77,13 +78,22 @@ CompletableFuture<Consumer> subscribeAsync(); /** - * Specify the topic this consumer will subscribe on. + * Specify the topics this consumer will subscribe on. * <p> * This argument is required when constructing the consumer. * - * @param topicName + * @param topicNames */ - ConsumerBuilder topic(String topicName); + ConsumerBuilder topic(String... topicNames); + + /** + * Specify a list of topics that this consumer will subscribe on. 
+ * <p> + * This argument is required when constructing the consumer. + * + * @param topicNames + */ + ConsumerBuilder topics(List<String> topicNames); /** * Specify the subscription name for this consumer. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index 8de76f46d..6ba2518d0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.api; import java.io.Closeable; -import java.util.Collection; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.impl.ClientBuilderImpl; @@ -321,60 +320,4 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t * if the forceful shutdown fails */ void shutdown() throws PulsarClientException; - - - /** - * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration} - * - * @param topics - * The collection of topic names, they should be under same namespace - * @param subscription - * The name of the subscription - * @return The {@code Consumer} object - * @throws PulsarClientException - */ - Consumer subscribe(Collection<String> topics, String subscription) throws PulsarClientException; - - /** - * Asynchronously subscribe to the given topics and subscription combination with - * default {@code ConsumerConfiguration} - * - * @param topics - * The collection of topic names, they should be under same namespace - * @param subscription - * The name of the subscription - * @return Future of the {@code Consumer} object - */ - CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription); - - /** - * Subscribe to the given topics and subscription combination using given {@code ConsumerConfiguration} - * - * @param topics - * The collection of 
topic names, they should be under same namespace - * @param subscription - * The name of the subscription - * @param conf - * The {@code ConsumerConfiguration} object - * @return Future of the {@code Consumer} object - */ - Consumer subscribe(Collection<String> topics, String subscription, ConsumerConfiguration conf) - throws PulsarClientException; - - /** - * Asynchronously subscribe to the given topics and subscription combination using given - * {@code ConsumerConfiguration} - * - * @param topics - * The collection of topic names, they should be under same namespace - * @param subscription - * The name of the subscription - * @param conf - * The {@code ConsumerConfiguration} object - * @return Future of the {@code Consumer} object - */ - CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, - String subscription, - ConsumerConfiguration conf); - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index ab9132615..66cd2912c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -18,7 +18,13 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -40,9 +46,9 @@ private static final long serialVersionUID = 1L; private final PulsarClientImpl client; - private String topicName; private String subscriptionName; private final ConsumerConfiguration conf; + private Set<String> topicNames; ConsumerBuilderImpl(PulsarClientImpl client) { this.client = client; @@ 
-77,22 +83,43 @@ public Consumer subscribe() throws PulsarClientException { @Override public CompletableFuture<Consumer> subscribeAsync() { - if (topicName == null) { + if (topicNames == null || topicNames.isEmpty()) { return FutureUtil - .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); + .failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder")); } if (subscriptionName == null) { return FutureUtil.failedFuture( - new IllegalArgumentException("Subscription name must be set on the producer builder")); + new IllegalArgumentException("Subscription name must be set on the consumer builder")); } - return client.subscribeAsync(topicName, subscriptionName, conf); + if (topicNames.size() == 1) { + return client.subscribeAsync(topicNames.stream().findFirst().orElse(""), subscriptionName, conf); + } else { + return client.subscribeAsync(topicNames, subscriptionName, conf); + } } @Override - public ConsumerBuilder topic(String topicName) { - this.topicName = topicName; + public ConsumerBuilder topic(String... 
topicNames) { + checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty."); + if (this.topicNames == null) { + this.topicNames = Sets.newHashSet(topicNames); + } else { + this.topicNames.addAll(Lists.newArrayList(topicNames)); + } + return this; + } + + @Override + public ConsumerBuilder topics(List<String> topicNames) { + checkArgument(topicNames != null && !topicNames.isEmpty(), + "Passed in topicNames list should not be empty."); + if (this.topicNames == null) { + this.topicNames = Sets.newHashSet(); + } + this.topicNames.addAll(topicNames); + return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 5d7aab8d0..c065532a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -308,38 +308,6 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati return consumerSubscribedFuture; } - - @Override - public Consumer subscribe(Collection<String> topics, final String subscription) throws PulsarClientException { - return subscribe(topics, subscription, new ConsumerConfiguration()); - } - - @Override - public Consumer subscribe(Collection<String> topics, - String subscription, - ConsumerConfiguration conf) - throws PulsarClientException { - try { - return subscribeAsync(topics, subscription, conf).get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } - - @Override - public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription) { - return subscribeAsync(topics, 
subscription, new ConsumerConfiguration()); - } - - @Override public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription, ConsumerConfiguration conf) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services