cmccabe commented on code in PR #14376: URL: https://github.com/apache/kafka/pull/14376#discussion_r1329294750
########## metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.message.BrokerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.metadata.RegisterControllerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + + +/** + * ListenerInfo contains information about the listeners of either a controller or a broker. + * ListenerInfo objects are immutable; they cannot be modified once created. The intention is + * that you store either controller listeners or broker listeners here, but not both. 
On a + * combined KRaft node, which has both broker and controller roles, you would have two + * separate ListenerInfo objects to represent the listeners of each role. + * + * Listener information is stored in a linked hash map. This maintains ordering while still + * allowing the traditional O(1) hash map access. By convention, the first listener is special, + * corresponding to either the inter-controller listener or the inter-broker listener. + * This is the only listener that other nodes will attempt to use to communicate with this node. + * + * You may wonder why nodes support multiple listeners, given that intra-cluster communication only + * ever uses the first one. Well, one reason is that external clients may wish to use the additional + * listeners. It is a good practice to separate external and internal traffic. In some cases, + * external traffic may be encrypted while internal traffic is not. (Although other admins may wish + * to encrypt everything.) Another reason is that supporting multiple listeners allows us to change + * the effective intra-cluster listener via a roll. During such a roll, half of the brokers + * (or controllers) might be using one listener, while the other half use another. This lets us, + * for example, transition from using a PLAINTEXT inter-broker listener to using an SSL one without + * taking any downtime. + * + * The ListenerInfo class is intended to handle translating endpoint information between various + * different data structures, and also to handle the two big gotchas of Kafka endpoints. + * + * The first gotcha is that the hostname will be null or blank if we are listening on 0.0.0.0. + * The withWildcardHostnamesResolved function creates a ListenerInfo object where all such hostnames + * are replaced by specific hostnames. (It's not perfect because we have to choose a single hostname + * out of multiple possibilities. 
In production scenarios it would be better to set the desired + * hostname explicitly in the configuration rather than binding to 0.0.0.0.) + * + * The second gotcha is that if someone configures an ephemeral port (aka port 0), we need to fill + * in the port which was actually chosen at runtime. + * The withEphemeralPortsCorrected function resolves this by filling in the missing information for ephemeral + * ports. + */ +final public class ListenerInfo { + /** + * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromControllerRegistrationRequest( + ControllerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterControllerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromControllerRegistrationRecord( + RegisterControllerRecord.ControllerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC. + * + * @param collection The RPC data. + * + * @return The ListenerInfo object. + */ + public static ListenerInfo fromBrokerRegistrationRequest( + BrokerRegistrationRequestData.ListenerCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + /** + * Create a ListenerInfo from data in a RegisterBrokerRecord. + * + * @param collection The record data. + * + * @return The ListenerInfo object. 
+ */ + public static ListenerInfo fromBrokerRegistrationRecord( + RegisterBrokerRecord.BrokerEndpointCollection collection + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + collection.forEach(listener -> { + SecurityProtocol protocol = SecurityProtocol.forId(listener.securityProtocol()); + if (protocol == null) { + throw new RuntimeException("Unknown security protocol " + + (int) listener.securityProtocol() + " in listener " + listener.name()); + } + listeners.put(listener.name(), new Endpoint(listener.name(), + protocol, + listener.host(), + listener.port())); + }); + return new ListenerInfo(listeners); + } + + public static ListenerInfo create( + List<Endpoint> rawListeners + ) { + return create(Optional.empty(), rawListeners); + } + + public static ListenerInfo create( + Optional<String> firstListenerName, + List<Endpoint> rawListeners + ) { + LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>(); + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + for (Endpoint listener : rawListeners) { + String name = listener.listenerName().get(); + if (!Optional.of(name).equals(firstListenerName)) { + listeners.put(name, listener); + } + } + return new ListenerInfo(listeners); + } + + /** + * An ordered map containing all of the listeners. The first listener is special, indicating + * either the inter-broker or inter-controller listener. + */ + private final Map<String, Endpoint> listeners; + + private ListenerInfo(Map<String, Endpoint> listeners) { + this.listeners = Collections.unmodifiableMap(listeners); + } + + public Map<String, Endpoint> listeners() { + return listeners; + } + + public Endpoint firstListener() { + return listeners.values().iterator().next(); Review Comment: In general the cases where we have no listeners in here are somewhat degenerate (unit tests with garbage registrations, etc.) 
and we don't call firstListener in `QuorumController` code anyway. However, I'll add a check and a `throw` to be more explicit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org