This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 61c53cd702e [SPARK-42173][CORE] RpcAddress equality can fail 61c53cd702e is described below commit 61c53cd702e51affa4436d77b6aaf7fc25cb7808 Author: Holden Karau <hka...@netflix.com> AuthorDate: Thu Jan 26 09:12:48 2023 -0800 [SPARK-42173][CORE] RpcAddress equality can fail ### What changes were proposed in this pull request? When constructing an RpcAddress use InetUtils to get a consistently formatted IPv6 address if the env is for an IPv6 address. ### Why are the changes needed? We use RpcAddress equality for various tasks involving executors and a mismatch of equality can cause interesting errors. ### Does this PR introduce _any_ user-facing change? Log messages might change from sometimes having all the 0s in a v6 address present to not. ### How was this patch tested? Existing tests + new unit test showing that [::0:1] is formatted to [::1] Closes #39728 from holdenk/SPARK-42173-ipv6-sparse. Authored-by: Holden Karau <hka...@netflix.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit e30bb538e480940b1963eb14c3267662912d8584) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../main/scala/org/apache/spark/rpc/RpcAddress.scala | 15 ++++++++++----- core/src/main/scala/org/apache/spark/util/Utils.scala | 18 ++++++++++++++++++ .../scala/org/apache/spark/rpc/RpcAddressSuite.scala | 10 ++++++++++ 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala index 675dc24206a..1fa22451e5d 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala @@ -23,9 +23,7 @@ import org.apache.spark.util.Utils /** * Address for an RPC environment, with hostname and port. 
*/ -private[spark] case class RpcAddress(_host: String, port: Int) { - - lazy val host: String = Utils.addBracketsIfNeeded(_host) +private[spark] case class RpcAddress(host: String, port: Int) { def hostPort: String = host + ":" + port @@ -38,15 +36,22 @@ private[spark] case class RpcAddress(_host: String, port: Int) { private[spark] object RpcAddress { + def apply(host: String, port: Int): RpcAddress = { + new RpcAddress( + Utils.normalizeIpIfNeeded(host), + port + ) + } + /** Return the [[RpcAddress]] represented by `uri`. */ def fromUrlString(uri: String): RpcAddress = { val uriObj = new java.net.URI(uri) - RpcAddress(uriObj.getHost, uriObj.getPort) + apply(uriObj.getHost, uriObj.getPort) } /** Returns the [[RpcAddress]] encoded in the form of "spark://host:port" */ def fromSparkURL(sparkUrl: String): RpcAddress = { val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) - RpcAddress(host, port) + apply(host, port) } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index fb073595147..9bf45ed3776 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1109,6 +1109,24 @@ private[spark] object Utils extends Logging { } } + /** + * Normalize IPv6 IPs and no-op on all other hosts. + */ + private[spark] def normalizeIpIfNeeded(host: String): String = { + // Is this a v6 address? We ask users to add [] around v6 addresses as strs but + // they're not always there. If it's just 0-9 and : and [] we treat it as a v6 address. + // This means some invalid addresses are treated as v6 addresses, but since they are + // not valid hostnames it doesn't matter. + // See https://www.rfc-editor.org/rfc/rfc1123#page-13 for context around valid hostnames. 
+ val addressRe = """^\[{0,1}([0-9:]+?:[0-9]*)\]{0,1}$""".r + host match { + case addressRe(unbracketed) => + addBracketsIfNeeded(InetAddresses.toAddrString(InetAddresses.forString(unbracketed))) + case _ => + host + } + } + /** * Checks if the host contains only valid hostname/ip without port * NOTE: Incase of IPV6 ip it should be enclosed inside [] diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala index 0f7c9d71330..9fb08c79420 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala @@ -70,4 +70,14 @@ class RpcAddressSuite extends SparkFunSuite { val address = RpcAddress("::1", 1234) assert(address.toSparkURL == "spark://[::1]:1234") } + + test("SPARK-42173: Consistent Sparse Mapping") { + val address = RpcAddress("::0:1", 1234) + assert(address.toSparkURL == "spark://[::1]:1234") + } + + test("SPARK-42173: Consistent Sparse Mapping trailing 0s") { + val address = RpcAddress("2600::", 1234) + assert(address.toSparkURL == "spark://[2600::]:1234") + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org