This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4c2ee76d2af [SPARK-44274][CONNECT] Move out util functions used by ArtifactManager to common/utils
4c2ee76d2af is described below

commit 4c2ee76d2afa63a7a7c0334fd8f4763d3d87ddbb
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Mon Jul 3 20:05:00 2023 -0700

    [SPARK-44274][CONNECT] Move out util functions used by ArtifactManager to common/utils

    ### What changes were proposed in this pull request?

    Move the util functions used by ArtifactManager out to `common/utils`. More specifically, move `resolveURI` and `awaitResult` to `common/utils`.

    ### Why are the changes needed?

    So that the Spark Connect Scala client does not need to depend on Spark core.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    Closes #41825 from amaliujia/SPARK-44273.

    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/util/SparkFatalException.scala    |  0
 .../org/apache/spark/util/SparkFileUtils.scala     | 47 +++++++++++++++++
 .../org/apache/spark/util/SparkThreadUtils.scala   | 60 ++++++++++++++++++++++
 .../spark/sql/connect/client/ArtifactManager.scala |  6 +--
 .../scala/org/apache/spark/util/ThreadUtils.scala  | 15 +-----
 .../main/scala/org/apache/spark/util/Utils.scala   | 17 +----
 6 files changed, 112 insertions(+), 33 deletions(-)
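Before the file-by-file diff, a minimal sketch of what the relocated helpers look like at a call site (it mirrors the two ArtifactManager changes below; the jar path and the pre-completed promise are placeholders, and callers must live under the org.apache.spark package since both objects are private[spark]):

    import scala.concurrent.Promise
    import scala.concurrent.duration.Duration

    import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}

    // Was Utils.resolveURI: turn a user-supplied path into a well-formed URI.
    val jarUri = SparkFileUtils.resolveURI("target/my-udfs.jar")

    // Was ThreadUtils.awaitResult: block until an async operation completes.
    val promise = Promise[Unit]()
    promise.success(()) // placeholder for the server completing the stream
    SparkThreadUtils.awaitResult(promise.future, Duration.Inf)

Neither signature changes; only the owning object moves from core to common/utils.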
diff --git a/core/src/main/scala/org/apache/spark/util/SparkFatalException.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkFatalException.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/util/SparkFatalException.scala
rename to common/utils/src/main/scala/org/apache/spark/util/SparkFatalException.scala
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
new file mode 100644
index 00000000000..63d1ab4799a
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.File
+import java.net.{URI, URISyntaxException}
+
+private[spark] object SparkFileUtils {
+  /**
+   * Return a well-formed URI for the file described by a user input string.
+   *
+   * If the supplied path does not contain a scheme, or is a relative path, it will be
+   * converted into an absolute path with a file:// scheme.
+   */
+  def resolveURI(path: String): URI = {
+    try {
+      val uri = new URI(path)
+      if (uri.getScheme() != null) {
+        return uri
+      }
+      // make sure to handle if the path has a fragment (applies to yarn
+      // distributed cache)
+      if (uri.getFragment() != null) {
+        val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
+        return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
+          uri.getFragment())
+      }
+    } catch {
+      case e: URISyntaxException =>
+    }
+    new File(path).getCanonicalFile().toURI()
+  }
+}
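To illustrate the contract described in the scaladoc, a few hypothetical inputs and the URIs they resolve to (the file: results depend on the working directory):

    import org.apache.spark.util.SparkFileUtils

    SparkFileUtils.resolveURI("hdfs://nn:8020/jars/a.jar")
    // -> hdfs://nn:8020/jars/a.jar  (already has a scheme: returned unchanged)

    SparkFileUtils.resolveURI("/tmp/a.jar")
    // -> file:/tmp/a.jar  (no scheme: canonicalized and given a file scheme)

    SparkFileUtils.resolveURI("a.jar#alias")
    // -> file:/<cwd>/a.jar#alias  (fragment kept, e.g. for the YARN distributed cache)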
diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkThreadUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkThreadUtils.scala
new file mode 100644
index 00000000000..ec14688a006
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkThreadUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.Awaitable
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkException
+
+private[spark] object SparkThreadUtils {
+  // scalastyle:off awaitresult
+  /**
+   * Preferred alternative to `Await.result()`.
+   *
+   * This method wraps and re-throws any exceptions thrown by the underlying `Await` call,
+   * ensuring that this thread's stack trace appears in logs.
+   *
+   * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
+   * `BlockingContext`. Code running in the user's thread may be in a thread of Scala's
+   * ForkJoinPool. As concurrent executions in a ForkJoinPool may see some [[ThreadLocal]] values
+   * unexpectedly, this method essentially prevents the ForkJoinPool from running other tasks in
+   * the current waiting thread. In general, we should use this method because many places in
+   * Spark use [[ThreadLocal]] and it is hard to debug when [[ThreadLocal]]s leak into other
+   * tasks.
+   */
+  @throws(classOf[SparkException])
+  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+      // See SPARK-13747.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.result(atMost)(awaitPermission)
+    } catch {
+      case e: SparkFatalException =>
+        throw e.throwable
+      // TimeoutException and RpcAbortException are thrown in the current thread, so there is
+      // no need to wrap the exception.
+      case NonFatal(t)
+          if !t.isInstanceOf[TimeoutException] =>
+        throw new SparkException("Exception thrown in awaitResult: ", t)
+    }
+  }
+  // scalastyle:on awaitresult
+}
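And a minimal sketch of awaitResult in use (assumes an implicit ExecutionContext for the Future; as above, the object is private[spark], so this only compiles under the org.apache.spark package):

    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration._

    import org.apache.spark.util.SparkThreadUtils

    implicit val ec: ExecutionContext = ExecutionContext.global

    val f = Future { 21 * 2 }
    // A failed Future surfaces as a SparkException wrapping the cause, so this
    // thread's stack trace appears in logs; a TimeoutException from an expired
    // `atMost` is rethrown as-is.
    val answer = SparkThreadUtils.awaitResult(f, 10.seconds) // 42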
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index 6d0d16df946..0ed1670f990 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -38,7 +38,7 @@ import org.apache.commons.codec.digest.DigestUtils.sha256Hex
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.AddArtifactsResponse
 import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
 
 /**
  * The Artifact Manager is responsible for handling and transferring artifacts from the local
@@ -71,7 +71,7 @@ class ArtifactManager(
    * Currently only local files with extensions .jar and .class are supported.
    */
   def addArtifact(path: String): Unit = {
-    addArtifact(Utils.resolveURI(path))
+    addArtifact(SparkFileUtils.resolveURI(path))
   }
 
   private def parseArtifacts(uri: URI): Seq[Artifact] = {
@@ -201,7 +201,7 @@ class ArtifactManager(
       writeBatch()
     }
     stream.onCompleted()
-    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    SparkThreadUtils.awaitResult(promise.future, Duration.Inf)
     // TODO(SPARK-42658): Handle responses containing CRC failures.
   }
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 303493ef91a..16d7de56c39 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -307,20 +307,7 @@ private[spark] object ThreadUtils {
    */
   @throws(classOf[SparkException])
   def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
-    try {
-      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
-      // See SPARK-13747.
-      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
-      awaitable.result(atMost)(awaitPermission)
-    } catch {
-      case e: SparkFatalException =>
-        throw e.throwable
-      // TimeoutException and RpcAbortException is thrown in the current thread, so not need to warp
-      // the exception.
-      case NonFatal(t)
-          if !t.isInstanceOf[TimeoutException] =>
-        throw new SparkException("Exception thrown in awaitResult: ", t)
-    }
+    SparkThreadUtils.awaitResult(awaitable, atMost)
   }
   // scalastyle:on awaitresult
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 60895c791b5..b5c0ee1bab8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2085,22 +2085,7 @@ private[spark] object Utils extends Logging with SparkClassUtils {
    * converted into an absolute path with a file:// scheme.
    */
   def resolveURI(path: String): URI = {
-    try {
-      val uri = new URI(path)
-      if (uri.getScheme() != null) {
-        return uri
-      }
-      // make sure to handle if the path has a fragment (applies to yarn
-      // distributed cache)
-      if (uri.getFragment() != null) {
-        val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
-        return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
-          uri.getFragment())
-      }
-    } catch {
-      case e: URISyntaxException =>
-    }
-    new File(path).getCanonicalFile().toURI()
+    SparkFileUtils.resolveURI(path)
   }
 
   /** Resolve a comma-separated list of paths. */

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org