This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5f80de3923d KAFKA-19162: Topology metadata contains
non-deterministically ordered topic configs (#19491)
5f80de3923d is described below
commit 5f80de3923de7a57ac30a8691ba3ff5fcbacbdb6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Apr 17 06:12:17 2025 +0200
KAFKA-19162: Topology metadata contains non-deterministically ordered topic
configs (#19491)
The topology description sent to the broker in KIP-1071 contains
non-deterministically ordered topic configs. Since the topology is
compared to the group's topology upon joining, we may run into
`INVALID_REQUEST: Topology updates are not supported yet` failures if
the topology sent by the application does not match the group topology
due to different topic config order.
This PR sorts the topic configs by key so that their order is
deterministic, avoiding a spurious `INVALID_REQUEST` error.
Reviewers: Matthias J. Sax <[email protected]>
---
.../internals/StreamsGroupHeartbeatRequestManager.java | 2 ++
.../StreamsGroupHeartbeatRequestManagerTest.java | 15 +++++++++++++--
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 55741114b34..5012aba5a32 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -237,6 +237,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
repartitionTopicInfo.topicConfigs().add(new
StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
);
repartitionTopicsInfo.add(repartitionTopicInfo);
+
repartitionTopicInfo.topicConfigs().sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key));
}
repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
return repartitionTopicsInfo;
@@ -251,6 +252,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
changelogTopic.getValue().topicConfigs().forEach((k, v) ->
changelogTopicInfo.topicConfigs().add(new
StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
);
+
changelogTopicInfo.topicConfigs().sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key));
changelogTopicsInfo.add(changelogTopicInfo);
}
changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index dae6958035b..d43bcfc7891 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -108,7 +108,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
private static final String REPARTITION_SOURCE_TOPIC_1 =
"repartitionSourceTopic1";
private static final String REPARTITION_SOURCE_TOPIC_2 =
"repartitionSourceTopic2";
private static final Map<String, StreamsRebalanceData.TopicInfo>
REPARTITION_SOURCE_TOPICS = Map.of(
- REPARTITION_SOURCE_TOPIC_1, new
StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1),
Map.of("config1", "value1")),
+ REPARTITION_SOURCE_TOPIC_1, new
StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1),
Map.of("config3", "value3", "config1", "value1")),
REPARTITION_SOURCE_TOPIC_2, new
StreamsRebalanceData.TopicInfo(Optional.of(3), Optional.of((short) 3),
Collections.emptyMap())
);
private static final String CHANGELOG_TOPIC_1 = "changelogTopic1";
@@ -117,7 +117,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
private static final Map<String, StreamsRebalanceData.TopicInfo>
CHANGELOG_TOPICS = Map.of(
CHANGELOG_TOPIC_1, new
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 1),
Map.of()),
CHANGELOG_TOPIC_2, new
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 2),
Map.of()),
- CHANGELOG_TOPIC_3, new
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3),
Map.of("config2", "value2"))
+ CHANGELOG_TOPIC_3, new
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3),
Map.of("config4", "value4", "config2", "value2"))
);
private static final Collection<Set<String>> COPARTITION_GROUP = Set.of(
Set.of(SOURCE_TOPIC_1, REPARTITION_SOURCE_TOPIC_2),
@@ -664,6 +664,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
assertEquals(repartitionTopic.numPartitions().get(),
topicInfo.partitions());
assertEquals(repartitionTopic.replicationFactor().get(),
topicInfo.replicationFactor());
assertEquals(repartitionTopic.topicConfigs().size(),
topicInfo.topicConfigs().size());
+ assertTrue(isSorted(topicInfo.topicConfigs(),
Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
});
assertEquals(CHANGELOG_TOPICS.size(),
subtopology1.stateChangelogTopics().size());
subtopology1.stateChangelogTopics().forEach(topicInfo -> {
@@ -672,6 +673,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
final StreamsRebalanceData.TopicInfo changelogTopic =
CHANGELOG_TOPICS.get(topicInfo.name());
assertEquals(changelogTopic.replicationFactor().get(),
topicInfo.replicationFactor());
assertEquals(changelogTopic.topicConfigs().size(),
topicInfo.topicConfigs().size());
+ assertTrue(isSorted(topicInfo.topicConfigs(),
Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
});
assertEquals(2, subtopology1.copartitionGroups().size());
final StreamsGroupHeartbeatRequestData.CopartitionGroup
expectedCopartitionGroupData1 =
@@ -701,6 +703,15 @@ class StreamsGroupHeartbeatRequestManagerTest {
assertNull(nonJoiningRequestData.topology());
}
+ private <V> boolean isSorted(List<V> collection, Comparator<V> comparator)
{
+ for (int i = 1; i < collection.size(); i++) {
+ if (comparator.compare(collection.get(i - 1), collection.get(i)) >
0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@ParameterizedTest
@MethodSource("provideNonJoiningStates")
public void
testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState
memberState) {