This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 362fdd8 KAFKA-10647; Only serialize owned partitions when consumer
protocol version >= 1 (#9506)
362fdd8 is described below
commit 362fdd824563c4ac7757c393dda8b59f2c64db90
Author: David Jacot <[email protected]>
AuthorDate: Tue Oct 27 11:11:24 2020 +0100
KAFKA-10647; Only serialize owned partitions when consumer protocol version
>= 1 (#9506)
A regression got introduced by
https://github.com/apache/kafka/commit/466f8fd21c6651ea5daa50154239e85fa629dbb4.
The owned partition field must be ignored for version < 1 otherwise the
serialization fails with an unsupported version exception.
Reviewers: Jason Gustafson <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../message/ConsumerProtocolSubscription.json | 2 +-
.../consumer/internals/ConsumerProtocolTest.java | 37 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
index e921e26..fa4c371 100644
---
a/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
+++
b/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
@@ -26,7 +26,7 @@
{ "name": "Topics", "type": "[]string", "versions": "0+" },
{ "name": "UserData", "type": "bytes", "versions": "0+",
"nullableVersions": "0+",
"default": "null", "zeroCopy": true },
- { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+",
+ { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+",
"ignorable": true,
"fields": [
{ "name": "Topic", "type": "string", "versions": "1+" },
{ "name": "Partitions", "type": "[]int32", "versions": "1+"}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
index 96bc839..5fc5f83 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -47,6 +47,30 @@ public class ConsumerProtocolTest {
private final Optional<String> groupInstanceId =
Optional.of("instance.id");
@Test
+ public void serializeDeserializeSubscriptionAllVersions() {
+ List<TopicPartition> ownedPartitions = Arrays.asList(
+ new TopicPartition("foo", 0),
+ new TopicPartition("bar", 0));
+ Subscription subscription = new Subscription(Arrays.asList("foo",
"bar"),
+ ByteBuffer.wrap("hello".getBytes()), ownedPartitions);
+
+ for (short version =
ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <=
ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) {
+ ByteBuffer buffer =
ConsumerProtocol.serializeSubscription(subscription, version);
+ Subscription parsedSubscription =
ConsumerProtocol.deserializeSubscription(buffer);
+
+ assertEquals(subscription.topics(), parsedSubscription.topics());
+ assertEquals(subscription.userData(),
parsedSubscription.userData());
+ assertFalse(parsedSubscription.groupInstanceId().isPresent());
+
+ if (version >= 1) {
+ assertEquals(toSet(subscription.ownedPartitions()),
toSet(parsedSubscription.ownedPartitions()));
+ } else {
+ assertEquals(Collections.emptyList(),
parsedSubscription.ownedPartitions());
+ }
+ }
+ }
+
+ @Test
public void serializeDeserializeMetadata() {
Subscription subscription = new Subscription(Arrays.asList("foo",
"bar"), ByteBuffer.wrap(new byte[0]));
ByteBuffer buffer =
ConsumerProtocol.serializeSubscription(subscription);
@@ -138,6 +162,19 @@ public class ConsumerProtocolTest {
}
@Test
+ public void serializeDeserializeAssignmentAllVersions() {
+ List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
+ Assignment assignment = new Assignment(partitions,
ByteBuffer.wrap("hello".getBytes()));
+
+ for (short version =
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <=
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+ ByteBuffer buffer =
ConsumerProtocol.serializeAssignment(assignment, version);
+ Assignment parsedAssignment =
ConsumerProtocol.deserializeAssignment(buffer);
+ assertEquals(toSet(partitions),
toSet(parsedAssignment.partitions()));
+ assertEquals(assignment.userData(), parsedAssignment.userData());
+ }
+ }
+
+ @Test
public void serializeDeserializeAssignment() {
List<TopicPartition> partitions = Arrays.asList(tp1, tp2);
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new
Assignment(partitions, ByteBuffer.wrap(new byte[0])));