This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 68cd115 Issue #1237: support builder for topicsConsumer (#1269) 68cd115 is described below commit 68cd1154e9581bb8503eeee2da5a8e1d44336b25 Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Thu Feb 22 11:38:59 2018 -0800 Issue #1237: support builder for topicsConsumer (#1269) * support builder for topicsConsumer * change following @Matteo's comments to remove subscribe methods in client * change to topic(String ... names) for consumer builder * change following @sijie's comments --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 197 +++++++++++++++------ .../apache/pulsar/client/api/ConsumerBuilder.java | 16 +- .../org/apache/pulsar/client/api/PulsarClient.java | 57 ------ .../pulsar/client/impl/ConsumerBuilderImpl.java | 41 ++++- .../pulsar/client/impl/PulsarClientImpl.java | 32 ---- 5 files changed, 186 insertions(+), 157 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index a2cb6d5..2aa386e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -33,12 +33,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; @@ -82,12 +79,13 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); try { - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); fail("subscribe for topics from different namespace should fail."); } catch (IllegalArgumentException e) { // expected for have different namespace @@ -103,18 +101,21 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key; - List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); + List<String> topicNames = Lists.newArrayList(topicName1, topicName2); admin.properties().createProperty("prop", new PropertyAdmin()); admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .topic(topicName3) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); List<String> topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics(); @@ -148,20 +149,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect - Producer producer1 = pulsarClient.createProducer(topicName1); - Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); - Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); // 3. producer publish messages @@ -205,20 +210,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect - Producer producer1 = pulsarClient.createProducer(topicName1); - Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); - Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); // Asynchronously produce messages @@ -280,20 +289,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect - Producer producer1 = pulsarClient.createProducer(topicName1); - Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); - Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); // 3. producer publish messages @@ -416,20 +429,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName2, 2); admin.persistentTopics().createPartitionedTopic(topicName3, 3); - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); - // 1. producer connect - Producer producer1 = pulsarClient.createProducer(topicName1); - Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration); - Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration); + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer - ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setReceiverQueueSize(4); - conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); - conf.setSubscriptionType(SubscriptionType.Shared); - Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get(); + Consumer consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); assertTrue(consumer instanceof TopicsConsumerImpl); // 3. producer publish messages @@ -519,4 +536,68 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { producer3.close(); } + + @Test(timeOut = testTimeout) + public void testTopicsNameSubscribeWithBuilderFail() throws Exception { + String key = "TopicsNameSubscribeWithBuilder"; + final String subscriptionName = "my-ex-subscription-" + key; + + final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; + final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; + final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key; + List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3); + + admin.properties().createProperty("prop", new PropertyAdmin()); + admin.persistentTopics().createPartitionedTopic(topicName2, 2); + admin.persistentTopics().createPartitionedTopic(topicName3, 3); + + // test failing builder with empty topics + try { + Consumer consumer1 = pulsarClient.newConsumer() + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + fail("subscribe1 with no topicName should fail."); + } catch (PulsarClientException e) { + // expected + } + + try { + Consumer consumer2 = pulsarClient.newConsumer() + .topic() + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + fail("subscribe2 with no topicName should fail."); + } catch (IllegalArgumentException e) { + // expected + } + + try { + Consumer consumer3 = pulsarClient.newConsumer() + .topics(null) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + fail("subscribe3 with no topicName should fail."); + } catch (IllegalArgumentException e) { + // expected + } + + try { + Consumer consumer4 = pulsarClient.newConsumer() + .topics(Lists.newArrayList()) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .subscribe(); + fail("subscribe4 with no topicName should fail."); + } catch (IllegalArgumentException e) { + // expected + } + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index a2c3c81..be97ab6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import java.io.Serializable; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -77,13 +78,22 @@ public interface ConsumerBuilder extends Serializable, Cloneable { CompletableFuture<Consumer> subscribeAsync(); /** - * Specify the topic this consumer will subscribe on. + * Specify the topics this consumer will subscribe on. * <p> * This argument is required when constructing the consumer. * - * @param topicName + * @param topicNames */ - ConsumerBuilder topic(String topicName); + ConsumerBuilder topic(String... topicNames); + + /** + * Specify a list of topics that this consumer will subscribe on. + * <p> + * This argument is required when constructing the consumer. + * + * @param topicNames + */ + ConsumerBuilder topics(List<String> topicNames); /** * Specify the subscription name for this consumer. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java index 8de76f4..6ba2518 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.api; import java.io.Closeable; -import java.util.Collection; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.impl.ClientBuilderImpl; @@ -321,60 +320,4 @@ public interface PulsarClient extends Closeable { * if the forceful shutdown fails */ void shutdown() throws PulsarClientException; - - - /** - * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration} - * - * @param topics - * The collection of topic names, they should be under same namespace - * @param subscription - * The name of the subscription - * @return The {@code Consumer} object - * @throws PulsarClientException - */ - Consumer subscribe(Collection<String> topics, String subscription) throws PulsarClientException; - - /** - * Asynchronously subscribe to the given topics and subscription combination with - * default {@code ConsumerConfiguration} - * - * @param topics - * The collection of topic names, they should be under same namespace - * @param subscription - * The name of the subscription - * @return Future of the {@code Consumer} object - */ - CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription); - - /** - * Subscribe to the given topics and subscription combination using given {@code ConsumerConfiguration} - * - * @param topics - * The collection of topic names, they should be under same namespace - * @param subscription - * The name of the subscription - * @param conf - * The {@code ConsumerConfiguration} object - * @return Future of the {@code Consumer} object - */ - Consumer subscribe(Collection<String> topics, String subscription, ConsumerConfiguration conf) - throws PulsarClientException; - - /** - * Asynchronously subscribe to the given topics and subscription combination using given - * {@code ConsumerConfiguration} - * - * @param topics - * The collection of topic names, they should be under same namespace - * @param subscription - * The name of the subscription - * @param conf - * The {@code ConsumerConfiguration} object - * @return Future of the {@code Consumer} object - */ - CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, - String subscription, - ConsumerConfiguration conf); - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index ab91326..66cd291 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -18,7 +18,13 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -40,9 +46,9 @@ public class ConsumerBuilderImpl implements ConsumerBuilder { private static final long serialVersionUID = 1L; private final PulsarClientImpl client; - private String topicName; private String subscriptionName; private final ConsumerConfiguration conf; + private Set<String> topicNames; ConsumerBuilderImpl(PulsarClientImpl client) { this.client = client; @@ -77,22 +83,43 @@ public class ConsumerBuilderImpl implements ConsumerBuilder { @Override public CompletableFuture<Consumer> subscribeAsync() { - if (topicName == null) { + if (topicNames == null || topicNames.isEmpty()) { return FutureUtil - .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); + .failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder")); } if (subscriptionName == null) { return FutureUtil.failedFuture( - new IllegalArgumentException("Subscription name must be set on the producer builder")); + new IllegalArgumentException("Subscription name must be set on the consumer builder")); } - return client.subscribeAsync(topicName, subscriptionName, conf); + if (topicNames.size() == 1) { + return client.subscribeAsync(topicNames.stream().findFirst().orElse(""), subscriptionName, conf); + } else { + return client.subscribeAsync(topicNames, subscriptionName, conf); + } } @Override - public ConsumerBuilder topic(String topicName) { - this.topicName = topicName; + public ConsumerBuilder topic(String... topicNames) { + checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty."); + if (this.topicNames == null) { + this.topicNames = Sets.newHashSet(topicNames); + } else { + this.topicNames.addAll(Lists.newArrayList(topicNames)); + } + return this; + } + + @Override + public ConsumerBuilder topics(List<String> topicNames) { + checkArgument(topicNames != null && !topicNames.isEmpty(), + "Passed in topicNames list should not be empty."); + if (this.topicNames == null) { + this.topicNames = Sets.newHashSet(); + } + this.topicNames.addAll(topicNames); + return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 5d7aab8..c065532 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -308,38 +308,6 @@ public class PulsarClientImpl implements PulsarClient { return consumerSubscribedFuture; } - - @Override - public Consumer subscribe(Collection<String> topics, final String subscription) throws PulsarClientException { - return subscribe(topics, subscription, new ConsumerConfiguration()); - } - - @Override - public Consumer subscribe(Collection<String> topics, - String subscription, - ConsumerConfiguration conf) - throws PulsarClientException { - try { - return subscribeAsync(topics, subscription, conf).get(); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof PulsarClientException) { - throw (PulsarClientException) t; - } else { - throw new PulsarClientException(t); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } - - @Override - public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription) { - return subscribeAsync(topics, subscription, new ConsumerConfiguration()); - } - - @Override public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription, ConsumerConfiguration conf) { -- To stop receiving notification emails like this one, please contact mme...@apache.org.