This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d25ef73d3543 [SPARK-46400][CORE][SQL][3.4] When there are corrupted files in the local maven repo, skip this cache and try again
d25ef73d3543 is described below

commit d25ef73d3543d7cc7c3d2089e215309ce0bd22d2
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Thu Feb 15 09:52:53 2024 -0800

    [SPARK-46400][CORE][SQL][3.4] When there are corrupted files in the local maven repo, skip this cache and try again
    
    ### What changes were proposed in this pull request?
    This PR aims to:
    - fix a potential bug (see: https://github.com/apache/spark/pull/44208) and enhance the user experience.
    - make the code more standards-compliant.
    
    This backports the above to branch-3.4.
    Master branch PR: https://github.com/apache/spark/pull/44343
    
    ### Why are the changes needed?
    We use the local Maven repo as the first-level cache in Ivy. The original intention was to reduce the time required to parse and fetch the artifacts, but when there are corrupted files in the local Maven repo, this mechanism is interrupted outright and the resulting error message is very unfriendly, which greatly confuses users. In keeping with the original intention, we should simply skip the cache in such situations and retry.
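    
    In effect, resolution now falls back as in the minimal sketch below (a simplification, not the exact patch; `cachedSettings`, `noCacheSettings`, `md`, and `resolveOptions` stand in for the values built inside `SparkSubmitUtils.resolveMavenCoordinates`):
    
    ```scala
    import org.apache.ivy.Ivy
    import org.apache.ivy.core.report.DownloadStatus
    
    val ivy = Ivy.newInstance(cachedSettings)
    val rr = ivy.resolve(md, resolveOptions)
    if (rr.hasError) {
      // A corrupted jar in ~/.m2 surfaces as a FAILED artifact download report.
      val failed = rr.getArtifactsReports(DownloadStatus.FAILED, true)
      if (failed.nonEmpty) {
        // Drop the stale Ivy metadata for each failed artifact, then resolve
        // again with settings whose resolver chain skips the local-m2 cache.
        failed.foreach { r =>
          clearInvalidIvyCacheFiles(r.getArtifact.getModuleRevisionId, cachedSettings.getDefaultCache)
        }
        val retry = Ivy.newInstance(noCacheSettings).resolve(md, resolveOptions)
        if (retry.hasError) {
          throw new RuntimeException(retry.getAllProblemMessages.toString)
        }
      }
    }
    ```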
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manually tested.
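    
    For example, a hypothetical manual check (not part of the patch; the coordinate below is only an illustration): truncate a cached jar under `~/.m2/repository` so it is corrupted, then verify that resolution still succeeds through the no-cache fallback:
    
    ```scala
    // Normal settings (local-m2 used as a cache) plus a fallback without it.
    val settings = SparkSubmitUtils.buildIvySettings(None, None)
    val noCache = SparkSubmitUtils.buildIvySettings(None, None, useLocalM2AsCache = false)
    
    // Before this patch, a corrupted cached jar failed the whole resolution;
    // now the FAILED report triggers a retry that skips the local-m2 cache.
    val jars = SparkSubmitUtils.resolveMavenCoordinates(
      "org.apache.commons:commons-lang3:3.12.0",
      settings,
      noCacheIvySettings = Some(noCache),
      transitive = true)
    assert(jars.nonEmpty)
    ```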
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45018 from panbingkun/branch-3.4_SPARK-46400.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 116 +++++++++++++++++----
 .../sql/hive/client/IsolatedClientLoader.scala     |   4 +
 2 files changed, 98 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8b4ef1dee8ac..18d85599bd2d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,7 +41,7 @@ import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor._
 import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
-import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.report.{DownloadStatus, ResolveReport}
 import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
@@ -1217,7 +1217,7 @@ private[spark] object SparkSubmitUtils extends Logging {
         s"be whitespace. The artifactId provided is: ${splits(1)}")
       require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
         s"be whitespace. The version provided is: ${splits(2)}")
-      new MavenCoordinate(splits(0), splits(1), splits(2))
+      MavenCoordinate(splits(0), splits(1), splits(2))
     }
   }
 
@@ -1232,21 +1232,27 @@ private[spark] object SparkSubmitUtils extends Logging {
   }
 
   /**
-   * Extracts maven coordinates from a comma-delimited string
+   * Create a ChainResolver used by Ivy to search for and resolve dependencies.
+   *
    * @param defaultIvyUserDir The default user path for Ivy
+   * @param useLocalM2AsCache Whether to use the local maven repo as a cache
   * @return A ChainResolver used by Ivy to search for and resolve dependencies.
    */
-  def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
+  def createRepoResolvers(
+      defaultIvyUserDir: File,
+      useLocalM2AsCache: Boolean = true): ChainResolver = {
     // We need a chain resolver if we want to check multiple repositories
     val cr = new ChainResolver
     cr.setName("spark-list")
 
-    val localM2 = new IBiblioResolver
-    localM2.setM2compatible(true)
-    localM2.setRoot(m2Path.toURI.toString)
-    localM2.setUsepoms(true)
-    localM2.setName("local-m2-cache")
-    cr.add(localM2)
+    if (useLocalM2AsCache) {
+      val localM2 = new IBiblioResolver
+      localM2.setM2compatible(true)
+      localM2.setRoot(m2Path.toURI.toString)
+      localM2.setUsepoms(true)
+      localM2.setName("local-m2-cache")
+      cr.add(localM2)
+    }
 
     val localIvy = new FileSystemResolver
     val localIvyRoot = new File(defaultIvyUserDir, "local")
@@ -1342,18 +1348,23 @@ private[spark] object SparkSubmitUtils extends Logging {
 
   /**
    * Build Ivy Settings using options with default resolvers
+   *
   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
    * @param ivyPath The path to the local ivy repository
+   * @param useLocalM2AsCache Whether or not use `local-m2 repo` as cache
    * @return An IvySettings object
    */
-  def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
+  def buildIvySettings(
+      remoteRepos: Option[String],
+      ivyPath: Option[String],
+      useLocalM2AsCache: Boolean = true): IvySettings = {
     val ivySettings: IvySettings = new IvySettings
     processIvyPathArg(ivySettings, ivyPath)
 
     // create a pattern matcher
     ivySettings.addMatcher(new GlobPatternMatcher)
     // create the dependency resolvers
-    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
+    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache)
     ivySettings.addResolver(repoResolver)
     ivySettings.setDefaultResolver(repoResolver.getName)
     processRemoteRepoArg(ivySettings, remoteRepos)
@@ -1450,7 +1461,7 @@ private[spark] object SparkSubmitUtils extends Logging {
    */
   private def clearIvyResolutionFiles(
       mdId: ModuleRevisionId,
-      ivySettings: IvySettings,
+      defaultCacheFile: File,
       ivyConfName: String): Unit = {
     val currentResolutionFiles = Seq(
       s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
@@ -1458,14 +1469,40 @@ private[spark] object SparkSubmitUtils extends Logging {
       
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties"
     )
     currentResolutionFiles.foreach { filename =>
-      new File(ivySettings.getDefaultCache, filename).delete()
+      new File(defaultCacheFile, filename).delete()
+    }
+  }
+
+  /**
+   * Clear invalid cache files in ivy. The cache file is usually at
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml,
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties.
+   * Because when using `local-m2` repo as a cache, some invalid files were created.
+   * If not deleted here, an error prompt similar to `unknown resolver local-m2-cache`
+   * will be generated, making some confusion for users.
+   */
+  private def clearInvalidIvyCacheFiles(
+      mdId: ModuleRevisionId,
+      defaultCacheFile: File): Unit = {
+    val cacheFiles = Seq(
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivy-${mdId.getRevision}.xml",
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivy-${mdId.getRevision}.xml.original",
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivydata-${mdId.getRevision}.properties")
+    cacheFiles.foreach { filename =>
+      new File(defaultCacheFile, filename).delete()
     }
   }
 
   /**
    * Resolves any dependencies that were supplied through maven coordinates
+   *
    * @param coordinates Comma-delimited string of maven coordinates
    * @param ivySettings An IvySettings containing resolvers to use
+   * @param noCacheIvySettings An no-cache(local-m2-cache) IvySettings containing resolvers to use
   * @param transitive Whether resolving transitive dependencies, default is true
   * @param exclusions Exclusions to apply when resolving transitive dependencies
   * @return Seq of path to the jars of the given maven artifacts including their
@@ -1474,6 +1511,7 @@ private[spark] object SparkSubmitUtils extends Logging {
   def resolveMavenCoordinates(
       coordinates: String,
       ivySettings: IvySettings,
+      noCacheIvySettings: Option[IvySettings] = None,
       transitive: Boolean,
       exclusions: Seq[String] = Nil,
       isTest: Boolean = false): Seq[String] = {
@@ -1502,6 +1540,8 @@ private[spark] object SparkSubmitUtils extends Logging {
         // scalastyle:on println
 
         val ivy = Ivy.newInstance(ivySettings)
+        ivy.pushContext()
+
         // Set resolve options to download transitive dependencies as well
         val resolveOptions = new ResolveOptions
         resolveOptions.setTransitive(transitive)
@@ -1514,6 +1554,11 @@ private[spark] object SparkSubmitUtils extends Logging {
         } else {
           resolveOptions.setDownload(true)
         }
+        // retrieve all resolved dependencies
+        retrieveOptions.setDestArtifactPattern(
+          packagesDirectory.getAbsolutePath + File.separator +
+            "[organization]_[artifact]-[revision](-[classifier]).[ext]")
+        retrieveOptions.setConfs(Array(ivyConfName))
 
         // Add exclusion rules for Spark and Scala Library
         addExclusionRules(ivySettings, ivyConfName, md)
@@ -1525,17 +1570,44 @@ private[spark] object SparkSubmitUtils extends Logging {
         // resolve dependencies
         val rr: ResolveReport = ivy.resolve(md, resolveOptions)
         if (rr.hasError) {
-          throw new RuntimeException(rr.getAllProblemMessages.toString)
+          // SPARK-46302: When there are some corrupted jars in the local maven repo,
+          // we try to continue without the cache
+          val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
+          if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
+            val failedArtifacts = failedReports.map(r => r.getArtifact)
+            logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
+              s"attempt to retry while skipping local-m2-cache.")
+            failedArtifacts.foreach(artifact => {
+              clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
+            })
+            ivy.popContext()
+
+            val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get)
+            noCacheIvy.pushContext()
+
+            val noCacheRr = noCacheIvy.resolve(md, resolveOptions)
+            if (noCacheRr.hasError) {
+              throw new RuntimeException(noCacheRr.getAllProblemMessages.toString)
+            }
+            noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
+            val dependencyPaths = resolveDependencyPaths(
+              noCacheRr.getArtifacts.toArray, packagesDirectory)
+            noCacheIvy.popContext()
+
+            dependencyPaths
+          } else {
+            throw new RuntimeException(rr.getAllProblemMessages.toString)
+          }
+        } else {
+          ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
+          val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+          ivy.popContext()
+
+          dependencyPaths
         }
-        // retrieve all resolved dependencies
-        retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + File.separator +
-          "[organization]_[artifact]-[revision](-[classifier]).[ext]")
-        ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
-          retrieveOptions.setConfs(Array(ivyConfName)))
-        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
       } finally {
         System.setOut(sysOut)
-        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName)
+        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName)
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index e65e6d42937c..dde3be5ad0c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -145,6 +145,10 @@ private[hive] object IsolatedClientLoader extends Logging {
         SparkSubmitUtils.buildIvySettings(
           Some(remoteRepos),
           ivyPath),
+        Some(SparkSubmitUtils.buildIvySettings(
+          Some(remoteRepos),
+          ivyPath,
+          useLocalM2AsCache = false)),
         transitive = true,
         exclusions = version.exclusions)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
