showuon commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1017395682


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java:
##########
@@ -146,43 +165,22 @@ public void deserializeNewSubscriptionWithOldVersion() {
         assertNull(parsedSubscription.userData());
         assertTrue(parsedSubscription.ownedPartitions().isEmpty());
         assertFalse(parsedSubscription.groupInstanceId().isPresent());
+        assertEquals(DEFAULT_GENERATION, parsedSubscription.generationId());
     }
 
-    @Test
-    public void deserializeFutureSubscriptionVersion() {
-        // verify that a new version which adds a field is still parseable
-        short version = 100;
-
-        Schema subscriptionSchemaV100 = new Schema(
-            new Field("topics", new ArrayOf(Type.STRING)),
-            new Field("user_data", Type.NULLABLE_BYTES),
-            new Field("owned_partitions", new ArrayOf(
-                ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)),
-            new Field("foo", Type.STRING));
-
-        Struct subscriptionV100 = new Struct(subscriptionSchemaV100);
-        subscriptionV100.set("topics", new Object[]{"topic"});
-        subscriptionV100.set("user_data", ByteBuffer.wrap(new byte[0]));
-        subscriptionV100.set("owned_partitions", new Object[]{new Struct(
-            ConsumerProtocolSubscription.TopicPartition.SCHEMA_1)
-            .set("topic", tp2.topic())
-            .set("partitions", new Object[]{tp2.partition()})});
-        subscriptionV100.set("foo", "bar");
-
-        Struct headerV100 = new Struct(new Schema(new Field("version", 
Type.INT16)));
-        headerV100.set("version", version);
-
-        ByteBuffer buffer = ByteBuffer.allocate(subscriptionV100.sizeOf() + 
headerV100.sizeOf());
-        headerV100.writeTo(buffer);
-        subscriptionV100.writeTo(buffer);
-
-        buffer.flip();
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void deserializeFutureSubscriptionVersion(boolean hasGenerationId) {
+        ByteBuffer buffer = 
generateFutureSubscriptionVersionData(hasGenerationId);
 
         Subscription subscription = 
ConsumerProtocol.deserializeSubscription(buffer);
         subscription.setGroupInstanceId(groupInstanceId);
         assertEquals(Collections.singleton("topic"), 
toSet(subscription.topics()));
         assertEquals(Collections.singleton(tp2), 
toSet(subscription.ownedPartitions()));
         assertEquals(groupInstanceId, subscription.groupInstanceId());
+        if (hasGenerationId) {
+            assertEquals(generationId, subscription.generationId());
+        }

Review Comment:
   The test has been updated and no longer uses `hasGenerationId`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to