This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 61c53cd702e [SPARK-42173][CORE] RpcAddress equality can fail
61c53cd702e is described below

commit 61c53cd702e51affa4436d77b6aaf7fc25cb7808
Author: Holden Karau <hka...@netflix.com>
AuthorDate: Thu Jan 26 09:12:48 2023 -0800

    [SPARK-42173][CORE] RpcAddress equality can fail
    
    ### What changes were proposed in this pull request?
    
    When constructing an RpcAddress use InetUtils to get a consistently 
formatted IPv6 address if the env is for an IPv6 address.
    
    ### Why are the changes needed?
    
    We use RpcAddress equality for various tasks involving executors, and a 
mismatch in equality can cause subtle errors.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Log messages might change: IPv6 addresses that previously sometimes showed 
all of the zero groups may now appear in compressed (sparse) form.
    
    ### How was this patch tested?
    
    Existing tests + new unit test showing that [::0:1] is formatted to [::1]
    
    Closes #39728 from holdenk/SPARK-42173-ipv6-sparse.
    
    Authored-by: Holden Karau <hka...@netflix.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit e30bb538e480940b1963eb14c3267662912d8584)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../main/scala/org/apache/spark/rpc/RpcAddress.scala   | 15 ++++++++++-----
 core/src/main/scala/org/apache/spark/util/Utils.scala  | 18 ++++++++++++++++++
 .../scala/org/apache/spark/rpc/RpcAddressSuite.scala   | 10 ++++++++++
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala 
b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
index 675dc24206a..1fa22451e5d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
@@ -23,9 +23,7 @@ import org.apache.spark.util.Utils
 /**
  * Address for an RPC environment, with hostname and port.
  */
-private[spark] case class RpcAddress(_host: String, port: Int) {
-
-  lazy val host: String = Utils.addBracketsIfNeeded(_host)
+private[spark] case class RpcAddress(host: String, port: Int) {
 
   def hostPort: String = host + ":" + port
 
@@ -38,15 +36,22 @@ private[spark] case class RpcAddress(_host: String, port: 
Int) {
 
 private[spark] object RpcAddress {
 
+  def apply(host: String, port: Int): RpcAddress = {
+    new RpcAddress(
+      Utils.normalizeIpIfNeeded(host),
+      port
+    )
+  }
+
   /** Return the [[RpcAddress]] represented by `uri`. */
   def fromUrlString(uri: String): RpcAddress = {
     val uriObj = new java.net.URI(uri)
-    RpcAddress(uriObj.getHost, uriObj.getPort)
+    apply(uriObj.getHost, uriObj.getPort)
   }
 
   /** Returns the [[RpcAddress]] encoded in the form of "spark://host:port" */
   def fromSparkURL(sparkUrl: String): RpcAddress = {
     val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
-    RpcAddress(host, port)
+    apply(host, port)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fb073595147..9bf45ed3776 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1109,6 +1109,24 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Normalize IPv6 IPs and no-op on all other hosts.
+   */
+  private[spark] def normalizeIpIfNeeded(host: String): String = {
+    // Is this a v6 address? We ask users to add [] around v6 addresses as 
strs but
+    // they're not always there. If it's just 0-9 and : and [] we treat it as a 
v6 address.
+    // This means some invalid addresses are treated as v6 addresses, but 
since they are
+    // not valid hostnames it doesn't matter.
+    // See https://www.rfc-editor.org/rfc/rfc1123#page-13 for context around 
valid hostnames.
+    val addressRe = """^\[{0,1}([0-9:]+?:[0-9]*)\]{0,1}$""".r
+    host match {
+      case addressRe(unbracketed) =>
+        
addBracketsIfNeeded(InetAddresses.toAddrString(InetAddresses.forString(unbracketed)))
+      case _ =>
+        host
+    }
+  }
+
   /**
    * Checks if the host contains only valid hostname/ip without port
    * NOTE: Incase of IPV6 ip it should be enclosed inside []
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala 
b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
index 0f7c9d71330..9fb08c79420 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
@@ -70,4 +70,14 @@ class RpcAddressSuite extends SparkFunSuite {
     val address = RpcAddress("::1", 1234)
     assert(address.toSparkURL == "spark://[::1]:1234")
   }
+
+  test("SPARK-42173: Consistent Sparse Mapping") {
+    val address = RpcAddress("::0:1", 1234)
+    assert(address.toSparkURL == "spark://[::1]:1234")
+  }
+
+  test("SPARK-42173: Consistent Sparse Mapping trailing 0s") {
+    val address = RpcAddress("2600::", 1234)
+    assert(address.toSparkURL == "spark://[2600::]:1234")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to