This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0bb8e66 KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen` (#4617) 0bb8e66 is described below commit 0bb8e66184931e2f7830cb713d9260cc0f3383a9 Author: Siva Santhalingam <siva.santhalin...@gmail.com> AuthorDate: Mon Mar 12 23:03:32 2018 -0700 KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen` (#4617) --- .../kafka/clients/consumer/KafkaConsumer.java | 25 +++++++++++----------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3cd034e..81137f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -966,13 +966,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { */ @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { + if (pattern == null) + throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); + acquireAndEnsureOpen(); try { - if (pattern == null) - throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); - throwIfNoAssignorsConfigured(); - log.debug("Subscribed to pattern: {}", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); @@ -1337,11 +1336,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { */ @Override public void seek(TopicPartition partition, long offset) { + if (offset < 0) + throw new IllegalArgumentException("seek offset must not be a negative number"); + acquireAndEnsureOpen(); try { - if (offset < 0) - throw new IllegalArgumentException("seek offset must not be a negative number"); - log.debug("Seeking to offset {} for partition {}", offset, partition); this.subscriptions.seek(partition, offset); } finally { @@ -1357,11 +1356,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToBeginning(Collection<TopicPartition> partitions) { + if (partitions == null) + throw new IllegalArgumentException("Partitions collection cannot be null"); + acquireAndEnsureOpen(); try { - if (partitions == null) { - throw new IllegalArgumentException("Partitions collection cannot be null"); - } Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to beginning of partition {}", tp); @@ -1383,11 +1382,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToEnd(Collection<TopicPartition> partitions) { + if (partitions == null) + throw new IllegalArgumentException("Partitions collection cannot be null"); + acquireAndEnsureOpen(); try { - if (partitions == null) { - throw new IllegalArgumentException("Partitions collection cannot be null"); - } Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to end of partition {}", tp); -- To stop receiving notification emails like this one, please contact j...@apache.org.