This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b5fbdeb5483 [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
b5fbdeb5483 is described below

commit b5fbdeb5483fa4b3c6102a99fa84a0677e145c42
Author: Swaminathan Balachandran <swamirishi...@gmail.com>
AuthorDate: Thu Nov 10 18:16:44 2022 -0600

    [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path

    ### What changes were proposed in this pull request?
    Parse the configured value of spark.driver.log.dfsDir as a Hadoop FS Path and extract the path from the URI; this fixes the problem.

    ### Why are the changes needed?
    Currently, when an absolute URI is passed for the configured filesystem, the code fails while trying to copy the local log file to the filesystem directory path.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Unit test cases

    Closes #38377 from swamirishi/SPARK-40901.

    Authored-by: Swaminathan Balachandran <swamirishi...@gmail.com>
    Signed-off-by: Mridul <mridul<at>gmail.com>
---
 .../apache/spark/util/logging/DriverLogger.scala   |  8 ++---
 .../spark/util/logging/DriverLoggerSuite.scala     | 42 +++++++++++++++++++---
 2 files changed, 41 insertions(+), 9 deletions(-)
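For context, a minimal, self-contained sketch (not part of the patch) contrasting
the two path constructions the diff below swaps. The file:///tmp/driver-logs root,
the app-1 application id, and the object name are illustrative assumptions; only
hadoop-common is needed on the classpath.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object PathQualificationSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical absolute URI standing in for spark.driver.log.dfsDir.
        val rootDir = "file:///tmp/driver-logs"
        // Mirrors appId + DriverLogger.DRIVER_LOG_FILE_SUFFIX in the patch.
        val logName = "app-1_driver.log"

        // Old construction: java.io.File (what FileUtils.getFile delegates to)
        // treats the URI as a relative local path, collapses the "//" and
        // prepends the working directory, mangling the scheme.
        val fileBased = new java.io.File(rootDir, logName).getAbsolutePath()
        // e.g. /current/working/dir/file:/tmp/driver-logs/app-1_driver.log

        // Fixed construction: Hadoop's Path parses the URI, and makeQualified
        // resolves it against the filesystem that receives the copy, keeping
        // the scheme and authority intact.
        val fs = FileSystem.get(new java.net.URI(rootDir), new Configuration())
        val pathBased = fs.makeQualified(new Path(rootDir, logName))
        // e.g. file:/tmp/driver-logs/app-1_driver.log

        println(s"File-based: $fileBased")
        println(s"Path-based: $pathBased")
      }
    }

With an hdfs:// or s3a:// root, the first form would yield a nonexistent local
path, which is why the copy in DriverLogger failed before this change.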
+ s" Please create this dir in order to persist driver logs") } - val dfsLogFile: String = FileUtils.getFile(rootDir, appId - + DriverLogger.DRIVER_LOG_FILE_SUFFIX).getAbsolutePath() + val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, appId + + DriverLogger.DRIVER_LOG_FILE_SUFFIX)) try { inStream = new BufferedInputStream(new FileInputStream(localLogFile)) - outputStream = SparkHadoopUtil.createFile(fileSystem, new Path(dfsLogFile), + outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, conf.get(DRIVER_LOG_ALLOW_EC)) - fileSystem.setPermission(new Path(dfsLogFile), LOG_FILE_PERMISSIONS) + fileSystem.setPermission(dfsLogFile, LOG_FILE_PERMISSIONS) } catch { case e: Exception => JavaUtils.closeQuietly(inStream) diff --git a/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala index bd7ec242a93..9599bd29188 100644 --- a/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util.logging import java.io.File import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.{SparkContext, SparkFunSuite} @@ -65,12 +66,43 @@ class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { assert(dfsFile.length() > 0) } + test("SPARK-40901: driver logs are persisted locally and synced to dfs when log " + + "dir is absolute URI") { + val sparkConf = new SparkConf() + sparkConf.set(DRIVER_LOG_DFS_DIR, "file://" + rootDfsDir.getAbsolutePath()) + val sc = getSparkContext(sparkConf) + val app_id = sc.applicationId + // Run a simple spark application + sc.parallelize(1 to 1000).count() + + // Assert driver log file exists + val rootDir = Utils.getLocalDir(sc.getConf) + val driverLogsDir = FileUtils.getFile(rootDir, DriverLogger.DRIVER_LOG_DIR) + assert(driverLogsDir.exists()) + val files = driverLogsDir.listFiles() + assert(files.length === 1) + assert(files(0).getName.equals(DriverLogger.DRIVER_LOG_FILE)) + + sc.stop() + assert(!driverLogsDir.exists()) + assert(sc.getConf.get(DRIVER_LOG_DFS_DIR).get.startsWith("file:///")) + val dfsFile = new Path(sc.getConf.get(DRIVER_LOG_DFS_DIR).get + + "/" + app_id + DriverLogger.DRIVER_LOG_FILE_SUFFIX) + val dfsFileStatus = dfsFile.getFileSystem(sc.hadoopConfiguration).getFileStatus(dfsFile) + + assert(dfsFileStatus.isFile) + assert(dfsFileStatus.getLen > 0) + } + private def getSparkContext(): SparkContext = { - val conf = new SparkConf() - conf.set(DRIVER_LOG_DFS_DIR, rootDfsDir.getAbsolutePath()) - conf.set(DRIVER_LOG_PERSISTTODFS, true) - conf.set(SparkLauncher.SPARK_MASTER, "local") - conf.set(SparkLauncher.DEPLOY_MODE, "client") + getSparkContext(new SparkConf()) + } + + private def getSparkContext(conf: SparkConf): SparkContext = { + conf.setIfMissing(DRIVER_LOG_DFS_DIR, rootDfsDir.getAbsolutePath()) + conf.setIfMissing(DRIVER_LOG_PERSISTTODFS, true) + conf.setIfMissing(SparkLauncher.SPARK_MASTER, "local") + conf.setIfMissing(SparkLauncher.DEPLOY_MODE, "client") sc = new SparkContext("local", "DriverLogTest", conf) sc } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org