chia7712 commented on code in PR #17755:
URL: https://github.com/apache/kafka/pull/17755#discussion_r1837488472


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java:
##########
@@ -35,123 +35,77 @@
 import org.opentest4j.AssertionFailedError;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class Assertions {
-    public static <T> void assertUnorderedListEquals(
-        List<T> expected,
-        List<T> actual
-    ) {
-        assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-    }
+    private static final Map<Class<?>, BiConsumer<ApiMessage, ApiMessage>> 
API_MESSAGE_COMPARATORS = new HashMap<>();

Review Comment:
   Perhaps we could use Map.of to simplify the code. For example:
   ```java
       private static final Map<Class<?>, BiConsumer<ApiMessage, ApiMessage>> 
API_MESSAGE_COMPARATORS = Map.of(
               // Register request/response comparators.
               ConsumerGroupHeartbeatResponseData.class, 
Assertions::assertConsumerGroupHeartbeatResponse,
               ShareGroupHeartbeatResponseData.class, 
Assertions::assertShareGroupHeartbeatResponse
       );
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java:
##########
@@ -189,169 +143,188 @@ public static void assertRecordEquals(
         }
     }
 
+    private static void assertConsumerGroupHeartbeatResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ConsumerGroupHeartbeatResponseData expected = 
(ConsumerGroupHeartbeatResponseData) exp.duplicate();
+        ConsumerGroupHeartbeatResponseData actual = 
(ConsumerGroupHeartbeatResponseData) act.duplicate();
+
+        Consumer<ConsumerGroupHeartbeatResponseData> normalize = message -> {
+            if (message.assignment() != null) {
+                
message.assignment().topicPartitions().sort(Comparator.comparing(ConsumerGroupHeartbeatResponseData.TopicPartitions::topicId));
+                message.assignment().topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
+    private static void assertShareGroupHeartbeatResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ShareGroupHeartbeatResponseData expected = 
(ShareGroupHeartbeatResponseData) exp.duplicate();
+        ShareGroupHeartbeatResponseData actual = 
(ShareGroupHeartbeatResponseData) act.duplicate();
+
+        Consumer<ShareGroupHeartbeatResponseData> normalize = message -> {
+            if (message.assignment() != null) {
+                
message.assignment().topicPartitions().sort(Comparator.comparing(ShareGroupHeartbeatResponseData.TopicPartitions::topicId));
+                message.assignment().topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
     private static void assertApiMessageAndVersionEquals(
         ApiMessageAndVersion expected,
         ApiMessageAndVersion actual
     ) {
         if (expected == actual) return;
-
         assertEquals(expected.version(), actual.version());
+        BiConsumer<ApiMessage, ApiMessage> asserter = API_MESSAGE_COMPARATORS
+            .getOrDefault(expected.message().getClass(), DEFAULT_COMPARATOR);
+        asserter.accept(expected.message(), actual.message());
+    }
 
-        if (actual.message() instanceof 
ConsumerGroupCurrentMemberAssignmentValue) {
-            // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
-            // always guaranteed. Therefore, we need a special comparator.
-            ConsumerGroupCurrentMemberAssignmentValue expectedValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) expected.message();
-            ConsumerGroupCurrentMemberAssignmentValue actualValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) actual.message();
-
-            assertEquals(expectedValue.memberEpoch(), 
actualValue.memberEpoch());
-            assertEquals(expectedValue.previousMemberEpoch(), 
actualValue.previousMemberEpoch());
-
-            // We transform those to Maps before comparing them.
-            
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
-                fromTopicPartitions(actualValue.assignedPartitions()));
-            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
-                
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
-        } else if (actual.message() instanceof 
ConsumerGroupPartitionMetadataValue) {
-            // The order of the racks stored in the PartitionMetadata of the 
ConsumerGroupPartitionMetadataValue
-            // is not always guaranteed. Therefore, we need a special 
comparator.
-            ConsumerGroupPartitionMetadataValue expectedValue =
-                (ConsumerGroupPartitionMetadataValue) 
expected.message().duplicate();
-            ConsumerGroupPartitionMetadataValue actualValue =
-                (ConsumerGroupPartitionMetadataValue) 
actual.message().duplicate();
-
-            List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
expectedTopicMetadataList =
-                expectedValue.topics();
-            List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
actualTopicMetadataList =
-                actualValue.topics();
-
-            if (expectedTopicMetadataList.size() != 
actualTopicMetadataList.size()) {
-                fail("Topic metadata lists have different sizes");
-            }
+    private static void assertConsumerGroupCurrentMemberAssignmentValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
+        // always guaranteed. Therefore, we need a special comparator.
+        ConsumerGroupCurrentMemberAssignmentValue expected = 
(ConsumerGroupCurrentMemberAssignmentValue) exp.duplicate();
+        ConsumerGroupCurrentMemberAssignmentValue actual = 
(ConsumerGroupCurrentMemberAssignmentValue) act.duplicate();
+
+        Consumer<ConsumerGroupCurrentMemberAssignmentValue> normalize = 
message -> {
+            
message.assignedPartitions().sort(Comparator.comparing(ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId));
+            message.assignedPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+            
message.partitionsPendingRevocation().sort(Comparator.comparing(ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions::topicId));
+            message.partitionsPendingRevocation().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
 
-            
expectedTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
-            
actualTopicMetadataList.sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
-
-            for (int i = 0; i < expectedTopicMetadataList.size(); i++) {
-                ConsumerGroupPartitionMetadataValue.TopicMetadata 
expectedTopicMetadata =
-                    expectedTopicMetadataList.get(i);
-                ConsumerGroupPartitionMetadataValue.TopicMetadata 
actualTopicMetadata =
-                    actualTopicMetadataList.get(i);
-
-                assertEquals(expectedTopicMetadata.topicId(), 
actualTopicMetadata.topicId());
-                assertEquals(expectedTopicMetadata.topicName(), 
actualTopicMetadata.topicName());
-                assertEquals(expectedTopicMetadata.numPartitions(), 
actualTopicMetadata.numPartitions());
-
-                List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
expectedPartitionMetadataList =
-                    expectedTopicMetadata.partitionMetadata();
-                List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
actualPartitionMetadataList =
-                    actualTopicMetadata.partitionMetadata();
-
-                // If the list is empty, rack information wasn't available for 
any replica of
-                // the partition and hence, the entry wasn't added to the 
record.
-                if (expectedPartitionMetadataList.size() != 
actualPartitionMetadataList.size()) {
-                    fail("Partition metadata lists have different sizes");
-                } else if (!expectedPartitionMetadataList.isEmpty() && 
!actualPartitionMetadataList.isEmpty()) {
-                    for (int j = 0; j < expectedPartitionMetadataList.size(); 
j++) {
-                        ConsumerGroupPartitionMetadataValue.PartitionMetadata 
expectedPartitionMetadata =
-                            expectedPartitionMetadataList.get(j);
-                        ConsumerGroupPartitionMetadataValue.PartitionMetadata 
actualPartitionMetadata =
-                            actualPartitionMetadataList.get(j);
-
-                        assertEquals(expectedPartitionMetadata.partition(), 
actualPartitionMetadata.partition());
-                        
assertUnorderedListEquals(expectedPartitionMetadata.racks(), 
actualPartitionMetadata.racks());
-                    }
-                }
-            }
-        } else if (actual.message() instanceof GroupMetadataValue) {
-            GroupMetadataValue expectedValue = (GroupMetadataValue) 
expected.message().duplicate();
-            GroupMetadataValue actualValue = (GroupMetadataValue) 
actual.message().duplicate();
-
-            Comparator<GroupMetadataValue.MemberMetadata> comparator =
-                
Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId);
-            expectedValue.members().sort(comparator);
-            actualValue.members().sort(comparator);
+    private static void assertConsumerGroupPartitionMetadataValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        // The order of the racks stored in the PartitionMetadata of the 
ConsumerGroupPartitionMetadataValue
+        // is not always guaranteed. Therefore, we need a special comparator.
+        ConsumerGroupPartitionMetadataValue expected = 
(ConsumerGroupPartitionMetadataValue) exp.duplicate();
+        ConsumerGroupPartitionMetadataValue actual = 
(ConsumerGroupPartitionMetadataValue) act.duplicate();
+
+        Consumer<ConsumerGroupPartitionMetadataValue> normalize = message -> {
+            
message.topics().sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.TopicMetadata::topicId));
+            message.topics().forEach(topic -> {
+                
topic.partitionMetadata().sort(Comparator.comparing(ConsumerGroupPartitionMetadataValue.PartitionMetadata::partition));
+                topic.partitionMetadata().forEach(partition -> 
partition.racks().sort(String::compareTo));
+            });
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
+    private static void assertGroupMetadataValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        GroupMetadataValue expected = (GroupMetadataValue) exp.duplicate();
+        GroupMetadataValue actual = (GroupMetadataValue) act.duplicate();
+
+        Consumer<GroupMetadataValue> normalize = message -> {
+            
message.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
             try {
-                Arrays.asList(expectedValue, actualValue).forEach(value ->
-                    value.members().forEach(memberMetadata -> {
-                        // Sort topics and ownedPartitions in Subscription.
-                        ConsumerPartitionAssignor.Subscription subscription =
-                            
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
-                        subscription.topics().sort(String::compareTo);
-                        subscription.ownedPartitions().sort(
-                            
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
-                        );
-                        
memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
-                            subscription,
-                            
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
-                        )));
-
-                        // Sort partitions in Assignment.
-                        ConsumerPartitionAssignor.Assignment assignment =
-                            
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
-                        assignment.partitions().sort(
-                            
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
-                        );
-                        
memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
-                            assignment,
-                            
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
-                        )));
-                    })
-                );
+                message.members().forEach(memberMetadata -> {
+                    // Sort topics and ownedPartitions in Subscription.
+                    ConsumerPartitionAssignor.Subscription subscription =
+                        
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
+                    subscription.topics().sort(String::compareTo);
+                    subscription.ownedPartitions().sort(
+                        
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+                    );
+                    
memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                        subscription,
+                        
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
+                    )));
+
+                    // Sort partitions in Assignment.
+                    ConsumerPartitionAssignor.Assignment assignment =
+                        
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
+                    assignment.partitions().sort(
+                        
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+                    );
+                    
memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
+                        assignment,
+                        
ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
+                    )));
+                });
             } catch (SchemaException ex) {
                 fail("Failed deserialization: " + ex.getMessage());
             }
-            assertEquals(expectedValue, actualValue);
-        } else if (actual.message() instanceof 
ConsumerGroupTargetAssignmentMemberValue) {
-            ConsumerGroupTargetAssignmentMemberValue expectedValue =
-                (ConsumerGroupTargetAssignmentMemberValue) 
expected.message().duplicate();
-            ConsumerGroupTargetAssignmentMemberValue actualValue =
-                (ConsumerGroupTargetAssignmentMemberValue) 
actual.message().duplicate();
-
-            
Comparator<ConsumerGroupTargetAssignmentMemberValue.TopicPartition> comparator =
-                
Comparator.comparing(ConsumerGroupTargetAssignmentMemberValue.TopicPartition::topicId);
-            expectedValue.topicPartitions().sort(comparator);
-            actualValue.topicPartitions().sort(comparator);
-
-            assertEquals(expectedValue, actualValue);
-        } else {
-            assertEquals(expected.message(), actual.message());
-        }
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
     }
 
-    private static Map<Uuid, Set<Integer>> fromTopicPartitions(
-        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
assignment
+    private static void assertConsumerGroupTargetAssignmentMemberValue(
+        ApiMessage exp,
+        ApiMessage act
     ) {
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions ->
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()))
-        );
-        return assignmentMap;
+        ConsumerGroupTargetAssignmentMemberValue expected = 
(ConsumerGroupTargetAssignmentMemberValue) exp.duplicate();
+        ConsumerGroupTargetAssignmentMemberValue actual = 
(ConsumerGroupTargetAssignmentMemberValue) act.duplicate();
+
+        Comparator<ConsumerGroupTargetAssignmentMemberValue.TopicPartition> 
comparator =
+            
Comparator.comparing(ConsumerGroupTargetAssignmentMemberValue.TopicPartition::topicId);

Review Comment:
   Should it sort the partition as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to