This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 88ec4d0d60b KAFKA-14259: BrokerRegistration#toString throws an exception, terminating metadata replay (#12681) 88ec4d0d60b is described below commit 88ec4d0d60bcfd48e8ac7dcfe6072d0bf8538b23 Author: Colin Patrick McCabe <cmcc...@apache.org> AuthorDate: Fri Sep 23 15:39:50 2022 -0700 KAFKA-14259: BrokerRegistration#toString throws an exception, terminating metadata replay (#12681) Previously, BrokerRegistration#toString would throw an exception, terminating metadata replay, because the sorted() method was used on an entry set rather than a key set. Reviewers: David Arthur <mum...@gmail.com> --- .../java/org/apache/kafka/metadata/BrokerRegistration.java | 4 ++-- .../org/apache/kafka/metadata/BrokerRegistrationTest.java | 13 ++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java index d1d34550653..455eddb4403 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java @@ -223,8 +223,8 @@ public class BrokerRegistration { map(n -> listeners.get(n).toString()). collect(Collectors.joining(", "))); bld.append("], supportedFeatures={").append( - supportedFeatures.entrySet().stream().sorted(). - map(e -> e.getKey() + ": " + e.getValue()). + supportedFeatures.keySet().stream().sorted(). + map(k -> k + ": " + supportedFeatures.get(k)). 
collect(Collectors.joining(", "))); bld.append("}"); bld.append(", rack=").append(rack); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java index 10d1169412c..b08d98cadb7 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java @@ -27,10 +27,13 @@ import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.AbstractMap.SimpleEntry; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -48,7 +51,9 @@ public class BrokerRegistrationTest { Optional.empty(), true, false), new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"), Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)), - Collections.singletonMap("foo", VersionRange.of((short) 2, (short) 3)), + Stream.of(new SimpleEntry<>("foo", VersionRange.of((short) 2, (short) 3)), + new SimpleEntry<>("bar", VersionRange.of((short) 1, (short) 4))).collect( + Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)), Optional.of("myrack"), false, true)); @Test @@ -77,6 +82,12 @@ public class BrokerRegistrationTest { "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " + "rack=Optional.empty, fenced=true, inControlledShutdown=false)", REGISTRATIONS.get(1).toString()); + assertEquals("BrokerRegistration(id=2, epoch=0, " + + "incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" + + "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " + + "host='localhost', port=9092)], supportedFeatures={bar: 
1-4, foo: 2-3}, " + + "rack=Optional[myrack], fenced=false, inControlledShutdown=true)", + REGISTRATIONS.get(2).toString()); } @Test