This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new d25ef73d3543 [SPARK-46400][CORE][SQL][3.4] When there are corrupted files in the local maven repo, skip this cache and try again d25ef73d3543 is described below commit d25ef73d3543d7cc7c3d2089e215309ce0bd22d2 Author: panbingkun <panbing...@baidu.com> AuthorDate: Thu Feb 15 09:52:53 2024 -0800 [SPARK-46400][CORE][SQL][3.4] When there are corrupted files in the local maven repo, skip this cache and try again ### What changes were proposed in this pull request? This PR aims to - fix a potential bug (i.e. https://github.com/apache/spark/pull/44208) and enhance the user experience. - make the code more compliant with standards. Backport the above to branch-3.4. Master branch PR: https://github.com/apache/spark/pull/44343 ### Why are the changes needed? We use the local maven repo as the first-level cache in ivy. The original intention was to reduce the time required to parse and obtain the jar, but when there are corrupted files in the local maven repo, the above mechanism is interrupted outright and the error message is very unfriendly, which greatly confuses the user. Based on the original intention, we should skip the cache directly in such situations. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45018 from panbingkun/branch-3.4_SPARK-46400. 
Authored-by: panbingkun <panbing...@baidu.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/deploy/SparkSubmit.scala | 116 +++++++++++++++++---- .../sql/hive/client/IsolatedClientLoader.scala | 4 + 2 files changed, 98 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 8b4ef1dee8ac..18d85599bd2d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -41,7 +41,7 @@ import org.apache.ivy.Ivy import org.apache.ivy.core.LogOptions import org.apache.ivy.core.module.descriptor._ import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId} -import org.apache.ivy.core.report.ResolveReport +import org.apache.ivy.core.report.{DownloadStatus, ResolveReport} import org.apache.ivy.core.resolve.ResolveOptions import org.apache.ivy.core.retrieve.RetrieveOptions import org.apache.ivy.core.settings.IvySettings @@ -1217,7 +1217,7 @@ private[spark] object SparkSubmitUtils extends Logging { s"be whitespace. The artifactId provided is: ${splits(1)}") require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " + s"be whitespace. The version provided is: ${splits(2)}") - new MavenCoordinate(splits(0), splits(1), splits(2)) + MavenCoordinate(splits(0), splits(1), splits(2)) } } @@ -1232,21 +1232,27 @@ private[spark] object SparkSubmitUtils extends Logging { } /** - * Extracts maven coordinates from a comma-delimited string + * Create a ChainResolver used by Ivy to search for and resolve dependencies. + * * @param defaultIvyUserDir The default user path for Ivy + * @param useLocalM2AsCache Whether to use the local maven repo as a cache * @return A ChainResolver used by Ivy to search for and resolve dependencies. 
*/ - def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = { + def createRepoResolvers( + defaultIvyUserDir: File, + useLocalM2AsCache: Boolean = true): ChainResolver = { // We need a chain resolver if we want to check multiple repositories val cr = new ChainResolver cr.setName("spark-list") - val localM2 = new IBiblioResolver - localM2.setM2compatible(true) - localM2.setRoot(m2Path.toURI.toString) - localM2.setUsepoms(true) - localM2.setName("local-m2-cache") - cr.add(localM2) + if (useLocalM2AsCache) { + val localM2 = new IBiblioResolver + localM2.setM2compatible(true) + localM2.setRoot(m2Path.toURI.toString) + localM2.setUsepoms(true) + localM2.setName("local-m2-cache") + cr.add(localM2) + } val localIvy = new FileSystemResolver val localIvyRoot = new File(defaultIvyUserDir, "local") @@ -1342,18 +1348,23 @@ private[spark] object SparkSubmitUtils extends Logging { /** * Build Ivy Settings using options with default resolvers + * * @param remoteRepos Comma-delimited string of remote repositories other than maven central * @param ivyPath The path to the local ivy repository + * @param useLocalM2AsCache Whether or not use `local-m2 repo` as cache * @return An IvySettings object */ - def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = { + def buildIvySettings( + remoteRepos: Option[String], + ivyPath: Option[String], + useLocalM2AsCache: Boolean = true): IvySettings = { val ivySettings: IvySettings = new IvySettings processIvyPathArg(ivySettings, ivyPath) // create a pattern matcher ivySettings.addMatcher(new GlobPatternMatcher) // create the dependency resolvers - val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir) + val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir, useLocalM2AsCache) ivySettings.addResolver(repoResolver) ivySettings.setDefaultResolver(repoResolver.getName) processRemoteRepoArg(ivySettings, remoteRepos) @@ -1450,7 +1461,7 @@ private[spark] object 
SparkSubmitUtils extends Logging { */ private def clearIvyResolutionFiles( mdId: ModuleRevisionId, - ivySettings: IvySettings, + defaultCacheFile: File, ivyConfName: String): Unit = { val currentResolutionFiles = Seq( s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml", @@ -1458,14 +1469,40 @@ private[spark] object SparkSubmitUtils extends Logging { s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties" ) currentResolutionFiles.foreach { filename => - new File(ivySettings.getDefaultCache, filename).delete() + new File(defaultCacheFile, filename).delete() + } + } + + /** + * Clear invalid cache files in ivy. The cache file is usually at + * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml, + * ~/.ivy2/cache/${groupId}/${artifactId}/ivy-${version}.xml.original, and + * ~/.ivy2/cache/${groupId}/${artifactId}/ivydata-${version}.properties. + * Because when using `local-m2` repo as a cache, some invalid files were created. + * If not deleted here, an error prompt similar to `unknown resolver local-m2-cache` + * will be generated, making some confusion for users. 
+ */ + private def clearInvalidIvyCacheFiles( + mdId: ModuleRevisionId, + defaultCacheFile: File): Unit = { + val cacheFiles = Seq( + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivy-${mdId.getRevision}.xml", + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivy-${mdId.getRevision}.xml.original", + s"${mdId.getOrganisation}${File.separator}${mdId.getName}${File.separator}" + + s"ivydata-${mdId.getRevision}.properties") + cacheFiles.foreach { filename => + new File(defaultCacheFile, filename).delete() } } /** * Resolves any dependencies that were supplied through maven coordinates + * * @param coordinates Comma-delimited string of maven coordinates * @param ivySettings An IvySettings containing resolvers to use + * @param noCacheIvySettings An no-cache(local-m2-cache) IvySettings containing resolvers to use * @param transitive Whether resolving transitive dependencies, default is true * @param exclusions Exclusions to apply when resolving transitive dependencies * @return Seq of path to the jars of the given maven artifacts including their @@ -1474,6 +1511,7 @@ private[spark] object SparkSubmitUtils extends Logging { def resolveMavenCoordinates( coordinates: String, ivySettings: IvySettings, + noCacheIvySettings: Option[IvySettings] = None, transitive: Boolean, exclusions: Seq[String] = Nil, isTest: Boolean = false): Seq[String] = { @@ -1502,6 +1540,8 @@ private[spark] object SparkSubmitUtils extends Logging { // scalastyle:on println val ivy = Ivy.newInstance(ivySettings) + ivy.pushContext() + // Set resolve options to download transitive dependencies as well val resolveOptions = new ResolveOptions resolveOptions.setTransitive(transitive) @@ -1514,6 +1554,11 @@ private[spark] object SparkSubmitUtils extends Logging { } else { resolveOptions.setDownload(true) } + // retrieve all resolved dependencies + retrieveOptions.setDestArtifactPattern( + packagesDirectory.getAbsolutePath + File.separator + 
+ "[organization]_[artifact]-[revision](-[classifier]).[ext]") + retrieveOptions.setConfs(Array(ivyConfName)) // Add exclusion rules for Spark and Scala Library addExclusionRules(ivySettings, ivyConfName, md) @@ -1525,17 +1570,44 @@ private[spark] object SparkSubmitUtils extends Logging { // resolve dependencies val rr: ResolveReport = ivy.resolve(md, resolveOptions) if (rr.hasError) { - throw new RuntimeException(rr.getAllProblemMessages.toString) + // SPARK-46302: When there are some corrupted jars in the local maven repo, + // we try to continue without the cache + val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true) + if (failedReports.nonEmpty && noCacheIvySettings.isDefined) { + val failedArtifacts = failedReports.map(r => r.getArtifact) + logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " + + s"attempt to retry while skipping local-m2-cache.") + failedArtifacts.foreach(artifact => { + clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache) + }) + ivy.popContext() + + val noCacheIvy = Ivy.newInstance(noCacheIvySettings.get) + noCacheIvy.pushContext() + + val noCacheRr = noCacheIvy.resolve(md, resolveOptions) + if (noCacheRr.hasError) { + throw new RuntimeException(noCacheRr.getAllProblemMessages.toString) + } + noCacheIvy.retrieve(noCacheRr.getModuleDescriptor.getModuleRevisionId, retrieveOptions) + val dependencyPaths = resolveDependencyPaths( + noCacheRr.getArtifacts.toArray, packagesDirectory) + noCacheIvy.popContext() + + dependencyPaths + } else { + throw new RuntimeException(rr.getAllProblemMessages.toString) + } + } else { + ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, retrieveOptions) + val dependencyPaths = resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) + ivy.popContext() + + dependencyPaths } - // retrieve all resolved dependencies - retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + File.separator + - 
"[organization]_[artifact]-[revision](-[classifier]).[ext]") - ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId, - retrieveOptions.setConfs(Array(ivyConfName))) - resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory) } finally { System.setOut(sysOut) - clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName) + clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings.getDefaultCache, ivyConfName) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index e65e6d42937c..dde3be5ad0c4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -145,6 +145,10 @@ private[hive] object IsolatedClientLoader extends Logging { SparkSubmitUtils.buildIvySettings( Some(remoteRepos), ivyPath), + Some(SparkSubmitUtils.buildIvySettings( + Some(remoteRepos), + ivyPath, + useLocalM2AsCache = false)), transitive = true, exclusions = version.exclusions) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org