Repository: kafka
Updated Branches:
  refs/heads/trunk 3dcbbf703 -> 42b356500


KAFKA-6005; Reject JoinGroup request from first member with empty protocol 
type/protocol list

Author: Manikumar Reddy <manikumar.re...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3957 from omkreddy/JOIN-GROUP-EMPTY-PROTOCOL


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/42b35650
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/42b35650
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/42b35650

Branch: refs/heads/trunk
Commit: 42b356500b7188eb2507f9b48399d5491a7eff16
Parents: 3dcbbf7
Author: Manikumar Reddy <manikumar.re...@gmail.com>
Authored: Tue Oct 3 08:37:30 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Oct 3 08:37:30 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 37 +++++++++++++++-----
 .../apache/kafka/common/protocol/Errors.java    |  3 +-
 .../clients/consumer/KafkaConsumerTest.java     | 33 +++++++++--------
 .../coordinator/group/GroupCoordinator.scala    |  3 ++
 .../group/GroupCoordinatorTest.scala            | 16 +++++++++
 5 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d6764ca..6fb6919 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -566,6 +566,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
     private volatile boolean closed = false;
+    private List<PartitionAssignor> assignors;
 
     // currentThread holds the threadId of the current thread accessing 
KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -730,7 +731,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = 
OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
-            List<PartitionAssignor> assignors = config.getConfiguredInstances(
+            this.assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
             this.coordinator = new ConsumerCoordinator(logContext,
@@ -797,7 +798,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   SubscriptionState subscriptions,
                   Metadata metadata,
                   long retryBackoffMs,
-                  long requestTimeoutMs) {
+                  long requestTimeoutMs,
+                  List<PartitionAssignor> assignors) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
         this.coordinator = coordinator;
@@ -812,6 +814,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
+        this.assignors = assignors;
     }
 
     /**
@@ -874,7 +877,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *                 subscribed topics
      * @throws IllegalArgumentException If topics is null or contains null or 
empty elements, or if listener is null
      * @throws IllegalStateException If {@code subscribe()} is called 
previously with pattern, or assign is called
-     *                               previously (without a subsequent call to 
{@link #unsubscribe()})
+     *                               previously (without a subsequent call to 
{@link #unsubscribe()}), or if no
+     *                               partition assignment strategy is 
configured
      */
     @Override
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener 
listener) {
@@ -890,6 +894,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     if (topic == null || topic.trim().isEmpty())
                         throw new IllegalArgumentException("Topic collection 
to subscribe to cannot contain null or empty topic");
                 }
+
+                throwIfNoAssignorsConfigured();
+
                 log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", 
"));
                 this.subscriptions.subscribe(new HashSet<>(topics), listener);
                 metadata.setTopics(subscriptions.groupSubscription());
@@ -917,7 +924,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param topics The list of topics to subscribe to
      * @throws IllegalArgumentException If topics is null or contains null or 
empty elements
      * @throws IllegalStateException If {@code subscribe()} is called 
previously with pattern, or assign is called
-     *                               previously (without a subsequent call to 
{@link #unsubscribe()})
+     *                               previously (without a subsequent call to 
{@link #unsubscribe()}), or if no
+     *                               partition assignment strategy is 
configured
      */
     @Override
     public void subscribe(Collection<String> topics) {
@@ -943,7 +951,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *                 subscribed topics
      * @throws IllegalArgumentException If pattern or listener is null
      * @throws IllegalStateException If {@code subscribe()} is called 
previously with topics, or assign is called
-     *                               previously (without a subsequent call to 
{@link #unsubscribe()})
+     *                               previously (without a subsequent call to 
{@link #unsubscribe()}), or if no
+     *                               partition assignment strategy is 
configured
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
@@ -951,6 +960,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             if (pattern == null)
                 throw new IllegalArgumentException("Topic pattern to subscribe 
to cannot be null");
+
+            throwIfNoAssignorsConfigured();
+
             log.debug("Subscribed to pattern: {}", pattern);
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
@@ -974,7 +986,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param pattern Pattern to subscribe to
      * @throws IllegalArgumentException If pattern is null
      * @throws IllegalStateException If {@code subscribe()} is called 
previously with topics, or assign is called
-     *                               previously (without a subsequent call to 
{@link #unsubscribe()})
+     *                               previously (without a subsequent call to 
{@link #unsubscribe()}), or if no
+     *                               partition assignment strategy is 
configured
      */
     @Override
     public void subscribe(Pattern pattern) {
@@ -1568,7 +1581,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @return a mapping from partition to the timestamp and offset of the 
first message with timestamp greater
      *         than or equal to the target timestamp. {@code null} will be 
returned for the partition if there is no
      *         such message.
-     * @throws AuthenticationException if authentication fails. See the 
exception for more details
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative.
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
      *         expiration of the configured request timeout
@@ -1672,7 +1685,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @param timeout The maximum time to wait for consumer to close 
gracefully. The value must be
      *                non-negative. Specifying a timeout of zero means do not 
wait for pending requests to complete.
      * @param timeUnit The time unit for the {@code timeout}
-     * @throws AuthenticationException if authentication fails. See the 
exception for more details
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws InterruptException If the thread is interrupted before or while 
this function is called
      * @throws IllegalArgumentException If the {@code timeout} is negative.
      */
@@ -1742,7 +1755,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * or reset it using the offset reset policy the user has configured.
      *
      * @param partitions The partitions that needs updating fetch positions
-     * @throws AuthenticationException if authentication fails. See the 
exception for more details
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws NoOffsetForPartitionException If no offset is stored for a 
given partition and no offset reset policy is
      *             defined
      */
@@ -1797,4 +1810,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         if (refcount.decrementAndGet() == 0)
             currentThread.set(NO_CURRENT_THREAD);
     }
+
+    private void throwIfNoAssignorsConfigured() {
+        if (assignors.isEmpty())
+            throw new IllegalStateException("Must configure at least one 
partition assigner class name to " +
+                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " 
configuration property");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bea6050..d937054 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -266,7 +266,8 @@ public enum Errors {
             }
         }),
     INCONSISTENT_GROUP_PROTOCOL(23,
-            "The group member's supported protocols are incompatible with 
those of existing members.",
+            "The group member's supported protocols are incompatible with 
those of existing members" +
+                " or first group member tried to join with empty protocol type 
or empty protocol list.",
         new ApiExceptionBuilder() {
             @Override
             public ApiException build(String message) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c5e2213..632bec0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -235,12 +235,22 @@ public class KafkaConsumerTest {
         }
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testSeekNegative() {
+    @Test(expected = IllegalStateException.class)
+    public void testSubscriptionWithEmptyPartitionAssignment() {
         Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
+        props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
"");
+
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, 
new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        try {
+            consumer.subscribe(singletonList(topic));
+        } finally {
+            consumer.close();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSeekNegative() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         try {
             consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 
0)));
@@ -252,10 +262,6 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicPartition() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"testAssignOnNullTopicPartition");
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         try {
             consumer.assign(null);
@@ -277,10 +283,6 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicInPartition() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"testAssignOnNullTopicInPartition");
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         try {
             consumer.assign(Arrays.asList(new TopicPartition(null, 0)));
@@ -291,10 +293,6 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnEmptyTopicInPartition() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"testAssignOnEmptyTopicInPartition");
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
-        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         try {
             consumer.assign(Arrays.asList(new TopicPartition("  ", 0)));
@@ -1678,7 +1676,8 @@ public class KafkaConsumerTest {
                 subscriptions,
                 metadata,
                 retryBackoffMs,
-                requestTimeoutMs);
+                requestTimeoutMs,
+                assignors);
     }
 
     private static class FetchInfo {

http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 42bc3c3..bb59bcd 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -146,6 +146,9 @@ class GroupCoordinator(val brokerId: Int,
       if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || 
!group.supportsProtocols(protocols.map(_._1).toSet))) {
         // if the new member does not support the group protocol, reject it
         responseCallback(joinError(memberId, 
Errors.INCONSISTENT_GROUP_PROTOCOL))
+      } else if (group.is(Empty) && (protocols.isEmpty || 
protocolType.isEmpty)) {
+        // reject the first member of an empty group if its protocol list or 
protocol type is empty
+        responseCallback(joinError(memberId, 
Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && 
!group.has(memberId)) {
         // if the member trying to register with a un-recognized id, send the 
response to let
         // it reset its member id and retry

http://git-wip-us.apache.org/repos/asf/kafka/blob/42b35650/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 95abb33..85d72c3 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -186,6 +186,22 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testJoinGroupWithEmptyProtocolType() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, "", protocols)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
+  }
+
+  @Test
+  def testJoinGroupWithEmptyGroupProtocol() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, List())
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, joinGroupResult.error)
+  }
+
+  @Test
   def testJoinGroupInconsistentGroupProtocol() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 

Reply via email to