vicennial commented on code in PR #52073:
URL: https://github.com/apache/spark/pull/52073#discussion_r2300269512
##########
sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala:
##########
@@ -266,28 +267,41 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
* they are from a permanent location.
*/
private[sql] def addLocalArtifacts(artifacts: Seq[Artifact]): Unit = {
+ val failedArtifactExceptions = ListBuffer[SparkRuntimeException]()
+
artifacts.foreach { artifact =>
- artifact.storage match {
- case d: Artifact.LocalFile =>
- addArtifact(
- artifact.path,
- d.path,
- fragment = None,
- deleteStagedFile = false)
- case d: Artifact.InMemory =>
- val tempDir = Utils.createTempDir().toPath
- val tempFile = tempDir.resolve(artifact.path.getFileName)
- val outStream = Files.newOutputStream(tempFile)
- Utils.tryWithSafeFinallyAndFailureCallbacks {
- d.stream.transferTo(outStream)
- addArtifact(artifact.path, tempFile, fragment = None)
- }(finallyBlock = {
- outStream.close()
- })
- case _ =>
-        throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
+ try {
+ artifact.storage match {
+ case d: Artifact.LocalFile =>
+ addArtifact(
+ artifact.path,
+ d.path,
+ fragment = None,
+ deleteStagedFile = false)
+ case d: Artifact.InMemory =>
+ val tempDir = Utils.createTempDir().toPath
+ val tempFile = tempDir.resolve(artifact.path.getFileName)
+ val outStream = Files.newOutputStream(tempFile)
+ Utils.tryWithSafeFinallyAndFailureCallbacks {
+ d.stream.transferTo(outStream)
+ addArtifact(artifact.path, tempFile, fragment = None)
+ }(finallyBlock = {
+ outStream.close()
+ })
+ case _ =>
+          throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
+ }
+ } catch {
+ case e: SparkRuntimeException =>
+ failedArtifactExceptions += e
}
}
+
+ if (failedArtifactExceptions.nonEmpty) {
Review Comment:
[ArtifactUtils](https://github.com/apache/spark/blob/f0a3a2ea161181c424d083af76607c6384b2426e/sql/api/src/main/scala/org/apache/spark/sql/util/ArtifactUtils.scala#L4)
would be a good place for this utility method
##########
sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala:
##########
@@ -266,28 +267,41 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
* they are from a permanent location.
*/
private[sql] def addLocalArtifacts(artifacts: Seq[Artifact]): Unit = {
+ val failedArtifactExceptions = ListBuffer[SparkRuntimeException]()
+
artifacts.foreach { artifact =>
- artifact.storage match {
- case d: Artifact.LocalFile =>
- addArtifact(
- artifact.path,
- d.path,
- fragment = None,
- deleteStagedFile = false)
- case d: Artifact.InMemory =>
- val tempDir = Utils.createTempDir().toPath
- val tempFile = tempDir.resolve(artifact.path.getFileName)
- val outStream = Files.newOutputStream(tempFile)
- Utils.tryWithSafeFinallyAndFailureCallbacks {
- d.stream.transferTo(outStream)
- addArtifact(artifact.path, tempFile, fragment = None)
- }(finallyBlock = {
- outStream.close()
- })
- case _ =>
-        throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
+ try {
+ artifact.storage match {
+ case d: Artifact.LocalFile =>
+ addArtifact(
+ artifact.path,
+ d.path,
+ fragment = None,
+ deleteStagedFile = false)
+ case d: Artifact.InMemory =>
+ val tempDir = Utils.createTempDir().toPath
+ val tempFile = tempDir.resolve(artifact.path.getFileName)
+ val outStream = Files.newOutputStream(tempFile)
+ Utils.tryWithSafeFinallyAndFailureCallbacks {
+ d.stream.transferTo(outStream)
+ addArtifact(artifact.path, tempFile, fragment = None)
+ }(finallyBlock = {
+ outStream.close()
+ })
+ case _ =>
+          throw SparkException.internalError(s"Unsupported artifact storage: ${artifact.storage}")
+ }
+ } catch {
+ case e: SparkRuntimeException =>
Review Comment:
Right, it makes sense to keep it consistent
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]