cmccabe commented on code in PR #14376:
URL: https://github.com/apache/kafka/pull/14376#discussion_r1329294750


##########
metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterControllerRecord;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+
+/**
+ * ListenerInfo contains information about the listeners of either a 
controller or a broker.
+ * ListenerInfo objects are immutable; they cannot be modified once created. 
The intention is
+ * that you store either controller listeners or broker listeners here, but 
not both. On a
+ * combined KRaft node, which has both broker and controller roles, you would 
have two
+ * separate ListenerInfo objects to represent the listeners of each role.
+ *
+ * Listener information is stored in a linked hash map. This maintains 
ordering while still
+ * allowing the traditional O(1) hash map access. By convention, the first 
listener is special,
+ * corresponding to either the inter-controller listener or the inter-broker 
listener.
+ * This is the only listener that other nodes will attempt to use to 
communicate with this node.
+ *
+ * You may wonder why nodes support multiple listeners, given that 
inter-cluster communication only
+ * ever uses the first one. Well, one reason is that external clients may wish 
to use the additional
+ * listeners. It is a good practice to separate external and internal traffic. 
In some cases,
+ * external traffic may be encrypted while internal traffic is not. (Although 
other admins may wish
+ * to encrypt everything.) Another reason is that supporting multiple 
listeners allows us to change
+ * the effective inter-cluster listener via a roll. During such a roll, half 
of the brokers
+ * (or controllers) might be using one listener, while the other half use 
another. This lets us,
+ * for example, transition from using a PLAINTEXT inter broker listener to 
using an SSL one without
+ * taking any downtime.
+ *
+ * The ListenerInfo class is intended to handle translating endpoint 
information between various
+ * different data structures, and also to handle the two big gotchas of Kafka 
endpoints.
+ *
+ * The first gotcha is that the hostname will be null or blank if we are 
listening on 0.0.0.0.
+ * The withWildcardHostnamesResolved function creates a ListenerInfo object 
where all such hostnames
+ * are replaced by specific hostnames. (It's not perfect because we have to 
choose a single hostname
+ * out of multiple possibilities. In production scenarios it would be better 
to set the desired
+ * hostname explicitly in the configuration rather than binding to 0.0.0.0.)
+ *
+ * The second gotcha is that if someone configures an ephemeral port (aka port 
0), we need to fill
+ * The withEphemeralPortsCorrected resolves this by filling in the missing 
information for ephemeral
+ * ports.
+ */
+final public class ListenerInfo {
+    /**
+     * Create a ListenerInfo from data in a ControllerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRequest(
+        ControllerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterControllerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromControllerRegistrationRecord(
+        RegisterControllerRecord.ControllerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                    (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a BrokerRegistrationRequest RPC.
+     *
+     * @param collection    The RPC data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRequest(
+        BrokerRegistrationRequestData.ListenerCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                protocol,
+                listener.host(),
+                listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * Create a ListenerInfo from data in a RegisterBrokerRecord.
+     *
+     * @param collection    The record data.
+     *
+     * @return              The ListenerInfo object.
+     */
+    public static ListenerInfo fromBrokerRegistrationRecord(
+        RegisterBrokerRecord.BrokerEndpointCollection collection
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        collection.forEach(listener -> {
+            SecurityProtocol protocol = 
SecurityProtocol.forId(listener.securityProtocol());
+            if (protocol == null) {
+                throw new RuntimeException("Unknown security protocol " +
+                        (int) listener.securityProtocol() + " in listener " + 
listener.name());
+            }
+            listeners.put(listener.name(), new Endpoint(listener.name(),
+                    protocol,
+                    listener.host(),
+                    listener.port()));
+        });
+        return new ListenerInfo(listeners);
+    }
+
+    public static ListenerInfo create(
+        List<Endpoint> rawListeners
+    ) {
+        return create(Optional.empty(), rawListeners);
+    }
+
+    public static ListenerInfo create(
+        Optional<String> firstListenerName,
+        List<Endpoint> rawListeners
+    ) {
+        LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        for (Endpoint listener : rawListeners) {
+            String name = listener.listenerName().get();
+            if (!Optional.of(name).equals(firstListenerName)) {
+                listeners.put(name, listener);
+            }
+        }
+        return new ListenerInfo(listeners);
+    }
+
+    /**
+     * An ordered map containing all of the listeners. The first listener is 
special, indicating
+     * either the inter-broker or inter-controller listener.
+     */
+    private final Map<String, Endpoint> listeners;
+
+    private ListenerInfo(Map<String, Endpoint> listeners) {
+        this.listeners = Collections.unmodifiableMap(listeners);
+    }
+
+    public Map<String, Endpoint> listeners() {
+        return listeners;
+    }
+
+    public Endpoint firstListener() {
+        return listeners.values().iterator().next();

Review Comment:
   In general the cases where we have no listeners in here are somewhat 
degenerate (unit tests with garbage registrations, etc.) and we don't call 
firstListener in `QuorumController` code anyway.
   
   However, I'll add a check and a `throw` to be more explicit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to