This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f2a471e9cc75 [SPARK-46400][CORE][SQL] When there are corrupted files in the local maven repo, skip this cache and try again f2a471e9cc75 is described below commit f2a471e9cc752f3826232eedc9025fd156a85965 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Wed Jan 31 09:46:07 2024 -0600 [SPARK-46400][CORE][SQL] When there are corrupted files in the local maven repo, skip this cache and try again ### What changes were proposed in this pull request? The PR aims to - fix a potential bug (i.e., https://github.com/apache/spark/pull/44208) and enhance user experience. - make the code more compliant with standards ### Why are the changes needed? We use the local maven repo as the first-level cache in ivy. The original intention was to reduce the time required to parse and obtain the jar, but when there are corrupted files in the local maven repo, the above mechanism is interrupted outright and the resulting error message is very unfriendly, which will greatly confuse the user. Based on the original intention, we should skip the cache directly in similar situations. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44343 from panbingkun/SPARK-46400. 
Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../scala/org/apache/spark/util/MavenUtils.scala | 147 +++++++++++++++------ .../sql/hive/client/IsolatedClientLoader.scala | 4 + 2 files changed, 112 insertions(+), 39 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala index 2d7fba6f07d5..65530b7fa473 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala @@ -27,7 +27,7 @@ import org.apache.ivy.Ivy import org.apache.ivy.core.LogOptions import org.apache.ivy.core.module.descriptor.{Artifact, DefaultDependencyDescriptor, DefaultExcludeRule, DefaultModuleDescriptor, ExcludeRule} import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId} -import org.apache.ivy.core.report.ResolveReport +import org.apache.ivy.core.report.{DownloadStatus, ResolveReport} import org.apache.ivy.core.resolve.ResolveOptions import org.apache.ivy.core.retrieve.RetrieveOptions import org.apache.ivy.core.settings.IvySettings @@ -43,8 +43,8 @@ import org.apache.spark.util.ArrayImplicits._ private[spark] object MavenUtils extends Logging { val JAR_IVY_SETTING_PATH_KEY: String = "spark.jars.ivySettings" -// // Exposed for testing -// var printStream = SparkSubmit.printStream + // Exposed for testing + // var printStream = SparkSubmit.printStream // Exposed for testing. // These components are used to make the default exclusion rules for Spark dependencies. @@ -113,7 +113,7 @@ private[spark] object MavenUtils extends Logging { splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " + s"be whitespace. 
The version provided is: ${splits(2)}") - new MavenCoordinate(splits(0), splits(1), splits(2)) + MavenCoordinate(splits(0), splits(1), splits(2)) }.toImmutableArraySeq } @@ -128,24 +128,30 @@ private[spark] object MavenUtils extends Logging { } /** - * Extracts maven coordinates from a comma-delimited string + * Create a ChainResolver used by Ivy to search for and resolve dependencies. * * @param defaultIvyUserDir * The default user path for Ivy + * @param useLocalM2AsCache + * Whether to use the local maven repo as a cache * @return * A ChainResolver used by Ivy to search for and resolve dependencies. */ - private[util] def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = { + private[util] def createRepoResolvers( + defaultIvyUserDir: File, + useLocalM2AsCache: Boolean = true): ChainResolver = { // We need a chain resolver if we want to check multiple repositories val cr = new ChainResolver cr.setName("spark-list") - val localM2 = new IBiblioResolver - localM2.setM2compatible(true) - localM2.setRoot(m2Path.toURI.toString) - localM2.setUsepoms(true) - localM2.setName("local-m2-cache") - cr.add(localM2) + if (useLocalM2AsCache) { + val localM2 = new IBiblioResolver + localM2.setM2compatible(true) + localM2.setRoot(m2Path.toURI.toString) + localM2.setUsepoms(true) + localM2.setName("local-m2-cache") + cr.add(localM2) + } val localIvy = new FileSystemResolver val localIvyRoot = new File(defaultIvyUserDir, "local") @@ -263,18 +269,22 @@ private[spark] object MavenUtils extends Logging { * Comma-delimited string of remote repositories other than maven central * @param ivyPath * The path to the local ivy repository + * @param useLocalM2AsCache + * Whether or not use `local-m2 repo` as cache * @return * An IvySettings object */ - def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String])(implicit - printStream: PrintStream): IvySettings = { + def buildIvySettings( + remoteRepos: Option[String], + ivyPath: Option[String], + useLocalM2AsCache: 
Boolean = true)(implicit printStream: PrintStream): IvySettings = { val ivySettings: IvySettings = new IvySettings processIvyPathArg(ivySettings, ivyPath) // create a pattern matcher ivySettings.addMatcher(new GlobPatternMatcher) // create the dependency resolvers - val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir) + val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache) ivySettings.addResolver(repoResolver) ivySettings.setDefaultResolver(repoResolver.getName) processRemoteRepoArg(ivySettings, remoteRepos) @@ -310,7 +320,7 @@ private[spark] object MavenUtils extends Logging { JAR_IVY_SETTING_PATH_KEY) } require(file.exists(), s"Ivy settings file $file does not exist") - require(file.isFile(), s"Ivy settings file $file is not a normal file") + require(file.isFile, s"Ivy settings file $file is not a normal file") val ivySettings: IvySettings = new IvySettings try { ivySettings.load(file) @@ -351,7 +361,7 @@ private[spark] object MavenUtils extends Logging { cr.add(brr) // scalastyle:off println printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}") - // scalastyle:on println + // scalastyle:on println } ivySettings.addResolver(cr) @@ -376,14 +386,38 @@ private[spark] object MavenUtils extends Logging { */ private def clearIvyResolutionFiles( mdId: ModuleRevisionId, - ivySettings: IvySettings, + defaultCacheFile: File, ivyConfName: String): Unit = { val currentResolutionFiles = Seq( s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml", s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml", s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties") currentResolutionFiles.foreach { filename => - new File(ivySettings.getDefaultCache, filename).delete() + new File(defaultCacheFile, filename).delete() + } + } + + /** + * Clear invalid cache files in ivy. 
The cache file is usually at + * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml, + * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and + * ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties. + * Because when using `local-m2` repo as a cache, some invalid files were created. + * If not deleted here, an error prompt similar to `unknown resolver local-m2-cache` + * will be generated, making some confusion for users. + */ + private def clearInvalidIvyCacheFiles( + mdId: ModuleRevisionId, + defaultCacheFile: File): Unit = { + val cacheFiles = Seq( + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivy-${mdId.getRevision}.xml", + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivy-${mdId.getRevision}.xml.original", + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivydata-${mdId.getRevision}.properties") + cacheFiles.foreach { filename => + new File(defaultCacheFile, filename).delete() } } @@ -394,6 +428,8 @@ private[spark] object MavenUtils extends Logging { * Comma-delimited string of maven coordinates * @param ivySettings * An IvySettings containing resolvers to use + * @param noCacheIvySettings + * An no-cache(local-m2-cache) IvySettings containing resolvers to use * @param transitive * Whether resolving transitive dependencies, default is true * @param exclusions @@ -405,6 +441,7 @@ private[spark] object MavenUtils extends Logging { def resolveMavenCoordinates( coordinates: String, ivySettings: IvySettings, + noCacheIvySettings: Option[IvySettings] = None, transitive: Boolean, exclusions: Seq[String] = Nil, isTest: Boolean = false)(implicit printStream: PrintStream): Seq[String] = { @@ -433,6 +470,8 @@ private[spark] object MavenUtils extends Logging { // scalastyle:on println val ivy = Ivy.newInstance(ivySettings) + ivy.pushContext() + // Set resolve options to download transitive dependencies as well val 
resolveOptions = new ResolveOptions resolveOptions.setTransitive(transitive) @@ -445,6 +484,11 @@ private[spark] object MavenUtils extends Logging { } else { resolveOptions.setDownload(true) } + // retrieve all resolved dependencies + retrieveOptions.setDestArtifactPattern( + packagesDirectory.getAbsolutePath + File.separator + + "[organization]_[artifact]-[revision](-[classifier]).[ext]") + retrieveOptions.setConfs(Array(ivyConfName)) // Add exclusion rules for Spark and Scala Library addExclusionRules(ivySettings, ivyConfName, md) @@ -456,19 +500,44 @@ private[spark] object MavenUtils extends Logging { // resolve dependencies val rr: ResolveReport = ivy.resolve(md, resolveOptions) if (rr.hasError) { - throw new RuntimeException(rr.getAllProblemMessages.toString) + // SPARK-46302: When there are some corrupted jars in the local maven repo, + // we try to continue without the cache + val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true) + if (failedReports.nonEmpty && noCacheIvySettings.isDefined) { + val failedArtifacts = failedReports.map(r => r.getArtifact) + logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " + + s"attempt to retry while skipping local-m2-cache.") + failedArtifacts.foreach(artifact => { + clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache) + }) + ivy.popContext() + + val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get) + noCacheIvy.pushContext() + + val noCacheRr = noCacheIvy.resolve(md, resolveOptions) + if (noCacheRr.hasError) { + throw new RuntimeException(noCacheRr.getAllProblemMessages.toString) + } + noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions) + val dependencyPaths = resolveDependencyPaths( + noCacheRr.getArtifacts.toArray, packagesDirectory) + noCacheIvy.popContext() + + dependencyPaths + } else { + throw new RuntimeException(rr.getAllProblemMessages.toString) + } + } else { + 
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions) + val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) + ivy.popContext() + + dependencyPaths } - // retrieve all resolved dependencies - retrieveOptions.setDestArtifactPattern( - packagesDirectory.getAbsolutePath + File.separator + - "[organization]_[artifact]-[revision](-[classifier]).[ext]") - ivy.retrieve( - rr.getModuleDescriptor.getModuleRevisionId, - retrieveOptions.setConfs(Array(ivyConfName))) - resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) } finally { System.setOut(sysOut) - clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName) + clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName) } } } @@ -477,37 +546,37 @@ private[spark] object MavenUtils extends Logging { coords: String, ivySettings: IvySettings, ivyConfName: String): ExcludeRule = { - val c = extractMavenCoordinates(coords)(0) + val c = extractMavenCoordinates(coords).head val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*") val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null) rule.addConfiguration(ivyConfName) rule } - def isInvalidQueryString(tokens: Array[String]): Boolean = { + private def isInvalidQueryString(tokens: Array[String]): Boolean = { tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1)) } /** - * Parse URI query string's parameter value of `transitive` and `exclude`. Other invalid - * parameters will be ignored. + * Parse URI query string's parameter value of `transitive`, `exclude` and `repos`. + * Other invalid parameters will be ignored. * * @param uri * Ivy URI need to be downloaded. * @return - * Tuple value of parameter `transitive` and `exclude` value. + * Tuple value of parameter `transitive`, `exclude` and `repos` value. * * 1. 
transitive: whether to download dependency jar of Ivy URI, default value is true and * this parameter value is case-insensitive. This mimics Hive's behaviour for parsing the * transitive parameter. Invalid value will be treat as false. Example: Input: * exclude=org.mortbay.jetty:jetty&transitive=true Output: true * - * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, - * consists of `group:module` pairs separated by commas. Example: Input: - * excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output: - * [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http] + * 2. exclude: comma separated exclusions to apply when resolving transitive dependencies, + * consists of `group:module` pairs separated by commas. Example: Input: + * excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output: + * [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http] * - * 3. repos: comma separated repositories to use when resolving dependencies. + * 3. repos: comma separated repositories to use when resolving dependencies. 
*/ def parseQueryParams(uri: URI): (Boolean, String, String) = { val uriQuery = uri.getQuery diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index af092fa3d329..b36df2f60103 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -133,6 +133,10 @@ private[hive] object IsolatedClientLoader extends Logging { MavenUtils.buildIvySettings( Some(remoteRepos), ivyPath), + Some(MavenUtils.buildIvySettings( + Some(remoteRepos), + ivyPath, + useLocalM2AsCache = false)), transitive = true, exclusions = version.exclusions) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org