This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5f80de3923d KAFKA-19162: Topology metadata contains 
non-deterministically ordered topic configs (#19491)
5f80de3923d is described below

commit 5f80de3923de7a57ac30a8691ba3ff5fcbacbdb6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Apr 17 06:12:17 2025 +0200

    KAFKA-19162: Topology metadata contains non-deterministically ordered topic 
configs (#19491)
    
    The topology description sent to the broker in KIP-1071 contains
    non-deterministically ordered topic configs. Since the topology is
    compared to the group's topology upon joining, we may run into
    `INVALID_REQUEST: Topology updates are not supported yet` failures if
    the topology sent by the application does not match the group topology
    due to a different topic config order.
    
    This PR ensures that topic configs are ordered, to avoid an
    `INVALID_REQUEST` error.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../internals/StreamsGroupHeartbeatRequestManager.java    |  2 ++
 .../StreamsGroupHeartbeatRequestManagerTest.java          | 15 +++++++++++++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 55741114b34..5012aba5a32 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -237,6 +237,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
                     repartitionTopicInfo.topicConfigs().add(new 
StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
                 );
                 repartitionTopicsInfo.add(repartitionTopicInfo);
+                
repartitionTopicInfo.topicConfigs().sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key));
             }
             
repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
             return repartitionTopicsInfo;
@@ -251,6 +252,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
                 changelogTopic.getValue().topicConfigs().forEach((k, v) ->
                     changelogTopicInfo.topicConfigs().add(new 
StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
                 );
+                
changelogTopicInfo.topicConfigs().sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key));
                 changelogTopicsInfo.add(changelogTopicInfo);
             }
             
changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index dae6958035b..d43bcfc7891 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -108,7 +108,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
     private static final String REPARTITION_SOURCE_TOPIC_1 = 
"repartitionSourceTopic1";
     private static final String REPARTITION_SOURCE_TOPIC_2 = 
"repartitionSourceTopic2";
     private static final Map<String, StreamsRebalanceData.TopicInfo> 
REPARTITION_SOURCE_TOPICS = Map.of(
-        REPARTITION_SOURCE_TOPIC_1, new 
StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1), 
Map.of("config1", "value1")),
+        REPARTITION_SOURCE_TOPIC_1, new 
StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1), 
Map.of("config3", "value3", "config1", "value1")),
         REPARTITION_SOURCE_TOPIC_2, new 
StreamsRebalanceData.TopicInfo(Optional.of(3), Optional.of((short) 3), 
Collections.emptyMap())
     );
     private static final String CHANGELOG_TOPIC_1 = "changelogTopic1";
@@ -117,7 +117,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
     private static final Map<String, StreamsRebalanceData.TopicInfo> 
CHANGELOG_TOPICS = Map.of(
         CHANGELOG_TOPIC_1, new 
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 1), 
Map.of()),
         CHANGELOG_TOPIC_2, new 
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 2), 
Map.of()),
-        CHANGELOG_TOPIC_3, new 
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3), 
Map.of("config2", "value2"))
+        CHANGELOG_TOPIC_3, new 
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3), 
Map.of("config4", "value4", "config2", "value2"))
     );
     private static final Collection<Set<String>> COPARTITION_GROUP = Set.of(
         Set.of(SOURCE_TOPIC_1, REPARTITION_SOURCE_TOPIC_2),
@@ -664,6 +664,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
             assertEquals(repartitionTopic.numPartitions().get(), 
topicInfo.partitions());
             assertEquals(repartitionTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
             assertEquals(repartitionTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+            assertTrue(isSorted(topicInfo.topicConfigs(), 
Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
         });
         assertEquals(CHANGELOG_TOPICS.size(), 
subtopology1.stateChangelogTopics().size());
         subtopology1.stateChangelogTopics().forEach(topicInfo -> {
@@ -672,6 +673,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
             final StreamsRebalanceData.TopicInfo changelogTopic = 
CHANGELOG_TOPICS.get(topicInfo.name());
             assertEquals(changelogTopic.replicationFactor().get(), 
topicInfo.replicationFactor());
             assertEquals(changelogTopic.topicConfigs().size(), 
topicInfo.topicConfigs().size());
+            assertTrue(isSorted(topicInfo.topicConfigs(), 
Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
         });
         assertEquals(2, subtopology1.copartitionGroups().size());
         final StreamsGroupHeartbeatRequestData.CopartitionGroup 
expectedCopartitionGroupData1 =
@@ -701,6 +703,15 @@ class StreamsGroupHeartbeatRequestManagerTest {
         assertNull(nonJoiningRequestData.topology());
     }
 
+    private <V> boolean isSorted(List<V> collection, Comparator<V> comparator) 
{
+        for (int i = 1; i < collection.size(); i++) {
+            if (comparator.compare(collection.get(i - 1), collection.get(i)) > 
0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     @ParameterizedTest
     @MethodSource("provideNonJoiningStates")
     public void 
testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState 
memberState) {

Reply via email to