Repository: kafka
Updated Branches:
  refs/heads/trunk 510257646 -> 23d01c805
KAFKA-5726; KafkaConsumer.subscribe() overload that takes just Pattern - changed the interface & implementations - updated tests to use the new method where applicable Author: Attila Kreiner <att...@kreiner.hu> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3669 from attilakreiner/KAFKA-5726 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23d01c80 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23d01c80 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23d01c80 Branch: refs/heads/trunk Commit: 23d01c805bef7504abfa83ecac7e384d121a583a Parents: 5102576 Author: Attila Kreiner <att...@kreiner.hu> Authored: Wed Sep 6 11:41:33 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Sep 6 11:41:33 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/Consumer.java | 5 +++++ .../kafka/clients/consumer/KafkaConsumer.java | 19 +++++++++++++++++++ .../kafka/clients/consumer/MockConsumer.java | 5 +++++ .../clients/consumer/KafkaConsumerTest.java | 6 +++--- .../main/scala/kafka/consumer/BaseConsumer.scala | 2 +- .../kafka/api/AuthorizerIntegrationTest.scala | 14 +++++++------- 6 files changed, 40 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index b1badef..0e27e1f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -66,6 +66,11 @@ public interface Consumer<K, V> 
extends Closeable { public void subscribe(Pattern pattern, ConsumerRebalanceListener callback); /** + * @see KafkaConsumer#subscribe(Pattern) + */ + public void subscribe(Pattern pattern); + + /** * @see KafkaConsumer#unsubscribe() */ public void unsubscribe(); http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 073b2df..1cdb132 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -953,6 +953,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } /** + * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. + * The pattern matching will be done periodically against topics existing at the time of check. + * + * <p> + * This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which + * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer + * {@link #subscribe(Pattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets + * to be reset. You should also provide your own listener if you are doing your own offset + * management since the listener gives you an opportunity to commit offsets before a rebalance finishes. + * + * @param pattern Pattern to subscribe to + * @throws IllegalArgumentException If pattern is null + */ + @Override + public void subscribe(Pattern pattern) { + subscribe(pattern, new NoOpConsumerRebalanceListener()); + } + + /** * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. 
This * also clears any partitions directly assigned through {@link #assign(Collection)}. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 91cb6f1..9b0c058 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -111,6 +111,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } @Override + public synchronized void subscribe(Pattern pattern) { + subscribe(pattern, new NoOpConsumerRebalanceListener()); + } + + @Override public synchronized void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) { ensureNotClosed(); this.subscriptions.subscribe(new HashSet<>(topics), listener); http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index eed012e..c5e2213 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.Fetcher; -import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; @@ -191,9 +190,10 @@ public class KafkaConsumerTest { @Test(expected = IllegalArgumentException.class) public void testSubscriptionOnNullTopicCollection() { KafkaConsumer<byte[], byte[]> consumer = newConsumer(); + List<String> nullList = null; try { - consumer.subscribe(null); + consumer.subscribe(nullList); } finally { consumer.close(); } @@ -229,7 +229,7 @@ public class KafkaConsumerTest { Pattern pattern = null; try { - consumer.subscribe(pattern, new NoOpConsumerRebalanceListener()); + consumer.subscribe(pattern); } finally { consumer.close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/core/src/main/scala/kafka/consumer/BaseConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala index cec74d0..2c53258 100644 --- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala +++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala @@ -73,7 +73,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: case (Some(topic), None, None, None) => consumer.subscribe(Collections.singletonList(topic)) case (None, None, None, Some(whitelist)) => - consumer.subscribe(Pattern.compile(whitelist), new NoOpConsumerRebalanceListener()) + consumer.subscribe(Pattern.compile(whitelist)) case _ => throw new IllegalArgumentException("An invalid combination of arguments is provided. " + "Exactly one of 'topic' or 'whitelist' must be provided. 
" + http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 67d15b3..ccb2719 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -623,7 +623,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) val consumer = consumers.head - consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumer.subscribe(Pattern.compile(topicPattern)) try { consumeRecords(consumer) Assert.fail("Expected TopicAuthorizationException") @@ -647,14 +647,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) val consumer = consumers.head - consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumer.subscribe(Pattern.compile(topicPattern)) consumeRecords(consumer) // set the subscription pattern to an internal topic that the consumer has read permission to. 
Since // internal topics are not included, we should not be assigned any partitions from this topic addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), new Resource(Topic, GROUP_METADATA_TOPIC_NAME)) - consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener) + consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME)) consumer.poll(0) assertTrue(consumer.subscription().isEmpty) assertTrue(consumer.assignment().isEmpty) @@ -675,14 +675,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) try { // ensure that internal topics are not included if no permission - consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) + consumer.subscribe(Pattern.compile(".*")) consumeRecords(consumer) assertEquals(Set(topic).asJava, consumer.subscription) // now authorize the user for the internal topic and verify that we can subscribe addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic, GROUP_METADATA_TOPIC_NAME)) - consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener) + consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME)) consumer.poll(0) assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala) } finally consumer.close() @@ -704,7 +704,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) try { - consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) + consumer.subscribe(Pattern.compile(".*")) // It is possible that the first call returns records of "topic" and the second call throws TopicAuthorizationException consumeRecords(consumer) consumeRecords(consumer) @@ 
-728,7 +728,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) try { - consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumer.subscribe(Pattern.compile(topicPattern)) consumeRecords(consumer) } finally consumer.close() }