This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c2ee76d2af [SPARK-44274][CONNECT] Move out util functions used by ArtifactManager to common/utils
4c2ee76d2af is described below

commit 4c2ee76d2afa63a7a7c0334fd8f4763d3d87ddbb
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Mon Jul 3 20:05:00 2023 -0700

    [SPARK-44274][CONNECT] Move out util functions used by ArtifactManager to common/utils
    
    ### What changes were proposed in this pull request?
    
    Move out util functions used by ArtifactManager to `common/utils`. More specifically, move `resolveURI` and `awaitResult` to `common/utils`.
    
    ### Why are the changes needed?
    
    So that Spark Connect Scala client does not need to depend on Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing test
    
    Closes #41825 from amaliujia/SPARK-44273.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/util/SparkFatalException.scala    |  0
 .../org/apache/spark/util/SparkFileUtils.scala     | 47 +++++++++++++++++
 .../org/apache/spark/util/SparkThreadUtils.scala   | 60 ++++++++++++++++++++++
 .../spark/sql/connect/client/ArtifactManager.scala |  6 +--
 .../scala/org/apache/spark/util/ThreadUtils.scala  | 15 +-----
 .../main/scala/org/apache/spark/util/Utils.scala   | 17 +-----
 6 files changed, 112 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/SparkFatalException.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkFatalException.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/util/SparkFatalException.scala
rename to common/utils/src/main/scala/org/apache/spark/util/SparkFatalException.scala
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
new file mode 100644
index 00000000000..63d1ab4799a
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.File
+import java.net.{URI, URISyntaxException}
+
+private[spark] object SparkFileUtils {
+  /**
+   * Return a well-formed URI for the file described by a user input string.
+   *
+   * If the supplied path does not contain a scheme, or is a relative path, it will be
+   * converted into an absolute path with a file:// scheme.
+   */
+  def resolveURI(path: String): URI = {
+    try {
+      val uri = new URI(path)
+      if (uri.getScheme() != null) {
+        return uri
+      }
+      // make sure to handle if the path has a fragment (applies to yarn
+      // distributed cache)
+      if (uri.getFragment() != null) {
+        val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
+        return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
+          uri.getFragment())
+      }
+    } catch {
+      case e: URISyntaxException =>
+    }
+    new File(path).getCanonicalFile().toURI()
+  }
+}
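
For context, a minimal sketch (not part of this commit) of how the relocated `resolveURI` helper behaves; the sample paths are purely illustrative, and since the object is `private[spark]` it is only callable from Spark's own packages:

    import org.apache.spark.util.SparkFileUtils

    // A path that already carries a scheme is returned unchanged.
    SparkFileUtils.resolveURI("hdfs:///tmp/data")     // hdfs:///tmp/data
    // A bare relative path is made absolute and given a file: scheme.
    SparkFileUtils.resolveURI("conf/app.conf")        // file:/<cwd>/conf/app.conf
    // A fragment (e.g. a YARN distributed-cache alias) is preserved.
    SparkFileUtils.resolveURI("jars/dep.jar#dep")     // file:/<cwd>/jars/dep.jar#dep
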
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkThreadUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkThreadUtils.scala
new file mode 100644
index 00000000000..ec14688a006
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkThreadUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.Awaitable
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkException
+
+private[spark] object SparkThreadUtils {
+  // scalastyle:off awaitresult
+  /**
+   * Preferred alternative to `Await.result()`.
+   *
+   * This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
+   * that this thread's stack trace appears in logs.
+   *
+   * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
+   * `BlockingContext`. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
+   * As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
+   * method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
+   * In general, we should use this method because many places in Spark use [[ThreadLocal]] and it's
+   * hard to debug when [[ThreadLocal]]s leak to other tasks.
+   */
+  @throws(classOf[SparkException])
+  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+      // See SPARK-13747.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.result(atMost)(awaitPermission)
+    } catch {
+      case e: SparkFatalException =>
+        throw e.throwable
+      // TimeoutException and RpcAbortException is thrown in the current thread, so not need to warp
+      // the exception.
+      case NonFatal(t)
+        if !t.isInstanceOf[TimeoutException] =>
+        throw new SparkException("Exception thrown in awaitResult: ", t)
+    }
+  }
+  // scalastyle:on awaitresult
+}
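
For context, a minimal sketch (not part of this commit) of how `SparkThreadUtils.awaitResult` is called from Spark's own packages; the `Promise` below is purely illustrative. Failures other than `TimeoutException` are rethrown wrapped in `SparkException`, so the waiting thread's stack trace shows up in logs:

    import scala.concurrent.Promise
    import scala.concurrent.duration._

    import org.apache.spark.util.SparkThreadUtils

    val promise = Promise[Int]()
    promise.success(42)
    // Blocks the calling thread for at most 10 seconds waiting on the future.
    val result: Int = SparkThreadUtils.awaitResult(promise.future, 10.seconds)
    assert(result == 42)
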
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index 6d0d16df946..0ed1670f990 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -38,7 +38,7 @@ import org.apache.commons.codec.digest.DigestUtils.sha256Hex
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.AddArtifactsResponse
 import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
 
 /**
 * The Artifact Manager is responsible for handling and transferring artifacts from the local
@@ -71,7 +71,7 @@ class ArtifactManager(
    * Currently only local files with extensions .jar and .class are supported.
    */
   def addArtifact(path: String): Unit = {
-    addArtifact(Utils.resolveURI(path))
+    addArtifact(SparkFileUtils.resolveURI(path))
   }
 
   private def parseArtifacts(uri: URI): Seq[Artifact] = {
@@ -201,7 +201,7 @@ class ArtifactManager(
       writeBatch()
     }
     stream.onCompleted()
-    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    SparkThreadUtils.awaitResult(promise.future, Duration.Inf)
     // TODO(SPARK-42658): Handle responses containing CRC failures.
   }
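
For context, a hypothetical sketch of the call path this hunk touches: `addArtifact(path)` resolves the string through `SparkFileUtils.resolveURI` before uploading, and per the class docs only local `.jar` and `.class` files are supported. The paths and the `artifactManager` instance below are illustrative only:

    // Relative paths are resolved against the client's working directory.
    artifactManager.addArtifact("target/udfs.jar")
    // URIs with an explicit file scheme are used as-is.
    artifactManager.addArtifact("file:///opt/app/classes/MyUdf.class")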
 
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 303493ef91a..16d7de56c39 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -307,20 +307,7 @@ private[spark] object ThreadUtils {
    */
   @throws(classOf[SparkException])
   def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
-    try {
-      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
-      // See SPARK-13747.
-      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
-      awaitable.result(atMost)(awaitPermission)
-    } catch {
-      case e: SparkFatalException =>
-        throw e.throwable
-      // TimeoutException and RpcAbortException is thrown in the current thread, so not need to warp
-      // the exception.
-      case NonFatal(t)
-          if !t.isInstanceOf[TimeoutException] =>
-        throw new SparkException("Exception thrown in awaitResult: ", t)
-    }
+    SparkThreadUtils.awaitResult(awaitable, atMost)
   }
   // scalastyle:on awaitresult
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 60895c791b5..b5c0ee1bab8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2085,22 +2085,7 @@ private[spark] object Utils extends Logging with SparkClassUtils {
    * converted into an absolute path with a file:// scheme.
    */
   def resolveURI(path: String): URI = {
-    try {
-      val uri = new URI(path)
-      if (uri.getScheme() != null) {
-        return uri
-      }
-      // make sure to handle if the path has a fragment (applies to yarn
-      // distributed cache)
-      if (uri.getFragment() != null) {
-        val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
-        return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
-          uri.getFragment())
-      }
-    } catch {
-      case e: URISyntaxException =>
-    }
-    new File(path).getCanonicalFile().toURI()
+    SparkFileUtils.resolveURI(path)
   }
 
   /** Resolve a comma-separated list of paths. */

