Repository: kafka Updated Branches: refs/heads/trunk 3dcbbf703 -> 42b356500
KAFKA-6005; Reject JoinGroup request from first member with empty protocol type/protocol list Author: Manikumar Reddy <manikumar.re...@gmail.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3957 from omkreddy/JOIN-GROUP-EMPTY-PROTOCOL Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/42b35650 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/42b35650 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/42b35650 Branch: refs/heads/trunk Commit: 42b356500b7188eb2507f9b48399d5491a7eff16 Parents: 3dcbbf7 Author: Manikumar Reddy <manikumar.re...@gmail.com> Authored: Tue Oct 3 08:37:30 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Oct 3 08:37:30 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 37 +++++++++++++++----- .../apache/kafka/common/protocol/Errors.java | 3 +- .../clients/consumer/KafkaConsumerTest.java | 33 +++++++++-------- .../coordinator/group/GroupCoordinator.scala | 3 ++ .../group/GroupCoordinatorTest.scala | 16 +++++++++ 5 files changed, 65 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d6764ca..6fb6919 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -566,6 +566,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private final long retryBackoffMs; private final long requestTimeoutMs; private volatile boolean closed = false; + private List<PartitionAssignor> assignors; // currentThread holds the threadId of the current thread accessing KafkaConsumer // and is used to prevent multi-threaded access @@ -730,7 +731,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)); this.subscriptions = new SubscriptionState(offsetResetStrategy); - List<PartitionAssignor> assignors = config.getConfiguredInstances( + this.assignors = config.getConfiguredInstances( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, PartitionAssignor.class); this.coordinator = new ConsumerCoordinator(logContext, @@ -797,7 +798,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { SubscriptionState subscriptions, Metadata metadata, long retryBackoffMs, - long requestTimeoutMs) { + long requestTimeoutMs, + List<PartitionAssignor> assignors) { this.log = logContext.logger(getClass()); this.clientId = clientId; this.coordinator = coordinator; @@ -812,6 +814,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.metadata = metadata; this.retryBackoffMs = retryBackoffMs; this.requestTimeoutMs = requestTimeoutMs; + this.assignors = assignors; } /** @@ -874,7 +877,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * subscribed topics * @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called - * previously (without a subsequent call to {@link #unsubscribe()}) + * previously (without a subsequent call to {@link #unsubscribe()}), or if not + * configured at-least one partition assignment strategy */ @Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { @@ -890,6 +894,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { if (topic == null || topic.trim().isEmpty()) throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic"); } + + throwIfNoAssignorsConfigured(); + log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); this.subscriptions.subscribe(new HashSet<>(topics), listener); metadata.setTopics(subscriptions.groupSubscription()); @@ -917,7 +924,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param topics The list of topics to subscribe to * @throws IllegalArgumentException If topics is null or contains null or empty elements * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called - * previously (without a subsequent call to {@link #unsubscribe()}) + * previously (without a subsequent call to {@link #unsubscribe()}), or if not + * configured at-least one partition assignment strategy */ @Override public void subscribe(Collection<String> topics) { @@ -943,7 +951,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * subscribed topics * @throws IllegalArgumentException If pattern or listener is null * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called - * previously (without a subsequent call to {@link #unsubscribe()}) + * previously (without a subsequent call to {@link #unsubscribe()}), or if not + * configured at-least one partition assignment strategy */ @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { @@ -951,6 +960,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { try { if (pattern == null) throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); + + throwIfNoAssignorsConfigured(); + log.debug("Subscribed to pattern: {}", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); @@ -974,7 +986,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param pattern Pattern to subscribe to * @throws IllegalArgumentException If pattern is null * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called - * previously (without a subsequent call to {@link #unsubscribe()}) + * previously (without a subsequent call to {@link #unsubscribe()}), or if not + * configured at-least one partition assignment strategy */ @Override public void subscribe(Pattern pattern) { @@ -1568,7 +1581,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. - * @throws AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws IllegalArgumentException if the target timestamp is negative. * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before * expiration of the configured request timeout @@ -1672,7 +1685,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param timeout The maximum time to wait for consumer to close gracefully. The value must be * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. * @param timeUnit The time unit for the {@code timeout} - * @throws AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws InterruptException If the thread is interrupted before or while this function is called * @throws IllegalArgumentException If the {@code timeout} is negative. */ @@ -1742,7 +1755,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * or reset it using the offset reset policy the user has configured. * * @param partitions The partitions that needs updating fetch positions - * @throws AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is * defined */ @@ -1797,4 +1810,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { if (refcount.decrementAndGet() == 0) currentThread.set(NO_CURRENT_THREAD); } + + private void throwIfNoAssignorsConfigured() { + if (assignors.isEmpty()) + throw new IllegalStateException("Must configure at least one partition assigner class name to " + + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property"); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index bea6050..d937054 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -266,7 +266,8 @@ public enum Errors { } }), INCONSISTENT_GROUP_PROTOCOL(23, - "The group member's supported protocols are incompatible with those of existing members.", + "The group member's supported protocols are incompatible with those of existing members" + + " or first group member tried to join with empty protocol type or empty protocol list.", new ApiExceptionBuilder() { @Override public ApiException build(String message) { http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index c5e2213..632bec0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -235,12 +235,22 @@ public class KafkaConsumerTest { } } - @Test(expected = IllegalArgumentException.class) - public void testSeekNegative() { + @Test(expected = IllegalStateException.class) + public void testSubscriptionWithEmptyPartitionAssignment() { Properties props = new Properties(); - props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative"); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, ""); + + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + try { + consumer.subscribe(singletonList(topic)); + } finally { + consumer.close(); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testSeekNegative() { KafkaConsumer<byte[], byte[]> consumer = newConsumer(); try { consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0))); @@ -252,10 +262,6 @@ public class KafkaConsumerTest { @Test(expected = IllegalArgumentException.class) public void testAssignOnNullTopicPartition() { - Properties props = new Properties(); - props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicPartition"); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); KafkaConsumer<byte[], byte[]> consumer = newConsumer(); try { consumer.assign(null); @@ -277,10 +283,6 @@ public class KafkaConsumerTest { @Test(expected = IllegalArgumentException.class) public void testAssignOnNullTopicInPartition() { - Properties props = new Properties(); - props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnNullTopicInPartition"); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); KafkaConsumer<byte[], byte[]> consumer = newConsumer(); try { consumer.assign(Arrays.asList(new TopicPartition(null, 0))); @@ -291,10 +293,6 @@ public class KafkaConsumerTest { @Test(expected = IllegalArgumentException.class) public void testAssignOnEmptyTopicInPartition() { - Properties props = new Properties(); - props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testAssignOnEmptyTopicInPartition"); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); KafkaConsumer<byte[], byte[]> consumer = newConsumer(); try { consumer.assign(Arrays.asList(new TopicPartition(" ", 0))); @@ -1678,7 +1676,8 @@ public class KafkaConsumerTest { subscriptions, metadata, retryBackoffMs, - requestTimeoutMs); + requestTimeoutMs, + assignors); } private static class FetchInfo { http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 42bc3c3..bb59bcd 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -146,6 +146,9 @@ class GroupCoordinator(val brokerId: Int, if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) { // if the new member does not support the group protocol, reject it responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL)) + } else if (group.is(Empty) && (protocols.isEmpty || protocolType.isEmpty)) { + //reject if first member with empty group protocol or protocolType is empty + responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL)) } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { // if the member trying to register with a un-recognized id, send the response to let // it reset its member id and retry http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 95abb33..85d72c3 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -186,6 +186,22 @@ class GroupCoordinatorTest extends JUnitSuite { } @Test + def testJoinGroupWithEmptyProtocolType() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, "", protocols) + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error) + } + + @Test + def testJoinGroupWithEmptyGroupProtocol() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, List()) + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error) + } + + @Test def testJoinGroupInconsistentGroupProtocol() { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID