mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928815976


##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must 
have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: 
$listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and 
the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)

Review Comment:
   So I think there is a bit of confusion here
   
   > Should we throw exception when the host is invalid?
   
   Initially when you said this I thought you were talking about if the host 
actually exists (i.e. making a connection to a host to see if machine is alive) 
rather than to just see if the host is valid in terms of formatting
   
   > In that case, should we do this validation? 
inetAddressValidator.isValid(ep.host). It looks like we don't care about if the 
host is valid or not, all we want to filter out, is the null host case (i.e. 
PLAINTEXT://:9096), is my understanding correct?
   
   So the main reason behind `inetAddressValidator.isValid(ep.host)` is 
actually to filter out valid IP addresses from everything else (because we do 
the duplicate port checking for IPv4 vs IPv6 it only makes sense on IP 
addresses). The host part is actually completely irrelevant and the general 
intention about the code is to only care about the valid IP part (hence why we 
are partitioning on valid IP's), whether there are null or valid hosts is 
irrelevant (also because the code beforehand didn't do any checking of hosts 
either, it only checked ports and whether they are duplicate).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to