This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ab117b5f292f [SPARK-47092][CORE][SQL][K8S] Add `getUriBuilder` to `o.a.s.u.Utils` and use it
ab117b5f292f is described below

commit ab117b5f292fa964d2c73ebeed762acf744762b6
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Feb 19 15:42:43 2024 -0800

    [SPARK-47092][CORE][SQL][K8S] Add `getUriBuilder` to `o.a.s.u.Utils` and use it
    
    ### What changes were proposed in this pull request?
    
    This PR adds `getUriBuilder` to `o.a.s.u.Utils` and replaces direct `UriBuilder.fromUri` calls with it.
    
    ### Why are the changes needed?
    
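    This introduces a single entry point in Spark for the `UriBuilder` API: call sites go through `Utils.getUriBuilder`, and a new scalastyle rule rejects direct use of `UriBuilder.fromUri`.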
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45168 from dongjoon-hyun/SPARK-47092.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala   |  5 ++---
 .../main/scala/org/apache/spark/deploy/SparkSubmit.scala  |  5 ++---
 .../main/scala/org/apache/spark/executor/Executor.scala   |  3 +--
 core/src/main/scala/org/apache/spark/util/Utils.scala     | 15 +++++++++++++++
 .../deploy/k8s/features/BasicDriverFeatureStep.scala      |  6 ++----
 scalastyle-config.xml                                     |  5 +++++
 .../org/apache/spark/sql/artifact/ArtifactManager.scala   |  3 +--
 .../org/apache/spark/sql/execution/SparkSqlParser.scala   |  4 ++--
 8 files changed, 30 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index da37fa83254b..801b6dd85a2b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,7 +22,6 @@ import java.net.URI
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, 
AtomicReference}
-import javax.ws.rs.core.UriBuilder
 
 import scala.collection.Map
 import scala.collection.concurrent.{Map => ScalaConcurrentMap}
@@ -1829,12 +1828,12 @@ class SparkContext(config: SparkConf) extends Logging {
         addedArchives
           .getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, 
Long]().asScala)
           .putIfAbsent(
-          UriBuilder.fromUri(new 
URI(key)).fragment(uri.getFragment).build().toString,
+          Utils.getUriBuilder(new 
URI(key)).fragment(uri.getFragment).build().toString,
           timestamp).isEmpty) {
       logInfo(s"Added archive $path at $key with timestamp $timestamp")
       // If the scheme is file, use URI to simply copy instead of downloading.
       val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key)
-      val uriToDownload = UriBuilder.fromUri(uriToUse).fragment(null).build()
+      val uriToDownload = Utils.getUriBuilder(uriToUse).fragment(null).build()
       val source = Utils.fetchFile(uriToDownload.toString, 
Utils.createTempDir(), conf,
         hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
       val dest = new File(
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5e7dc799ab07..10c1dbe2054a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -24,7 +24,6 @@ import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
 import java.util.ServiceLoader
 import java.util.jar.JarInputStream
-import javax.ws.rs.core.UriBuilder
 
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
@@ -409,7 +408,7 @@ private[spark] class SparkSubmit extends Logging {
           val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
           val localResources = downloadFileList(
             resolvedUris.map(
-              
UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
+              
Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","),
             targetDir, sparkConf, hadoopConf)
           
Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
             case (localResources, resolvedUri) =>
@@ -426,7 +425,7 @@ private[spark] class SparkSubmit extends Logging {
                 Files.copy(source.toPath, dest.toPath)
               }
               // Keep the URIs of local files with the given fragments.
-              UriBuilder.fromUri(
+              Utils.getUriBuilder(
                 
localResources).fragment(resolvedUri.getFragment).build().toString
           }.mkString(",")
         }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dae00a72285d..206b293e08c2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -27,7 +27,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
-import javax.ws.rs.core.UriBuilder
 
 import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -1157,7 +1156,7 @@ private[spark] class Executor(
           state.currentArchives.getOrElse(name, -1L) < timestamp) {
         logInfo(s"Fetching $name with timestamp $timestamp")
         val sourceURI = new URI(name)
-        val uriToDownload = 
UriBuilder.fromUri(sourceURI).fragment(null).build()
+        val uriToDownload = 
Utils.getUriBuilder(sourceURI).fragment(null).build()
         val source = Utils.fetchFile(uriToDownload.toString, 
Utils.createTempDir(), conf,
           hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
         val dest = new File(
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b49f97aed05e..25b03a2c5d6d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -32,6 +32,7 @@ import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.zip.{GZIPInputStream, ZipInputStream}
+import javax.ws.rs.core.UriBuilder
 
 import scala.annotation.tailrec
 import scala.collection.Map
@@ -2885,6 +2886,20 @@ private[spark] object Utils
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
 
+  /** Create a UriBuilder from URI object. */
+  def getUriBuilder(uri: URI): UriBuilder = {
+    // scalastyle:off uribuilder
+    UriBuilder.fromUri(uri)
+    // scalastyle:on uribuilder
+  }
+
+  /** Create a UriBuilder from URI string. */
+  def getUriBuilder(uri: String): UriBuilder = {
+    // scalastyle:off uribuilder
+    UriBuilder.fromUri(uri)
+    // scalastyle:on uribuilder
+  }
+
   /** Check whether the file of the path is splittable. */
   def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): 
Boolean = {
     val codec = codecFactory.getCodec(path)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 31dd029c27fb..51ee9ffbe405 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import javax.ws.rs.core.UriBuilder
-
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -171,7 +169,7 @@ private[spark] class BasicDriverFeatureStep(conf: 
KubernetesDriverConf)
         conf.get(key).partition(uri => 
KubernetesUtils.isLocalAndResolvable(uri))
       val value = {
         if (key == ARCHIVES) {
-          
localUris.map(UriBuilder.fromUri(_).fragment(null).build()).map(_.toString)
+          
localUris.map(Utils.getUriBuilder(_).fragment(null).build()).map(_.toString)
         } else {
           localUris
         }
@@ -180,7 +178,7 @@ private[spark] class BasicDriverFeatureStep(conf: 
KubernetesDriverConf)
       if (resolved.nonEmpty) {
         val resolvedValue = if (key == ARCHIVES) {
           localUris.zip(resolved).map { case (uri, r) =>
-            UriBuilder.fromUri(r).fragment(new 
java.net.URI(uri).getFragment).build().toString
+            Utils.getUriBuilder(r).fragment(new 
java.net.URI(uri).getFragment).build().toString
           }
         } else {
           resolved
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index b38c6d1c1fa6..fb1d17dca0f5 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -283,6 +283,11 @@ This file is divided into 3 sections:
     of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
   </check>
 
+  <check customId="uribuilder" level="error" 
class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter 
name="regex">UriBuilder\.fromUri</parameter></parameters>
+    <customMessage>Use Utils.getUriBuilder instead.</customMessage>
+  </check>
+
   <check customId="executioncontextglobal" level="error" 
class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter 
name="regex">scala\.concurrent\.ExecutionContext\.Implicits\.global</parameter></parameters>
     <customMessage> User queries can use global thread pool, causing 
starvation and eventual OOM.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 47f5dbdac488..9e4b6ef31636 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.net.{URI, URL, URLClassLoader}
 import java.nio.file.{Files, Path, Paths, StandardCopyOption}
 import java.util.concurrent.CopyOnWriteArrayList
-import javax.ws.rs.core.UriBuilder
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
@@ -174,7 +173,7 @@ class ArtifactManager(session: SparkSession) extends 
Logging {
         }
       } else if (remoteRelativePath.startsWith(s"archives${File.separator}")) {
         val canonicalUri =
-          fragment.map(UriBuilder.fromUri(new 
URI(uri)).fragment).getOrElse(new URI(uri))
+          fragment.map(Utils.getUriBuilder(new 
URI(uri)).fragment).getOrElse(new URI(uri))
         session.sparkContext.addArchive(canonicalUri.toString)
       } else if (remoteRelativePath.startsWith(s"files${File.separator}")) {
         session.sparkContext.addFile(uri)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2ef68887e87c..79ddfc7d31da 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.time.ZoneOffset
 import java.util.{Locale, TimeZone}
-import javax.ws.rs.core.UriBuilder
 
 import scala.jdk.CollectionConverters._
 
@@ -42,6 +41,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.Utils.getUriBuilder
 
 /**
  * Concrete parser for Spark SQL statements.
@@ -862,7 +862,7 @@ class SparkSqlAstBuilder extends AstBuilder {
           throw QueryParsingErrors.unsupportedLocalFileSchemeError(ctx, 
pathScheme)
         case _ =>
           // force scheme to be file rather than fs.default.name
-          val loc = 
Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build())
+          val loc = 
Some(getUriBuilder(CatalogUtils.stringToURI(path)).scheme("file").build())
           storage = storage.copy(locationUri = loc)
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to