This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 362fdd8  KAFKA-10647; Only serialize owned partitions when consumer 
protocol version >= 1 (#9506)
362fdd8 is described below

commit 362fdd824563c4ac7757c393dda8b59f2c64db90
Author: David Jacot <[email protected]>
AuthorDate: Tue Oct 27 11:11:24 2020 +0100

    KAFKA-10647; Only serialize owned partitions when consumer protocol version 
>= 1 (#9506)
    
    A regression was introduced by 
https://github.com/apache/kafka/commit/466f8fd21c6651ea5daa50154239e85fa629dbb4.
 The owned partitions field must be ignored for versions < 1; otherwise, 
serialization fails with an unsupported version exception.
    
    Reviewers: Jason Gustafson <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../message/ConsumerProtocolSubscription.json      |  2 +-
 .../consumer/internals/ConsumerProtocolTest.java   | 37 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json 
b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
index e921e26..fa4c371 100644
--- 
a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
+++ 
b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
@@ -26,7 +26,7 @@
     { "name": "Topics", "type": "[]string", "versions": "0+" },
     { "name": "UserData", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+",
       "default": "null", "zeroCopy": true },
-    { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+",
+    { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", 
"ignorable": true,
       "fields": [
         { "name": "Topic", "type": "string", "versions": "1+" },
         { "name": "Partitions", "type": "[]int32", "versions": "1+"}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 96bc839..5fc5f83 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -47,6 +47,30 @@ public class ConsumerProtocolTest {
     private final Optional<String> groupInstanceId = 
Optional.of("instance.id");
 
     @Test
+    public void serializeDeserializeSubscriptionAllVersions() {
+        List<TopicPartition> ownedPartitions = Arrays.asList(
+            new TopicPartition("foo", 0),
+            new TopicPartition("bar", 0));
+        Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"),
+            ByteBuffer.wrap("hello".getBytes()), ownedPartitions);
+
+        for (short version = 
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+            ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription, version);
+            Subscription parsedSubscription = 
ConsumerProtocol.deserializeSubscription(buffer);
+
+            assertEquals(subscription.topics(), parsedSubscription.topics());
+            assertEquals(subscription.userData(), 
parsedSubscription.userData());
+            assertFalse(parsedSubscription.groupInstanceId().isPresent());
+
+            if (version >= 1) {
+                assertEquals(toSet(subscription.ownedPartitions()), 
toSet(parsedSubscription.ownedPartitions()));
+            } else {
+                assertEquals(Collections.emptyList(), 
parsedSubscription.ownedPartitions());
+            }
+        }
+    }
+
+    @Test
     public void serializeDeserializeMetadata() {
         Subscription subscription = new Subscription(Arrays.asList("foo", 
"bar"), ByteBuffer.wrap(new byte[0]));
         ByteBuffer buffer = 
ConsumerProtocol.serializeSubscription(subscription);
@@ -138,6 +162,19 @@ public class ConsumerProtocolTest {
     }
 
     @Test
+    public void serializeDeserializeAssignmentAllVersions() {
+        List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
+        Assignment assignment = new Assignment(partitions, 
ByteBuffer.wrap("hello".getBytes()));
+
+        for (short version = 
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+            ByteBuffer buffer = 
ConsumerProtocol.serializeAssignment(assignment, version);
+            Assignment parsedAssignment = 
ConsumerProtocol.deserializeAssignment(buffer);
+            assertEquals(toSet(partitions), 
toSet(parsedAssignment.partitions()));
+            assertEquals(assignment.userData(), parsedAssignment.userData());
+        }
+    }
+
+    @Test
     public void serializeDeserializeAssignment() {
         List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
         ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new 
Assignment(partitions, ByteBuffer.wrap(new byte[0])));

Reply via email to