This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f2a471e9cc75 [SPARK-46400][CORE][SQL] When there are corrupted files in the local maven repo, skip this cache and try again
f2a471e9cc75 is described below

commit f2a471e9cc752f3826232eedc9025fd156a85965
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Wed Jan 31 09:46:07 2024 -0600

    [SPARK-46400][CORE][SQL] When there are corrupted files in the local maven repo, skip this cache and try again
    
    ### What changes were proposed in this pull request?
    The PR aims to:
    - fix a potential bug (i.e. https://github.com/apache/spark/pull/44208) and enhance the user experience.
    - make the code more compliant with standards.
    
    ### Why are the changes needed?
    We use the local maven repo as the first-level cache in ivy. The original intention was to reduce the time required to parse and obtain the jars, but when there are corrupted files in the local maven repo, the mechanism above is interrupted outright and the error message is very unfriendly, which greatly confuses users. In keeping with the original intention, we should simply skip this cache in such situations.
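
    A minimal sketch of how the fallback is wired up, based on the diff in this commit: a caller builds a second, cache-free `IvySettings` and passes it as `noCacheIvySettings`. The coordinate and print stream below are hypothetical, and `MavenUtils` is `private[spark]`, so this mirrors internal callers such as `IsolatedClientLoader` rather than a public API.

    ```scala
    import java.io.PrintStream

    import org.apache.spark.util.MavenUtils

    // Hypothetical inputs, for illustration only.
    val coordinates = "com.example:demo:1.0.0"
    val remoteRepos: Option[String] = None
    val ivyPath: Option[String] = None
    implicit val printStream: PrintStream = System.err

    // Primary settings keep the local maven repo as the first-level cache.
    val ivySettings = MavenUtils.buildIvySettings(remoteRepos, ivyPath)
    // Fallback settings skip the local-m2-cache resolver entirely.
    val noCacheIvySettings =
      MavenUtils.buildIvySettings(remoteRepos, ivyPath, useLocalM2AsCache = false)

    // If artifacts from the local maven repo fail to download (e.g. corrupted files),
    // resolveMavenCoordinates clears the invalid ivy cache entries for the failed
    // artifacts and retries with the no-cache settings instead of failing outright.
    val jarPaths = MavenUtils.resolveMavenCoordinates(
      coordinates,
      ivySettings,
      noCacheIvySettings = Some(noCacheIvySettings),
      transitive = true)
    ```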
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manually tested.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44343 from panbingkun/SPARK-46400.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../scala/org/apache/spark/util/MavenUtils.scala   | 147 +++++++++++++++------
 .../sql/hive/client/IsolatedClientLoader.scala     |   4 +
 2 files changed, 112 insertions(+), 39 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
index 2d7fba6f07d5..65530b7fa473 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
@@ -27,7 +27,7 @@ import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
 import org.apache.ivy.core.module.descriptor.{Artifact, DefaultDependencyDescriptor, DefaultExcludeRule, DefaultModuleDescriptor, ExcludeRule}
 import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
-import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.report.{DownloadStatus, ResolveReport}
 import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
@@ -43,8 +43,8 @@ import org.apache.spark.util.ArrayImplicits._
 private[spark] object MavenUtils extends Logging {
   val JAR_IVY_SETTING_PATH_KEY: String = "spark.jars.ivySettings"
 
-//  // Exposed for testing
-//  var printStream = SparkSubmit.printStream
+  // Exposed for testing
+  // var printStream = SparkSubmit.printStream
 
   // Exposed for testing.
   // These components are used to make the default exclusion rules for Spark dependencies.
@@ -113,7 +113,7 @@ private[spark] object MavenUtils extends Logging {
         splits(2) != null && splits(2).trim.nonEmpty,
         s"The version cannot be null or " +
           s"be whitespace. The version provided is: ${splits(2)}")
-      new MavenCoordinate(splits(0), splits(1), splits(2))
+      MavenCoordinate(splits(0), splits(1), splits(2))
     }.toImmutableArraySeq
   }
 
@@ -128,24 +128,30 @@ private[spark] object MavenUtils extends Logging {
   }
 
   /**
-   * Extracts maven coordinates from a comma-delimited string
+   * Create a ChainResolver used by Ivy to search for and resolve dependencies.
    *
    * @param defaultIvyUserDir
    *   The default user path for Ivy
+   * @param useLocalM2AsCache
+   *   Whether to use the local maven repo as a cache
    * @return
    *   A ChainResolver used by Ivy to search for and resolve dependencies.
    */
-  private[util] def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
+  private[util] def createRepoResolvers(
+      defaultIvyUserDir: File,
+      useLocalM2AsCache: Boolean = true): ChainResolver = {
     // We need a chain resolver if we want to check multiple repositories
     val cr = new ChainResolver
     cr.setName("spark-list")
 
-    val localM2 = new IBiblioResolver
-    localM2.setM2compatible(true)
-    localM2.setRoot(m2Path.toURI.toString)
-    localM2.setUsepoms(true)
-    localM2.setName("local-m2-cache")
-    cr.add(localM2)
+    if (useLocalM2AsCache) {
+      val localM2 = new IBiblioResolver
+      localM2.setM2compatible(true)
+      localM2.setRoot(m2Path.toURI.toString)
+      localM2.setUsepoms(true)
+      localM2.setName("local-m2-cache")
+      cr.add(localM2)
+    }
 
     val localIvy = new FileSystemResolver
     val localIvyRoot = new File(defaultIvyUserDir, "local")
@@ -263,18 +269,22 @@ private[spark] object MavenUtils extends Logging {
    *   Comma-delimited string of remote repositories other than maven central
    * @param ivyPath
    *   The path to the local ivy repository
+   * @param useLocalM2AsCache
+   *   Whether or not use `local-m2 repo` as cache
    * @return
    *   An IvySettings object
    */
-  def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String])(implicit
-      printStream: PrintStream): IvySettings = {
+  def buildIvySettings(
+      remoteRepos: Option[String],
+      ivyPath: Option[String],
+      useLocalM2AsCache: Boolean = true)(implicit printStream: PrintStream): IvySettings = {
     val ivySettings: IvySettings = new IvySettings
     processIvyPathArg(ivySettings, ivyPath)
 
     // create a pattern matcher
     ivySettings.addMatcher(new GlobPatternMatcher)
     // create the dependency resolvers
-    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
+    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache)
     ivySettings.addResolver(repoResolver)
     ivySettings.setDefaultResolver(repoResolver.getName)
     processRemoteRepoArg(ivySettings, remoteRepos)
@@ -310,7 +320,7 @@ private[spark] object MavenUtils extends Logging {
             JAR_IVY_SETTING_PATH_KEY)
     }
     require(file.exists(), s"Ivy settings file $file does not exist")
-    require(file.isFile(), s"Ivy settings file $file is not a normal file")
+    require(file.isFile, s"Ivy settings file $file is not a normal file")
     val ivySettings: IvySettings = new IvySettings
     try {
       ivySettings.load(file)
@@ -351,7 +361,7 @@ private[spark] object MavenUtils extends Logging {
         cr.add(brr)
         // scalastyle:off println
         printStream.println(s"$repo added as a remote repository with the 
name: ${brr.getName}")
-      // scalastyle:on println
+        // scalastyle:on println
       }
 
       ivySettings.addResolver(cr)
@@ -376,14 +386,38 @@ private[spark] object MavenUtils extends Logging {
    */
   private def clearIvyResolutionFiles(
       mdId: ModuleRevisionId,
-      ivySettings: IvySettings,
+      defaultCacheFile: File,
       ivyConfName: String): Unit = {
     val currentResolutionFiles = Seq(
       s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
       
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml",
       
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties")
     currentResolutionFiles.foreach { filename =>
-      new File(ivySettings.getDefaultCache, filename).delete()
+      new File(defaultCacheFile, filename).delete()
+    }
+  }
+
+  /**
+   * Clear invalid cache files in ivy. The cache file is usually at
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml,
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and
+   * ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties.
+   * Because when using `local-m2` repo as a cache, some invalid files were created.
+   * If not deleted here, an error prompt similar to `unknown resolver local-m2-cache`
+   * will be generated, making some confusion for users.
+   */
+  private def clearInvalidIvyCacheFiles(
+      mdId: ModuleRevisionId,
+      defaultCacheFile: File): Unit = {
+    val cacheFiles = Seq(
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivy-${mdId.getRevision}.xml",
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivy-${mdId.getRevision}.xml.original",
+      s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" +
+        s"ivydata-${mdId.getRevision}.properties")
+    cacheFiles.foreach { filename =>
+      new File(defaultCacheFile, filename).delete()
     }
   }
 
@@ -394,6 +428,8 @@ private[spark] object MavenUtils extends Logging {
    *   Comma-delimited string of maven coordinates
    * @param ivySettings
    *   An IvySettings containing resolvers to use
+   * @param noCacheIvySettings
+   *   An no-cache(local-m2-cache) IvySettings containing resolvers to use
    * @param transitive
    *   Whether resolving transitive dependencies, default is true
    * @param exclusions
@@ -405,6 +441,7 @@ private[spark] object MavenUtils extends Logging {
   def resolveMavenCoordinates(
       coordinates: String,
       ivySettings: IvySettings,
+      noCacheIvySettings: Option[IvySettings] = None,
       transitive: Boolean,
       exclusions: Seq[String] = Nil,
      isTest: Boolean = false)(implicit printStream: PrintStream): Seq[String] = {
@@ -433,6 +470,8 @@ private[spark] object MavenUtils extends Logging {
         // scalastyle:on println
 
         val ivy = Ivy.newInstance(ivySettings)
+        ivy.pushContext()
+
         // Set resolve options to download transitive dependencies as well
         val resolveOptions = new ResolveOptions
         resolveOptions.setTransitive(transitive)
@@ -445,6 +484,11 @@ private[spark] object MavenUtils extends Logging {
         } else {
           resolveOptions.setDownload(true)
         }
+        // retrieve all resolved dependencies
+        retrieveOptions.setDestArtifactPattern(
+          packagesDirectory.getAbsolutePath + File.separator +
+            "[organization]_[artifact]-[revision](-[classifier]).[ext]")
+        retrieveOptions.setConfs(Array(ivyConfName))
 
         // Add exclusion rules for Spark and Scala Library
         addExclusionRules(ivySettings, ivyConfName, md)
@@ -456,19 +500,44 @@ private[spark] object MavenUtils extends Logging {
         // resolve dependencies
         val rr: ResolveReport = ivy.resolve(md, resolveOptions)
         if (rr.hasError) {
-          throw new RuntimeException(rr.getAllProblemMessages.toString)
+          // SPARK-46302: When there are some corrupted jars in the local maven repo,
+          // we try to continue without the cache
+          val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
+          if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
+            val failedArtifacts = failedReports.map(r => r.getArtifact)
+            logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", 
"]")}, " +
+              s"attempt to retry while skipping local-m2-cache.")
+            failedArtifacts.foreach(artifact => {
+              clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
+            })
+            ivy.popContext()
+
+            val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get)
+            noCacheIvy.pushContext()
+
+            val noCacheRr = noCacheIvy.resolve(md, resolveOptions)
+            if (noCacheRr.hasError) {
+              throw new RuntimeException(noCacheRr.getAllProblemMessages.toString)
+            }
+            noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
+            val dependencyPaths = resolveDependencyPaths(
+              noCacheRr.getArtifacts.toArray, packagesDirectory)
+            noCacheIvy.popContext()
+
+            dependencyPaths
+          } else {
+            throw new RuntimeException(rr.getAllProblemMessages.toString)
+          }
+        } else {
+          ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions)
+          val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+          ivy.popContext()
+
+          dependencyPaths
         }
-        // retrieve all resolved dependencies
-        retrieveOptions.setDestArtifactPattern(
-          packagesDirectory.getAbsolutePath + File.separator +
-            "[organization]_[artifact]-[revision](-[classifier]).[ext]")
-        ivy.retrieve(
-          rr.getModuleDescriptor.getModuleRevisionId,
-          retrieveOptions.setConfs(Array(ivyConfName)))
-        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
       } finally {
         System.setOut(sysOut)
-        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName)
+        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName)
       }
     }
   }
@@ -477,37 +546,37 @@ private[spark] object MavenUtils extends Logging {
       coords: String,
       ivySettings: IvySettings,
       ivyConfName: String): ExcludeRule = {
-    val c = extractMavenCoordinates(coords)(0)
+    val c = extractMavenCoordinates(coords).head
     val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
     val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
     rule.addConfiguration(ivyConfName)
     rule
   }
 
-  def isInvalidQueryString(tokens: Array[String]): Boolean = {
+  private def isInvalidQueryString(tokens: Array[String]): Boolean = {
     tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1))
   }
 
   /**
-   * Parse URI query string's parameter value of `transitive` and `exclude`. Other invalid
-   * parameters will be ignored.
+   * Parse URI query string's parameter value of `transitive`, `exclude` and `repos`.
+   * Other invalid parameters will be ignored.
    *
    * @param uri
    *   Ivy URI need to be downloaded.
    * @return
-   *   Tuple value of parameter `transitive` and `exclude` value.
+   *   Tuple value of parameter `transitive`, `exclude` and `repos` value.
    *
   *   1. transitive: whether to download dependency jar of Ivy URI, default value is true and
   *      this parameter value is case-insensitive. This mimics Hive's behaviour for parsing the
   *      transitive parameter. Invalid value will be treat as false. Example: Input:
    *      exclude=org.mortbay.jetty:jetty&transitive=true Output: true
    *
-   * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies,
-   * consists of `group:module` pairs separated by commas. Example: Input:
-   * excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output:
-   * [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   *    2. exclude: comma separated exclusions to apply when resolving transitive dependencies,
+   *      consists of `group:module` pairs separated by commas. Example: Input:
+   *      excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output:
+   *      [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
    *
-   * 3. repos: comma separated repositories to use when resolving dependencies.
+   *    3. repos: comma separated repositories to use when resolving dependencies.
    */
   def parseQueryParams(uri: URI): (Boolean, String, String) = {
     val uriQuery = uri.getQuery
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index af092fa3d329..b36df2f60103 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -133,6 +133,10 @@ private[hive] object IsolatedClientLoader extends Logging {
         MavenUtils.buildIvySettings(
           Some(remoteRepos),
           ivyPath),
+        Some(MavenUtils.buildIvySettings(
+          Some(remoteRepos),
+          ivyPath,
+          useLocalM2AsCache = false)),
         transitive = true,
         exclusions = version.exclusions)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
