[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22504 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r231346067

    --- Diff: docs/configuration.md ---
    @@ -266,6 +266,40 @@ of the most common options to set are:
         Only has effect in Spark standalone mode or Mesos cluster deploy mode.
    +
    +  spark.driver.log.dfsDir
    +  (none)
    +
    +Base directory in which Spark driver logs are synced, if spark.driver.log.persistToDfs.enabled
    +is true. Within this base directory, each application logs the driver logs to an application specific file.
    +Users may want to set this to a unified location like an HDFS directory so driver log files can be persisted
    +for later usage. This directory should allow any Spark user to read/write files and the Spark History Server
    +user to delete files. Additionally, older logs from this directory are cleaned by
    +  Spark History Server if
    --- End diff --

    remove space after `>`
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r231346390

    --- Diff: docs/configuration.md ---
    @@ -266,6 +266,40 @@ of the most common options to set are:
         Only has effect in Spark standalone mode or Mesos cluster deploy mode.
    +
    +  spark.driver.log.dfsDir
    +  (none)
    +
    +Base directory in which Spark driver logs are synced, if spark.driver.log.persistToDfs.enabled
    +is true. Within this base directory, each application logs the driver logs to an application specific file.
    +Users may want to set this to a unified location like an HDFS directory so driver log files can be persisted
    +for later usage. This directory should allow any Spark user to read/write files and the Spark History Server
    +user to delete files. Additionally, older logs from this directory are cleaned by
    +  Spark History Server if
    +spark.history.fs.driverlog.cleaner.enabled is true and, if they are older than max age configured
    +at spark.history.fs.driverlog.cleaner.maxAge.
    +
    +
    +
    +  spark.driver.log.persistToDfs.enabled
    +  false
    +
    +If true, spark application running in client mode will write driver logs to a persistent storage, configured
    +in spark.driver.log.dfsDir. If spark.driver.log.dfsDir is not configured, driver logs
    +will not be persisted. Additionally, enable the cleaner by setting spark.history.fs.driverlog.cleaner.enabled
    +to true in Spark History Server.
    --- End diff --

    no space after `>`
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r231346161

    --- Diff: docs/configuration.md ---
    @@ -266,6 +266,40 @@ of the most common options to set are:
         Only has effect in Spark standalone mode or Mesos cluster deploy mode.
    +
    +  spark.driver.log.dfsDir
    +  (none)
    +
    +Base directory in which Spark driver logs are synced, if spark.driver.log.persistToDfs.enabled
    +is true. Within this base directory, each application logs the driver logs to an application specific file.
    +Users may want to set this to a unified location like an HDFS directory so driver log files can be persisted
    +for later usage. This directory should allow any Spark user to read/write files and the Spark History Server
    +user to delete files. Additionally, older logs from this directory are cleaned by
    +  Spark History Server if
    +spark.history.fs.driverlog.cleaner.enabled is true and, if they are older than max age configured
    +at spark.history.fs.driverlog.cleaner.maxAge.
    --- End diff --

    s/at/by setting
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r231346117

    --- Diff: docs/configuration.md ---
    @@ -266,6 +266,40 @@ of the most common options to set are:
         Only has effect in Spark standalone mode or Mesos cluster deploy mode.
    +
    +  spark.driver.log.dfsDir
    +  (none)
    +
    +Base directory in which Spark driver logs are synced, if spark.driver.log.persistToDfs.enabled
    +is true. Within this base directory, each application logs the driver logs to an application specific file.
    +Users may want to set this to a unified location like an HDFS directory so driver log files can be persisted
    +for later usage. This directory should allow any Spark user to read/write files and the Spark History Server
    +user to delete files. Additionally, older logs from this directory are cleaned by
    --- End diff --

    ...cleaned by the...
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r231346507

    --- Diff: docs/configuration.md ---
    @@ -266,6 +266,40 @@ of the most common options to set are:
         Only has effect in Spark standalone mode or Mesos cluster deploy mode.
    +
    +  spark.driver.log.dfsDir
    +  (none)
    +
    +Base directory in which Spark driver logs are synced, if spark.driver.log.persistToDfs.enabled
    +is true. Within this base directory, each application logs the driver logs to an application specific file.
    +Users may want to set this to a unified location like an HDFS directory so driver log files can be persisted
    +for later usage. This directory should allow any Spark user to read/write files and the Spark History Server
    +user to delete files. Additionally, older logs from this directory are cleaned by
    +  Spark History Server if
    +spark.history.fs.driverlog.cleaner.enabled is true and, if they are older than max age configured
    +at spark.history.fs.driverlog.cleaner.maxAge.
    +
    +
    +
    +  spark.driver.log.persistToDfs.enabled
    +  false
    +
    +If true, spark application running in client mode will write driver logs to a persistent storage, configured
    +in spark.driver.log.dfsDir. If spark.driver.log.dfsDir is not configured, driver logs
    +will not be persisted. Additionally, enable the cleaner by setting spark.history.fs.driverlog.cleaner.enabled
    +to true in Spark History Server.
    +
    +
    +
    +  spark.driver.log.layout
    +  %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
    +
    +The layout for the driver logs that are synced to spark.driver.log.dfsDir. If
    +spark.driver.log.persistToDfs.enabled is true and this configuration is used. If this is not configured,
    --- End diff --

    No need to mention the `enabled` option here.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r231346593

    --- Diff: docs/monitoring.md ---
    @@ -202,6 +202,28 @@ Security options for the Spark History Server are covered more detail in the
         applications that fail to rename their event logs listed as in-progress.
    +
    +spark.history.fs.driverlog.cleaner.enabled
    +spark.history.fs.cleaner.enabled
    +
    +  Specifies whether the History Server should periodically clean up driver logs from storage.
    +
    +
    +
    +spark.history.fs.driverlog.cleaner.interval
    +spark.history.fs.cleaner.interval
    +
    +  How often the filesystem driver log history cleaner checks for files to delete.
    --- End diff --

    driver log cleaner
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r230464353

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -1033,6 +1102,7 @@ private[history] case class FsHistoryProviderMetadata(
     private[history] case class LogInfo(
         @KVIndexParam logPath: String,
         @KVIndexParam("lastProcessed") lastProcessed: Long,
    +    logType: LogType.Value,
    --- End diff --

    What happens here when you have an existing listing database where this field is not recorded? Depending on what happens, you may need a default value here, or maybe handling nulls somewhere else, or even changing the DB version so old data is invalidated.
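One of the options raised in this comment, handling nulls for records written before the field existed, could look roughly like the sketch below. It is only an illustration under a stated assumption: that an old record is read back with `logType` set to `null`. The thread does not confirm how the listing store actually behaves, and `LogTypeDefaultSketch`, `effectiveLogType`, and the simplified `LogInfo` are hypothetical stand-ins, not the PR's code.

```scala
object LogTypeDefaultSketch {
  // Mirrors the LogType values that appear in the diffs of this thread.
  object LogType extends Enumeration {
    val DriverLogs, EventLogs = Value
  }

  // Simplified stand-in for LogInfo; the real class carries more fields.
  case class LogInfo(logPath: String, lastProcessed: Long, logType: LogType.Value)

  // ASSUMPTION: records stored before the field was added come back with
  // logType == null. Such records predate driver logs, so they are treated
  // as event logs.
  def effectiveLogType(info: LogInfo): LogType.Value =
    Option(info.logType).getOrElse(LogType.EventLogs)
}
```

If the store instead fails to deserialize old records, a null check like this would not be enough, and bumping the DB version (the reviewer's third option) would be the more likely fix.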
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r230463338

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -812,18 +821,74 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           .reverse()
           .first(maxTime)
           .asScala
    +      .filter(l => l.logType == LogType.EventLogs)
    --- End diff --

    `.filter { l => ... }`
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r230465268 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DEFAULT_LAYOUT = "%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), +DriverLogger.DRIVER_LOG_DIR, +DriverLogger.DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT).get) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DEFAULT_LAYOUT) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} 
catch { + case e: Exception => +logError(s"Could not persist driver logs to dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError { + JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile()) + } +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable + with Logging { + +private var streamClosed = false +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +private val tmpBuffer = new Array[Byte](UPLOAD_CHUNK_SIZE) +private var threadpool: ScheduledExecutorService = _ +init() + +private def init(): Unit = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf) + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to pe
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r230466789

    --- Diff: docs/configuration.md ---
    @@ -266,6 +266,41 @@ of the most common options to set are:
         Only has effect in Spark standalone mode or Mesos cluster deploy mode.
    +
    +  spark.driver.log.dfsDir
    +  (none)
    +
    +Base directory in which Spark driver logs are synced, if spark.driver.log.persistToDfs.enabled
    +is true. Within this base directory, Spark creates a sub-directory for each application, and logs the driver
    --- End diff --

    The sub-directory part is not true anymore, right?
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r230463888 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -812,18 +821,74 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .reverse() .first(maxTime) .asScala + .filter(l => l.logType == LogType.EventLogs) .toList stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get +val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf) +val currentTime = clock.getTimeMillis() +val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 +val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir)) +while (logFiles.hasNext()) { + val f = logFiles.next() + // Do not rely on 'modtime' as it is not updated for all filesystems when files are written to + val deleteFile = +try { + val info = listing.read(classOf[LogInfo], f.getPath().toString()) + // Update the lastprocessedtime of file if it's length or modification time has changed + if (info.fileSize < f.getLen() || info.lastProcessed < f.getModificationTime()) { +listing.write( + info.copy(lastProcessed = currentTime, fileSize = f.getLen())) +false + } else if (info.lastProcessed > maxTime) { +false + } else { +true + } +} catch { + case e: NoSuchElementException => +// For every new driver log file discovered, create a new entry in listing +listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs, None, + None, f.getLen())) + 
false +} + if (deleteFile) { +logInfo(s"Deleting expired driver log for: ${f.getPath().getName()}") +listing.delete(classOf[LogInfo], f.getPath().toString()) +deleteLog(driverLogFs, f.getPath()) + } +} + +// Delete driver log file entries that exceed the configured max age and +// may have been deleted on filesystem externally. +val stale = listing.view(classOf[LogInfo]) + .index("lastProcessed") + .reverse() + .first(maxTime) + .asScala + .filter(i => i.logType == LogType.DriverLogs) --- End diff -- `.filter { i => ... }` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r230467097

    --- Diff: docs/configuration.md ---
    @@ -266,6 +266,41 @@ of the most common options to set are:
         Only has effect in Spark standalone mode or Mesos cluster deploy mode.
    +
    +  spark.driver.log.dfsDir
    +  (none)
    +
    +Base directory in which Spark driver logs are synced, if spark.driver.log.persistToDfs.enabled
    +is true. Within this base directory, Spark creates a sub-directory for each application, and logs the driver
    +logs specific to the application in this directory. Users may want to set this to a unified location like an
    +HDFS directory so driver log files can be persisted for later usage. This directory should allow any Spark
    +user to read/write files and the Spark History Server user to delete files. Additionally, older logs from
    +this directory are cleaned by Spark History Server if spark.history.fs.driverlog.cleaner.enabled
    +is true or if not configured, falling back to spark.history.fs.cleaner.enabled. They are cleaned
    +if they are older than max age configured at spark.history.fs.driverlog.cleaner.maxAge or if not
    +configured, falling back to spark.history.fs.cleaner.maxAge.
    +
    +
    +
    +  spark.driver.log.persistToDfs.enabled
    +  false
    +
    +If true, spark application running in client mode will write driver logs to a persistent storage, configured
    +in spark.driver.log.dfsDir. If spark.driver.log.dfsDir is not configured, driver logs
    +will not be persisted. Additionally, enable the cleaner by setting spark.history.fs.driverlog.cleaner.enabled
    --- End diff --

    Instead of mentioning the cleaner config, I'd add a link to the SHS config page. That makes it clear that the cleaner is not part of the application.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r230466226 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -413,6 +417,66 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("driver log cleaner") { +val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10) +val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20) +val maxAge = TimeUnit.SECONDS.toSeconds(40) +val clock = new ManualClock(0) +val testConf = new SparkConf() +testConf.set("spark.history.fs.logDirectory", + Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath()) +testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath()) +testConf.set(DRIVER_LOG_CLEANER_ENABLED, true) +testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4) +testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge) +val provider = new FsHistoryProvider(testConf, clock) + +val log1 = FileUtils.getFile(testDir, "1" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log1) +clock.setTime(firstFileModifiedTime) +log1.setLastModified(clock.getTimeMillis()) +provider.cleanDriverLogs() + +val log2 = FileUtils.getFile(testDir, "2" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log2) +val log3 = FileUtils.getFile(testDir, "3" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log3) +clock.setTime(secondFileModifiedTime) +log2.setLastModified(clock.getTimeMillis()) +log3.setLastModified(clock.getTimeMillis()) +provider.cleanDriverLogs() + +// This should not trigger any cleanup +provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(3) + +// Should trigger cleanup for first file but not second one +clock.setTime(firstFileModifiedTime + TimeUnit.SECONDS.toMillis(maxAge) + 1) +provider.cleanDriverLogs() +provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(2) +assert(!log1.exists()) +assert(log2.exists()) +assert(log3.exists()) + 
+// Update the third file length while keeping the original modified time +Utils.tryLogNonFatalError { --- End diff -- You don't want to catch the error, do you? Otherwise, if it happens, the test won't be doing what you want it to do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
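To illustrate the concern in this comment: when test setup is wrapped in a helper that logs and swallows failures, a failed write leaves the test running against a file that was never updated. The sketch below is an assumption-laden illustration, not the PR's code. `scala.util.Try` stands in for Spark's `Utils.tryLogNonFatalError`, and `TestSetupSketch`, `writeQuietly`, and `writeOrFail` are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.util.Try

object TestSetupSketch {
  // Swallows any failure: the caller cannot tell whether the file was written,
  // so later assertions may silently check the wrong state.
  def writeQuietly(file: Path, text: String): Unit = {
    Try(Files.write(file, text.getBytes(StandardCharsets.UTF_8)))
    ()
  }

  // Lets the exception propagate, so the test fails at the setup step itself.
  def writeOrFail(file: Path, text: String): Unit = {
    Files.write(file, text.getBytes(StandardCharsets.UTF_8))
    ()
  }
}
```

With `writeOrFail`, a broken setup step surfaces as an immediate test failure rather than as a confusing assertion result further down.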
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r230465790 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -413,6 +417,66 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("driver log cleaner") { +val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10) +val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20) +val maxAge = TimeUnit.SECONDS.toSeconds(40) +val clock = new ManualClock(0) +val testConf = new SparkConf() +testConf.set("spark.history.fs.logDirectory", + Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath()) +testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath()) +testConf.set(DRIVER_LOG_CLEANER_ENABLED, true) +testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4) +testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge) +val provider = new FsHistoryProvider(testConf, clock) + +val log1 = FileUtils.getFile(testDir, "1" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log1) +clock.setTime(firstFileModifiedTime) +log1.setLastModified(clock.getTimeMillis()) +provider.cleanDriverLogs() + +val log2 = FileUtils.getFile(testDir, "2" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log2) +val log3 = FileUtils.getFile(testDir, "3" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log3) +clock.setTime(secondFileModifiedTime) +log2.setLastModified(clock.getTimeMillis()) +log3.setLastModified(clock.getTimeMillis()) +provider.cleanDriverLogs() + +// This should not trigger any cleanup --- End diff -- "This" is the call above this comment, isn't it? Seems like the comment is in the wrong spot. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r228671856

    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -2328,7 +2328,14 @@ private[spark] object Utils extends Logging {
        * configure a new log4j level
        */
       def setLogLevel(l: org.apache.log4j.Level) {
    -    org.apache.log4j.Logger.getRootLogger().setLevel(l)
    +    val rootLogger = org.apache.log4j.Logger.getRootLogger()
    +    rootLogger.setLevel(l)
    +    rootLogger.getAllAppenders().asScala.foreach { tmp =>
    +      tmp match {
    --- End diff --

    You can have the cases directly in the body of the foreach.
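The style suggested here relies on Scala accepting a block of `case` clauses wherever a function is expected, which removes the need for the intermediate `tmp => tmp match`. The diff is cut off before the cases themselves, so the sketch below uses hypothetical appender types and a hypothetical `setThreshold` helper purely to show the shape; it makes no claim about what the PR does inside the match.

```scala
object ForeachCaseSketch {
  // Hypothetical stand-ins for log4j appenders.
  sealed trait Appender
  final case class FileAppender(var threshold: String) extends Appender
  final case class ConsoleAppender(name: String) extends Appender

  // The case clauses form the function literal passed to foreach directly.
  def setThreshold(appenders: Seq[Appender], level: String): Unit =
    appenders.foreach {
      case fa: FileAppender => fa.threshold = level
      case _ => // other appender types are left unchanged
    }
}
```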
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r228669828

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -796,16 +806,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           .asScala
           .toList
         stale.foreach { log =>
    -      if (log.appId.isEmpty) {
    +      if (log.appId.isEmpty &&
    +          (!conf.get(DRIVER_LOG_DFS_DIR).isDefined ||
    --- End diff --

    This check is kinda awkward. How about a new property in `LogInfo` with the type of the log? You could then also just filter those out above before `.toList`, since the `clearBlacklist` call is unrelated to the logs you're adding.
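A rough sketch of this suggestion: tag each entry with its log type and filter on the tag before materializing the list, instead of comparing path prefixes. The `LogType` values match those seen in later diffs of this thread; the simplified `LogInfo`, the `staleEventLogs` helper, and the use of a plain `Iterator` in place of the listing view are assumptions made for illustration.

```scala
object LogTypeFilterSketch {
  object LogType extends Enumeration {
    val DriverLogs, EventLogs = Value
  }

  // Simplified stand-in for LogInfo with the proposed type property.
  case class LogInfo(logPath: String, appId: Option[String], logType: LogType.Value)

  // Keep only event logs before converting to a List, so the cleanup loop
  // that follows never sees driver log entries.
  def staleEventLogs(entries: Iterator[LogInfo]): List[LogInfo] =
    entries
      .filter { l => l.logType == LogType.EventLogs }
      .toList
}
```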
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r228673330 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -413,6 +417,68 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("driver log cleaner") { +val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10) +val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20) +val maxAge = TimeUnit.SECONDS.toSeconds(40) +val clock = new ManualClock(0) +val testConf = new SparkConf() +testConf.set("spark.history.fs.logDirectory", + Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath()) +testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath()) +testConf.set(DRIVER_LOG_CLEANER_ENABLED, true) +testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4) +testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge) +val provider = new FsHistoryProvider(testConf, clock) + +val log1 = FileUtils.getFile(testDir, "1" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log1) +val modTime1 = System.currentTimeMillis() + +clock.setTime(modTime1 + firstFileModifiedTime) +provider.cleanDriverLogs() + +val log2 = FileUtils.getFile(testDir, "2" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log2) +val log3 = FileUtils.getFile(testDir, "3" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log3) +val modTime2 = System.currentTimeMillis() + +clock.setTime(modTime1 + secondFileModifiedTime) +provider.cleanDriverLogs() + +// This should not trigger any cleanup +provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(3) + +// Should trigger cleanup for first file but not second one +clock.setTime(modTime1 + firstFileModifiedTime + TimeUnit.SECONDS.toMillis(maxAge) + 1) +provider.cleanDriverLogs() +provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size should be(2) +assert(!log1.exists()) +assert(log2.exists()) +assert(log3.exists()) + +// Should 
cleanup the second file but not the third file, as filelength changed. +val writer = new OutputStreamWriter(new BufferedOutputStream(new FileOutputStream(log3))) --- End diff -- `Files.write` is shorter and nicer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
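The comment does not say which `Files` class is meant. Assuming `java.nio.file.Files` (Guava has a similarly named utility), the stream-wrapping code in the test could shrink to a single call along these lines. `FilesWriteSketch` and `appendText` are hypothetical names for this sketch.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

object FilesWriteSketch {
  // Appends text to the file, creating it if needed; the stream is opened
  // and closed inside Files.write, so no explicit writer handling is required.
  def appendText(file: Path, text: String): Unit = {
    Files.write(
      file,
      text.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND)
    ()
  }
}
```

Because the call changes the file length, it is enough to exercise the "file length changed" branch that the test is targeting.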
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r228673299 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -413,6 +417,68 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } + test("driver log cleaner") { +val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10) +val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20) +val maxAge = TimeUnit.SECONDS.toSeconds(40) +val clock = new ManualClock(0) +val testConf = new SparkConf() +testConf.set("spark.history.fs.logDirectory", + Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath()) +testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath()) +testConf.set(DRIVER_LOG_CLEANER_ENABLED, true) +testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4) +testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge) +val provider = new FsHistoryProvider(testConf, clock) + +val log1 = FileUtils.getFile(testDir, "1" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +createEmptyFile(log1) +val modTime1 = System.currentTimeMillis() --- End diff -- You're mixing a manual clock with system time. Why not use `File.setLastModified` like other tests? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
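The point about mixing clocks can be sketched as follows: instead of reading `System.currentTimeMillis()` after creating a file, the test sets the file's modification time from the same manual clock that drives the code under test, so every timestamp is deterministic. `FakeClock` is a hypothetical minimal stand-in for Spark's `ManualClock`, and `touchAt` is an illustrative helper.

```scala
import java.io.File

object ManualClockSketch {
  // Hypothetical minimal stand-in for Spark's ManualClock.
  final class FakeClock(private var now: Long) {
    def setTime(t: Long): Unit = { now = t }
    def getTimeMillis(): Long = now
  }

  // Stamps the file with the manual clock's time; returns false if the
  // filesystem refused the update.
  def touchAt(file: File, clock: FakeClock): Boolean =
    file.setLastModified(clock.getTimeMillis())
}
```

The later revision of the test quoted elsewhere in this thread follows the same pattern with `clock.setTime(...)` followed by `log1.setLastModified(clock.getTimeMillis())`.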
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r228672761 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DEFAULT_LAYOUT = "%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), +DriverLogger.DRIVER_LOG_DIR, +DriverLogger.DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.get(DRIVER_LOG_LAYOUT).isDefined) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT).get) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DEFAULT_LAYOUT) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: 
Exception => +logError(s"Could not persist driver logs to dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError { + JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile()) + } +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable + with Logging { + +private var streamClosed = false +private var fileSystem: FileSystem = _ +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + fileSystem = new Path(rootDir).getFileSystem(hadoopConf) --- End diff -- This is actually super weird, initializing another field in the initializer of this field. It's looking like you should just have an `init()` function, or maybe have `rootDir` as a field so you can initialize `fileSystem` more directly... (Sometimes I really miss Java constructors.) --- - To unsubscribe, e-mail
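A minimal sketch of the second option in the comment above (keep `rootDir` as its own field so `fileSystem` is initialized directly rather than as a side effect of another field's initializer). `FakeFileSystem`, `WriterSketch`, and the `.log` file naming are hypothetical stand-ins so the sketch runs without Hadoop; they are not the PR's code.

```scala
// Hypothetical stand-in for org.apache.hadoop.fs.FileSystem, reduced to a scheme name.
final case class FakeFileSystem(scheme: String)

object FakeFileSystem {
  // Derives the scheme from a URI-like string; falls back to "file" when none is present.
  def forPath(path: String): FakeFileSystem = {
    val idx = path.indexOf("://")
    FakeFileSystem(if (idx > 0) path.substring(0, idx) else "file")
  }
}

class WriterSketch(rootDirConf: Option[String], appId: String) {
  // Each field has its own initializer; Scala runs them in declaration order,
  // so rootDir is already set when fileSystem and dfsLogFile are computed.
  private val rootDir: String = rootDirConf.getOrElse(
    throw new IllegalArgumentException("driver log dfs dir is not configured"))
  val fileSystem: FakeFileSystem = FakeFileSystem.forPath(rootDir)
  // Assumed naming scheme, for illustration only.
  val dfsLogFile: String = s"$rootDir/$appId.log"
}
```

With this layout no `var` is needed for `fileSystem`, and a missing directory setting fails in one obvious place.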
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r228671145 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -796,16 +806,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .asScala .toList stale.foreach { log => - if (log.appId.isEmpty) { + if (log.appId.isEmpty && + (!conf.get(DRIVER_LOG_DFS_DIR).isDefined || + !log.logPath.startsWith(new Path(conf.get(DRIVER_LOG_DFS_DIR).get).toString( { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get +val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf) +val currentTime = clock.getTimeMillis() +val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 +val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir)) +while (logFiles.hasNext()) { --- End diff -- One issue here is that since you're basing this on the file system's contents, if these files are deleted outside of the SHS then you'll accumulate `LogInfo` entries for those deleted entries. The event log cleaner avoids that by basing this logic on the SHS's view of the file system, although I don't know if that same logic can be applied here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r228669149 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -274,11 +275,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) pool.scheduleWithFixedDelay( getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) - if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { + if (conf.get(CLEANER_ENABLED)) { // A task that periodically cleans event logs on disk. pool.scheduleWithFixedDelay( getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) } + + conf.get(DRIVER_LOG_DFS_DIR).foreach { _ => --- End diff -- `if (conf.contains(DRIVER_LOG_DFS_DIR) && conf.get(DRIVER_LOG_CLEANER_ENABLED))` Don't use `foreach` as a boolean check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
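To illustrate the point above, a small sketch contrasting `foreach` used as a presence check with an explicit boolean condition. `Conf` and its two fields are hypothetical stand-ins for the `DRIVER_LOG_DFS_DIR` and `DRIVER_LOG_CLEANER_ENABLED` entries; nothing here touches `SparkConf`.

```scala
object CleanerGateSketch {
  // Hypothetical stand-in for the two config entries named in the comment.
  final case class Conf(driverLogDfsDir: Option[String], driverLogCleanerEnabled: Boolean)

  // Discouraged: foreach is used only to test presence; the bound value is ignored.
  def scheduledWithForeach(conf: Conf): Boolean = {
    var scheduled = false
    conf.driverLogDfsDir.foreach { _ =>
      if (conf.driverLogCleanerEnabled) scheduled = true
    }
    scheduled
  }

  // Preferred: the condition is stated directly.
  def scheduledWithIf(conf: Conf): Boolean =
    conf.driverLogDfsDir.isDefined && conf.driverLogCleanerEnabled
}
```

Both functions return the same result; the second makes the intent readable at a glance.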
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r227036915 --- Diff: docs/configuration.md --- @@ -266,6 +266,41 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.persistToDfs.enabled +is true. Within this base directory, Spark creates a sub-directory for each application, and logs the driver +logs specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any Spark +user to read/write files and the Spark History Server user to delete files. Additionally, older logs from --- End diff -- we should add something about this to the security doc with specific information on permissions, like for event logging: https://spark.apache.org/docs/latest/security.html --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226767375 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -798,14 +815,52 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get +val currentTime = clock.getTimeMillis() +val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 +val appDirs = driverLogFs.get.listLocatedStatus(new Path(driverLogDir)) +while (appDirs.hasNext()) { + val appDirStatus = appDirs.next() + if (appDirStatus.isDirectory()) { +val logFiles = driverLogFs.get.listStatus(appDirStatus.getPath()) +var deleteDir = true +logFiles.foreach { f => + try { +val info = listing.read(classOf[LogInfo], f.getPath().toString()) --- End diff -- Oh, and you probably want unit tests for this new code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226740562 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -2863,6 +2870,14 @@ private[spark] object Utils extends Logging { def stringHalfWidth(str: String): Int = { if (str == null) 0 else str.length + fullWidthRegex.findAllIn(str).size } + + private[spark] def sanitizeDirName(str: String): String = { --- End diff -- This class is already `private[spark]` so the modifier is redundant. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226737371 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -116,6 +117,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Visible for testing private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf) + private val driverLogFs: Option[FileSystem] = +if (conf.get(DRIVER_LOG_DFS_DIR).isDefined) { --- End diff -- Use `conf.get(DRIVER_LOG_DFS_DIR).map`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
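The suggestion above can be sketched as follows: `Option.map` replaces the `isDefined` check plus `.get`. `FakeFs` is a hypothetical stand-in for Hadoop's `FileSystem`, used only so the example is self-contained.

```scala
object OptionMapSketch {
  // Hypothetical stand-in for org.apache.hadoop.fs.FileSystem.
  final case class FakeFs(root: String)

  // Verbose form: test isDefined, then unwrap with .get.
  def viaIsDefined(dir: Option[String]): Option[FakeFs] =
    if (dir.isDefined) Some(FakeFs(dir.get)) else None

  // Equivalent form: map applies the function only when a value is present.
  def viaMap(dir: Option[String]): Option[FakeFs] =
    dir.map(d => FakeFs(d))
}
```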
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226739229 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -798,14 +815,52 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get +val currentTime = clock.getTimeMillis() +val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 +val appDirs = driverLogFs.get.listLocatedStatus(new Path(driverLogDir)) +while (appDirs.hasNext()) { + val appDirStatus = appDirs.next() + if (appDirStatus.isDirectory()) { +val logFiles = driverLogFs.get.listStatus(appDirStatus.getPath()) --- End diff -- Not sure how I missed this before, but why are you storing log files in a subdirectory? What's the advantage? You'll have a ton of directories with a single log file. Instead, just write the log files directly into the parent directory... that also makes this code simpler / faster (fewer RPCs). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226737882 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -274,11 +282,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) pool.scheduleWithFixedDelay( getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) - if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { + if (conf.get(CLEANER_ENABLED)) { // A task that periodically cleans event logs on disk. pool.scheduleWithFixedDelay( getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) } + + driverLogFs.foreach { _ => --- End diff -- Here it's clearer to use `if (driverLogFs.isDefined && ...)`. But really this is a bit confusing and I think my previous suggestion is clearer: don't keep `driverLogFs` in a field, just get it when running the cleaner task. And do this initialization based on the config entries, not `driverLogFs`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226740098 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -798,14 +815,52 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get +val currentTime = clock.getTimeMillis() +val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 +val appDirs = driverLogFs.get.listLocatedStatus(new Path(driverLogDir)) +while (appDirs.hasNext()) { + val appDirStatus = appDirs.next() + if (appDirStatus.isDirectory()) { +val logFiles = driverLogFs.get.listStatus(appDirStatus.getPath()) +var deleteDir = true +logFiles.foreach { f => + try { +val info = listing.read(classOf[LogInfo], f.getPath().toString()) --- End diff -- This block probably deserves a comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226741280 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, File, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private var rootDfsDir : File = _ + + override def beforeAll(): Unit = { +super.beforeAll() +rootDfsDir = Utils.createTempDir(namePrefix = "dfs_logs") + } + + override def afterAll(): Unit = { +super.afterAll() +JavaUtils.deleteRecursively(rootDfsDir) + } + + test("driver logs are persisted locally and synced to dfs") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, DriverLogger.DRIVER_LOG_DIR) +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals(DriverLogger.DRIVER_LOG_FILE)) + +sc.stop() +assert(!driverLogsDir.exists()) +val dfsDir = FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id) +assert(dfsDir.exists()) +val dfsFiles = dfsDir.listFiles() +dfsFiles.exists{ f => f.getName().equals(DriverLogger.DRIVER_LOG_FILE) && f.length() > 0 } --- End diff -- space before `{` You also need to assert the condition... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
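As a sketch of the second point above: a bare `exists { ... }` call computes a Boolean and discards it, so the test can never fail on that line. `FileStub` is a hypothetical stand-in for `java.io.File`, and `hasNonEmptyLog` is an illustrative helper, not part of the suite.

```scala
object AssertExistsSketch {
  // Hypothetical stand-in for java.io.File: just a name and a length.
  final case class FileStub(name: String, length: Long)

  // Returns true only if a file with the expected name exists and is non-empty.
  // The caller still has to wrap the result in assert(...) for it to be checked.
  def hasNonEmptyLog(files: Seq[FileStub], logName: String): Boolean =
    files.exists { f => f.name == logName && f.length > 0 }
}
```

In the suite itself the equivalent change would be to wrap the existing expression, for example `assert(dfsFiles.exists { f => ... })`.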
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226718233 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR) +driverLogDir.foreach { dl => + val maxTime = clock.getTimeMillis() - +conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 + val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl)) + while (appDirs.hasNext()) { +val appDirStatus = appDirs.next() +if (appDirStatus.getModificationTime() < maxTime) { + logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}") + deleteLog(driverLogFs.get, appDirStatus.getPath()) --- End diff -- Ohh right, the fix is similar to what exists currently. I just wanted to add the note that logs will be deleted after that time period. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226716880 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR) +driverLogDir.foreach { dl => + val maxTime = clock.getTimeMillis() - +conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 + val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl)) + while (appDirs.hasNext()) { +val appDirStatus = appDirs.next() +if (appDirStatus.getModificationTime() < maxTime) { + logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}") + deleteLog(driverLogFs.get, appDirStatus.getPath()) --- End diff -- So you *are* doing what the current code does (I haven't reviewed the code yet). Your comment made it sound like you were just waiting longer to delete the log. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226710526 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR) +driverLogDir.foreach { dl => + val maxTime = clock.getTimeMillis() - +conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 + val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl)) + while (appDirs.hasNext()) { +val appDirStatus = appDirs.next() +if (appDirStatus.getModificationTime() < maxTime) { + logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}") + deleteLog(driverLogFs.get, appDirStatus.getPath()) --- End diff -- Sorry, I don't understand your comment. Can you elaborate please? The problem that you described was the old cleaner logic may delete (or try to delete) a log for an active app. With the solution that I added, I am tracking the fileLength and lastProcessedTime. If the fileLength is updated, I do not delete the file and update the lastProcessedTime instead. And I rely on lastProcessedTime to determine whether it is time to delete the file. Please let me know if there is an issue in the logic. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
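A rough sketch of the tracking logic described in the message above, assuming one record per log file holding the last seen length and the last processed time. `Tracked`, `Decision`, and `decide` are hypothetical names for illustration; the actual change stores this state in the history server's listing, which is not modeled here.

```scala
object DriverLogAgeSketch {
  // Hypothetical record of what the cleaner last saw for one log file.
  final case class Tracked(fileLength: Long, lastProcessedTime: Long)

  sealed trait Decision
  final case class Refresh(updated: Tracked) extends Decision // file is new or grew
  case object Keep extends Decision                           // unchanged, not yet expired
  case object Delete extends Decision                         // unchanged and expired

  // All times are in milliseconds.
  def decide(previous: Option[Tracked], currentLength: Long,
             now: Long, maxAgeMs: Long): Decision =
    previous match {
      // Length unchanged since the last pass: age is measured from lastProcessedTime.
      case Some(t) if t.fileLength == currentLength =>
        if (now - t.lastProcessedTime > maxAgeMs) Delete else Keep
      // First sighting, or the file changed: record the new length and reset the clock.
      case _ =>
        Refresh(Tracked(currentLength, now))
    }
}
```

The key property is that a file still being written keeps resetting its own clock, so it is not deleted on modification time alone.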
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226706867 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR) +driverLogDir.foreach { dl => + val maxTime = clock.getTimeMillis() - +conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 + val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl)) + while (appDirs.hasNext()) { +val appDirStatus = appDirs.next() +if (appDirStatus.getModificationTime() < maxTime) { + logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}") + deleteLog(driverLogFs.get, appDirStatus.getPath()) --- End diff -- Isn't that just delaying the problem I described? If you look at how this code handles that for event logs, it's way more complicated than that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226505024 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( --- End diff -- ``` Utils.tryLogNonFatalError { // code goes here } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
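The style point above, spelled out as a runnable sketch: when a method takes a by-name block, pass it in braces on its own lines instead of nesting a long call inside parentheses. The `tryLogNonFatalError` defined here is a local, hypothetical stand-in that only mimics the shape of Spark's helper; `cleanup` and `deleteDir` are illustrative names.

```scala
import scala.util.control.NonFatal

object TryLogSketch {
  // Local stand-in: runs the block and logs (here, prints) any non-fatal error.
  def tryLogNonFatalError(block: => Unit): Unit =
    try block catch {
      case NonFatal(t) => System.err.println(s"Uncaught exception: ${t.getMessage}")
    }

  // Block style: the code to protect sits inside braces.
  def cleanup(deleteDir: () => Unit): Unit = {
    tryLogNonFatalError {
      deleteDir()
    }
  }
}
```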
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226504971 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark +user to read/write files and the spark history server user to delete files. Additionally, older logs from +this directory are cleaned by Spark History Server if spark.history.fs.driverlog.cleaner.enabled is true. --- End diff -- That documentation is not user visible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226492916 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark +user to read/write files and the spark history server user to delete files. Additionally, older logs from +this directory are cleaned by Spark History Server if spark.history.fs.driverlog.cleaner.enabled is true. --- End diff -- I provided the information that driverlog cleaner uses a fallback option in its own documentation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226492665 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( --- End diff -- I did not understand this comment.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226492521 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR) +driverLogDir.foreach { dl => + val maxTime = clock.getTimeMillis() - +conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 + val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl)) + while (appDirs.hasNext()) { +val appDirStatus = appDirs.next() +if (appDirStatus.getModificationTime() < maxTime) { + logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}") + deleteLog(driverLogFs.get, appDirStatus.getPath()) --- End diff -- Thanks for this. I did not realize that `modtime` is not always updated. Added a fix. The fix will clean the logs when they are older than max(2 * cleaner_interval, max_age).
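The cutoff described in the reply above can be sketched as follows. This is a minimal illustration, not the code that was added to the PR: the object name, the method name, and all parameter names are hypothetical, and the sketch only encodes the stated rule that a log is cleaned once it is older than max(2 * cleaner_interval, max_age).

```scala
object DriverLogCleanup {
  // Hypothetical helper, not taken from the PR. All times are epoch
  // milliseconds; the two settings are given in seconds.
  def isExpired(
      nowMs: Long,
      lastSeenMs: Long,
      cleanIntervalS: Long,
      maxAgeS: Long): Boolean = {
    // The effective age limit is the larger of twice the cleaner interval
    // and the configured max age, converted to milliseconds.
    val effectiveMaxAgeMs = math.max(2 * cleanIntervalS, maxAgeS) * 1000L
    nowMs - lastSeenMs > effectiveMaxAgeMs
  }
}
```

With a one-hour cleaner interval and a 60-second max age, the effective limit in this sketch is two hours, so a log last seen 5000 seconds ago is kept and one last seen 10000 seconds ago is removed.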
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226472013 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not persist driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable + with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) --- End diff -- See my previous comment for why this call is not right.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226464199 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, File, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private var rootHdfsDir : File = _ --- End diff -- s/hdfs/dfs in all this file. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226466182 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark +user to read/write files and the spark history server user to delete files. Additionally, older logs from --- End diff -- Spark History Server
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226464800 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() --- End diff -- I'd really be more comfortable with using a temp dir rather than a hardcoded directory name... Or at least use `__driver_logs__` since that syntax conventionally means things that user applications should not use. And use a constant so you don't have to hardcode the value in the test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
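One way to read the suggestion above is to keep the directory and file names in a single place that both the logger and its test refer to. The sketch below assumes that reading; the object and member names are hypothetical, and only the `__driver_logs__` and `driver.log` strings come from the thread.

```scala
import java.io.File

object DriverLogPaths {
  // Hypothetical shared constants, so a test never repeats the literals.
  val DriverLogDirName = "__driver_logs__"
  val DriverLogFileName = "driver.log"

  // Builds <localRoot>/__driver_logs__/driver.log without creating anything.
  def localLogFile(localRoot: File): File = {
    val dir = new File(localRoot, DriverLogDirName)
    new File(dir, DriverLogFileName)
  }
}
```

A test could then assert on `DriverLogPaths.DriverLogDirName` instead of a string literal, so renaming the directory would need a change in one place only.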
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226468029 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, File, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private var rootHdfsDir : File = _ + + override def beforeAll(): Unit = { +rootHdfsDir = Utils.createTempDir(namePrefix = "hdfs_logs") + } + + override def afterAll(): Unit = { +JavaUtils.deleteRecursively(rootHdfsDir) + } + + test("driver logs are persisted locally and synced to hdfs") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs") +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals("driver.log")) + +sc.stop() +// File is continuously synced to Hdfs (which is a local dir for this test) +assert(!driverLogsDir.exists()) +val hdfsDir = FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id) +assert(hdfsDir.exists()) +val hdfsFiles = hdfsDir.listFiles() +assert(hdfsFiles.length > 0) +val driverLogFile = hdfsFiles.filter(f => f.getName.equals("driver.log")).head +val hdfsIS = new BufferedInputStream(new FileInputStream(driverLogFile)) +assert(hdfsIS.available() > 0) +JavaUtils.deleteRecursively(hdfsDir) --- End diff -- This line and the next are unnecessary. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226461965 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( --- End diff -- Use the nicer closure syntax.
```
blah {
  do stuff
}
```
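For readers unfamiliar with the syntax being requested, here is a small self-contained sketch. It assumes the helper takes a by-name block, which is what allows the braces form; `tryLogNonFatal` is a hypothetical stand-in written for this example, not Spark's own utility.

```scala
import scala.util.control.NonFatal

object ClosureSyntax {
  // Hypothetical stand-in for a helper that runs a block and swallows
  // non-fatal errors. The by-name parameter enables the braces call style.
  def tryLogNonFatal(block: => Unit): Unit =
    try block catch {
      case NonFatal(e) => println(s"Ignored non-fatal error: ${e.getMessage}")
    }

  def demo(): Int = {
    var calls = 0
    def cleanup(): Unit = calls += 1

    // Parenthesised form, as in the diff under review.
    tryLogNonFatal(cleanup())

    // Braces form, as the review asks for; behaviour is identical.
    tryLogNonFatal {
      cleanup()
    }

    // A failing block is caught and reported instead of propagating.
    tryLogNonFatal {
      throw new IllegalStateException("boom")
    }
    calls
  }
}
```

Both call styles compile to the same thing; the braces form mainly reads better when the argument spans several lines.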
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226464013 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not persist driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable + with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to persist driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(loca
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226462792 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable + with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogFi
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226468219 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, File, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private var rootHdfsDir : File = _ + + override def beforeAll(): Unit = { +rootHdfsDir = Utils.createTempDir(namePrefix = "hdfs_logs") + } + + override def afterAll(): Unit = { +JavaUtils.deleteRecursively(rootHdfsDir) + } + + test("driver logs are persisted locally and synced to hdfs") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs") +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals("driver.log")) + +sc.stop() +// File is continuously synced to Hdfs (which is a local dir for this test) --- End diff -- Comment is unrelated to any of the code around it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226460844 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) --- End diff -- This is not what I suggested and your way is actually very confusing for the user; especially since you say this is the default in the configuration, whereas in 99.9% of the cases it won't be, since the user will have at least one appender in the log4j conf. If you want this config to have a default value, then you have to use it; it should have higher priority than the first appender's layout. 
What I actually suggested was for this config to *not* have a default. That resolves the confusion: if you set it, it is used; otherwise the layout falls back to the first appender's layout (or the default log4j layout). You could probably do that with `createWithDefaultFunction`, but `createOptional` plus some code here also works.
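The precedence described in this comment can be sketched as follows. This is a minimal model under stated assumptions, not the code from the pull request: `LayoutResolution`, `resolveLayout`, and `Log4jDefaultPattern` are hypothetical names, and layouts are represented as plain pattern strings instead of log4j `Layout` objects.

```scala
object LayoutResolution {
  // Hypothetical placeholder for the pattern of the default log4j layout.
  val Log4jDefaultPattern = "%m%n"

  /**
   * Pick the layout pattern for the driver log appender.
   *  - configured: value of the optional layout config (None when the user did not set it)
   *  - firstAppender: layout of the first root-logger appender, if one exists
   */
  def resolveLayout(configured: Option[String], firstAppender: Option[String]): String =
    // An explicit setting wins; otherwise reuse the first appender's layout;
    // only when neither exists fall back to the default pattern.
    configured.orElse(firstAppender).getOrElse(Log4jDefaultPattern)
}
```

With an optional config there is no documented default that silently loses to the first appender, which is the confusion the comment points out.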
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226465465 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark --- End diff -- Spark --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226456519 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -117,6 +118,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Visible for testing private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf) + private val driverLogFs: Option[FileSystem] = +if (conf.get(DRIVER_LOG_DFS_DIR).isDefined) { + Some(FileSystem.get(hadoopConf)) --- End diff -- This is not right. It assumes the directory defined is in the defaultFS. See call right above this one for what you should do here. I also don't see a good reason to keep this in a field. Just get the FS reference when the log cleaner runs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
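A minimal sketch of the approach this comment points to, assuming `hadoop-common` is on the classpath. `DriverLogFs` and `forDir` are hypothetical names; `Path.getFileSystem(Configuration)` is the same call used for `logDir` in the diff above.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DriverLogFs {
  // Resolve the filesystem from the directory's own URI (scheme and authority).
  // The default filesystem is used only when the path carries no scheme.
  def forDir(dir: String, hadoopConf: Configuration): FileSystem =
    new Path(dir).getFileSystem(hadoopConf)
}
```

Calling a helper like this from inside the cleaner, instead of caching the result in a field, also follows the second half of the comment.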
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226456102 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -2421,11 +2425,13 @@ class SparkContext(config: SparkConf) extends Logging { // the cluster manager to get an application ID (in case the cluster manager provides one). listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls)) +_driverLogger.foreach(_.startSync(_hadoopConfiguration)) } /** Post the application end event */ private def postApplicationEnd() { listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis)) +_driverLogger.foreach(_.stop()) --- End diff -- I think it may be better to call this in `stop()`. Stopping that logger is not part of posting an event to the listener bus.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226458986 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR) +driverLogDir.foreach { dl => + val maxTime = clock.getTimeMillis() - +conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 + val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl)) + while (appDirs.hasNext()) { +val appDirStatus = appDirs.next() +if (appDirStatus.getModificationTime() < maxTime) { + logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}") + deleteLog(driverLogFs.get, appDirStatus.getPath()) --- End diff -- How are you differentiating logs that are being actively written to? There's this comment at the top of this class: "some filesystems do not appear to update the `modtime` value whenever data is flushed to an open file output stream" That was added because event logs actively being written to don't have their mtime updated. There's a lot of code in this class to deal with that. In that situation you may run into issues here; either deleting a log for an active app, which would then cause lots of errors in that app's logs, or spamming the SHS logs with errors that this log cannot be deleted (not sure what's HDFS's behavior, but I believe it's the former). 
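One possible way to avoid deleting a log that is still being written, sketched under stated assumptions: instead of trusting the modification time, the cleaner remembers the file length it saw on earlier scans and expires a log only after the length has stopped changing for longer than the max age. `LogStatus`, `DriverLogCleaner`, and `scan` are hypothetical names, and the sketch is not taken from the pull request.

```scala
import scala.collection.mutable

// Hypothetical snapshot of one file as reported by a directory listing.
final case class LogStatus(path: String, length: Long)

// Tracks file growth across scans so that a stale modification time alone
// never causes deletion of a log that is still being written.
class DriverLogCleaner(maxAgeMs: Long) {
  // path -> (last observed length, scan time at which that length was first observed)
  private val seen = mutable.Map.empty[String, (Long, Long)]

  /** Returns the paths that are safe to delete at time `nowMs`. */
  def scan(listing: Seq[LogStatus], nowMs: Long): Seq[String] = {
    // Forget entries for files that no longer exist.
    val present = listing.map(_.path).toSet
    seen.keys.toList.filterNot(present).foreach(p => seen.remove(p))

    listing.flatMap { status =>
      seen.get(status.path) match {
        case Some((length, since)) if length == status.length =>
          // No growth since `since`: expired once the idle time exceeds maxAgeMs.
          if (nowMs - since > maxAgeMs) {
            seen.remove(status.path)
            Some(status.path)
          } else {
            None
          }
        case _ =>
          // New file, or a file that grew: restart the idle clock.
          seen(status.path) = (status.length, nowMs)
          None
      }
    }
  }
}
```

The trade-off is that a log is never deleted on the first scan that sees it, so expiry is delayed by at least one cleaner interval.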
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226462505 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) --- End diff -- "spark dfs"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226467039 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. --- End diff -- spark.driver.log.syncToDfs.enabled does not exist. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226468828 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, File, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private var rootHdfsDir : File = _ + + override def beforeAll(): Unit = { +rootHdfsDir = Utils.createTempDir(namePrefix = "hdfs_logs") + } + + override def afterAll(): Unit = { +JavaUtils.deleteRecursively(rootHdfsDir) + } + + test("driver logs are persisted locally and synced to hdfs") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs") +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals("driver.log")) + +sc.stop() +// File is continuously synced to Hdfs (which is a local dir for this test) +assert(!driverLogsDir.exists()) +val hdfsDir = FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id) +assert(hdfsDir.exists()) +val hdfsFiles = hdfsDir.listFiles() --- End diff -- All the checks below are basically: ``` hdfsFiles.exists { f => f.getName() == DRIVER_LOG_FILE && f.length() > 0 } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
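The single-expression check suggested in this comment can be tried outside the test suite with plain `java.io.File`. This is a sketch only: `DriverLogCheck` and `hasPersistedLog` are hypothetical names, and the local constant mirrors the `DRIVER_LOG_FILE` value shown in the `DriverLogger` diff.

```scala
import java.io.File

object DriverLogCheck {
  // Same file name as DRIVER_LOG_FILE in the DriverLogger diff.
  val DriverLogFile = "driver.log"

  // True when the directory holds a non-empty file with the expected name.
  // listFiles() returns null for a missing directory, hence the Option wrapper.
  def hasPersistedLog(dir: File): Boolean =
    Option(dir.listFiles()).getOrElse(Array.empty[File]).exists { f =>
      f.getName == DriverLogFile && f.length() > 0
    }
}
```

In the suite itself the body of `hasPersistedLog` would sit directly inside an `assert(...)`, replacing the separate length, name, and size checks.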
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226457291 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR) +driverLogDir.foreach { dl => + val maxTime = clock.getTimeMillis() - +conf.get(MAX_DRIVER_LOG_AGE_S) * 1000 --- End diff -- Fits in previous line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226466609 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark +user to read/write files and the spark history server user to delete files. Additionally, older logs from +this directory are cleaned by Spark History Server if spark.history.fs.driverlog.cleaner.enabled is true. --- End diff -- Put config names between ``. This is also wrong, since you do not have an explanation of this option in the documentation. It will be true also if you just enable the normal SHS cleaner. You should explain that option explicitly and say its default value is the value of the other config. The SHS options are in monitoring.md. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
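The default described in this comment (the driver log cleaner inherits the value of the general history server cleaner unless set explicitly) can be modelled with a plain map. This is a sketch under assumptions: `CleanerConfig` and `driverLogCleanerEnabled` are hypothetical names, the configuration is a `Map[String, String]` instead of a `SparkConf`, and `spark.history.fs.cleaner.enabled` is taken to be the "normal SHS cleaner" option the comment refers to.

```scala
object CleanerConfig {
  val DriverLogCleanerEnabled = "spark.history.fs.driverlog.cleaner.enabled"
  val EventLogCleanerEnabled = "spark.history.fs.cleaner.enabled"

  // The driver log cleaner setting wins when present; otherwise it inherits
  // the general history server cleaner setting. With neither set, it is off.
  def driverLogCleanerEnabled(conf: Map[String, String]): Boolean =
    conf.get(DriverLogCleanerEnabled)
      .orElse(conf.get(EventLogCleanerEnabled))
      .exists(_.toBoolean)
}
```

This is why the documentation needs to state the fallback explicitly: enabling only the general cleaner also turns on driver log cleaning.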
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226463571 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not persist driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable + with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to persist driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(loca
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226457468 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) stale.foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") -deleteLog(new Path(log.logPath)) +deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } } // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + /** + * Delete driver logs from the configured spark dfs dir that exceed the configured max age + */ + private[history] def cleanDriverLogs(): Unit = Utils.tryLog { +val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR) --- End diff -- If you're calling this you know the config is set. So you can avoid the extra indentation by just calling `.get`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226463678 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark +user to read/write files and the spark history server user to delete files. Additionally, older logs from +this directory are cleaned by Spark History Server if spark.history.fs.driverlog.cleaner.enabled is true. +They are cleaned if they are older than max age configured at spark.history.fs.driverlog.cleaner.maxAge. + + + + spark.driver.log.syncToDfs.enabled + false + +If true, spark application running in client mode will sync driver logs to a persistent storage, configured --- End diff -- Makes sense. Updated the config name and the wording. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226463522 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogF
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226454499 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogFile
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226453812 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark +user to read/write files and the spark history server user to delete files. Additionally, older logs from +this directory are cleaned by Spark History Server if spark.history.fs.driverlog.cleaner.enabled is true. +They are cleaned if they are older than max age configured at spark.history.fs.driverlog.cleaner.maxAge. + + + + spark.driver.log.syncToDfs.enabled + false + +If true, spark application running in client mode will sync driver logs to a persistent storage, configured --- End diff -- I guess I just don't like using the word "sync" here: it makes me think the logs are stored somewhere even when this is false, just not synced to persistent storage. How about renaming the conf to `spark.driver.log.persistToDfs.enabled` and rewording this to: "If true, spark application running in client mode will write driver logs to a persistent storage, configured in spark.driver.log.dfsDir. If spark.driver.log.dfsDir is not configured, driver logs will not be stored to persistent storage. Additionally, enable the cleaner by setting spark.history.fs.driverlog.cleaner.enabled to true."?
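The behaviour in the proposed wording (logs are persisted only when the flag is true, the application runs in client mode, and a directory is configured) can be sketched as a small decision function. The names `DriverLogPersistence` and `persistDir` are hypothetical, the configuration is modelled as a `Map[String, String]`, and treating a missing `spark.submit.deployMode` as client mode is an assumption of this sketch.

```scala
object DriverLogPersistence {
  val PersistEnabled = "spark.driver.log.persistToDfs.enabled"
  val DfsDir = "spark.driver.log.dfsDir"
  val DeployMode = "spark.submit.deployMode"

  // Returns the target directory only when persistence is switched on,
  // the driver runs in client mode, and a directory is actually configured.
  def persistDir(conf: Map[String, String]): Option[String] = {
    val enabled = conf.get(PersistEnabled).exists(_.toBoolean)
    val clientMode = conf.getOrElse(DeployMode, "client") == "client"
    if (enabled && clientMode) conf.get(DfsDir) else None
  }
}
```

Returning an `Option` makes the "enabled but no directory configured" case explicit: the result is `None`, so nothing is persisted.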
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226421974 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogF
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226421401 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark +user to read/write files and the spark history server user to delete files. Additionally, older logs from +this directory are cleaned by Spark History Server if spark.history.fs.driverlog.cleaner.enabled is true. +They are cleaned if they are older than max age configured at spark.history.fs.driverlog.cleaner.maxAge. + + + + spark.driver.log.syncToDfs.enabled + false + +If true, spark application running in client mode will sync driver logs to a persistent storage, configured --- End diff -- I guess what I meant to say here is that this feature enables syncing to dfs. We do not support syncing to local disk; that is just an implementation detail, not a feature. I am good with using any alternate wording if it represents the intent better.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226418231 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogF
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226395358 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogFile
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226400385 --- Diff: docs/configuration.md --- @@ -266,6 +266,37 @@ of the most common options to set are: Only has effect in Spark standalone mode or Mesos cluster deploy mode. + + spark.driver.log.dfsDir + (none) + +Base directory in which Spark driver logs are synced, if spark.driver.log.syncToDfs.enabled is true. +Within this base directory, Spark creates a sub-directory for each application, and logs the driver logs +specific to the application in this directory. Users may want to set this to a unified location like an +HDFS directory so driver log files can be persisted for later usage. This directory should allow any spark +user to read/write files and the spark history server user to delete files. Additionally, older logs from +this directory are cleaned by Spark History Server if spark.history.fs.driverlog.cleaner.enabled is true. +They are cleaned if they are older than max age configured at spark.history.fs.driverlog.cleaner.maxAge. + + + + spark.driver.log.syncToDfs.enabled + false + +If true, spark application running in client mode will sync driver logs to a persistent storage, configured --- End diff -- It seems like there is a mismatch between what is described here and what is implemented. Do you intend to support a configuration where you are only logging to local disk, and not syncing to dfs? It doesn't seem like `DriverLogger.apply` allows that. If you do intend to support that, and want it controlled by this configuration, then I'd remove the "syncToDfs" from this name.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226394446 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogFile
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226391941 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch { + case e: Exception => 
+logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + val fa = LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME) + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + Utils.tryLogNonFatalError(fa.close()) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { --- End diff -- nit: double indent
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226396957 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs" --- End diff -- Use a relative dir, not an absolute one, for testing data. (What if a developer happened to have real data at /tmp/hdfs_logs before running this?)
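As an illustration of the concern above, here is a minimal sketch of one way a test can avoid a fixed absolute path. Note that it swaps in a different technique from the one the reviewer names: instead of a relative directory, it creates a fresh, uniquely named temporary directory per run, which also avoids touching any pre-existing data. The object and helper names (`TempLogDirExample`, `withTempLogDir`) are hypothetical and use only the JDK, not Spark's test utilities.

```scala
import java.io.File
import java.nio.file.Files

object TempLogDirExample {
  // Recursively delete a directory tree (plain-JDK stand-in for a
  // helper such as JavaUtils.deleteRecursively).
  def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Create a fresh, uniquely named directory, run the body, always clean up.
  def withTempLogDir[T](body: File => T): T = {
    val dir = Files.createTempDirectory("driver_logs_test").toFile
    try body(dir) finally deleteRecursively(dir)
  }

  def main(args: Array[String]): Unit = {
    val leftover = withTempLogDir { dir =>
      val log = new File(dir, "driver.log")
      Files.write(log.toPath, "hello".getBytes("UTF-8"))
      assert(log.exists())
      dir
    }
    // The directory is gone once the block finishes.
    assert(!leftover.exists())
  }
}
```

Because the directory name is generated per run, nothing a developer already has under `/tmp/hdfs_logs` can be created over or deleted by the suite.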
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226397391 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs" + + override def beforeAll(): Unit = { +FileUtils.forceMkdir(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT)) + } + + override def afterAll(): Unit = { +JavaUtils.deleteRecursively(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT)) + } + + test("driver logs are persisted locally and synced to hdfs") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs") +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals("driver.log")) + +sc.stop() +// On application end, file is moved to Hdfs (which is a local dir for this test) --- End diff -- comment is a bit misleading (or maybe out of date) -- it's synced continuously, right?
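To make the "synced continuously" point concrete, here is a rough sketch of incremental syncing: each call copies only the bytes appended to the local log since the previous call. This is not the pull request's `DfsAsyncWriter`. The names `IncrementalSyncSketch` and `Syncer` are hypothetical, and a plain local file stands in for the DFS output stream. In a real setup, something like a scheduled executor would presumably call `syncOnce()` at a fixed interval (the quoted diff defines `UPLOAD_INTERVAL_IN_SECS = 5`); here it is called by hand.

```scala
import java.io.{BufferedInputStream, File, FileInputStream, FileOutputStream}
import java.nio.file.Files

object IncrementalSyncSketch {
  // Copies bytes appended to `src` since the previous call into `dst`.
  // A local file stands in for the DFS output stream (assumption).
  final class Syncer(src: File, dst: File, chunkSize: Int = 1024 * 1024) {
    private val in = new BufferedInputStream(new FileInputStream(src))
    private val out = new FileOutputStream(dst, true)
    private val buf = new Array[Byte](chunkSize)

    def syncOnce(): Unit = {
      var n = in.read(buf)
      while (n > 0) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
      out.flush()
    }

    // Final sync on shutdown, then release both streams.
    def close(): Unit = {
      try syncOnce() finally { in.close(); out.close() }
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("sync_sketch").toFile
    val src = new File(dir, "driver.log")
    val dst = new File(dir, "driver.log.copy")
    Files.write(src.toPath, "first\n".getBytes("UTF-8"))

    val syncer = new Syncer(src, dst)
    syncer.syncOnce()

    // Append more log output, then sync again.
    val append = new FileOutputStream(src, true)
    append.write("second\n".getBytes("UTF-8"))
    append.close()
    syncer.syncOnce()
    syncer.close()

    val copied = new String(Files.readAllBytes(dst.toPath), "UTF-8")
    assert(copied == "first\nsecond\n")

    // Clean up the scratch files.
    src.delete(); dst.delete(); dir.delete()
  }
}
```

Under this model the copy at the destination trails the local file by at most one interval, so a test comment saying the file is moved "on application end" would describe only the final flush.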
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r226395070 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.security.{AccessControlException, UserGroupInformation} +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) +extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var hdfsLogFile: String = _ + private var writer: Option[HdfsAsyncWriter] = None + private var hadoopConfiguration: Configuration = _ + private var fileSystem: FileSystem = _ + + private val syncToDfs: Boolean = conf.get(DRIVER_LOG_SYNCTODFS) + private val syncToYarn: Boolean = conf.get(DRIVER_LOG_SYNCTOYARN) + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: 
${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + val appId = Utils.sanitizeDirName(conf.getAppId) + hdfsLogFile = { +val rootDir = conf.get(DRIVER_LOG_DFS_DIR.key, + "/tmp/driver_logs").split(",").head +FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() + } + hadoopConfiguration = hadoopConf + fileSystem = FileSystem.get(hadoopConf) + + // Setup a writer which moves the local file to hdfs continuously + if (syncToDfs) { +writer = Some(new HdfsAsyncWriter()) +logInfo(s"The local driver log file is being synced to spark dfs at: ${hdfsLogFile}") + } +} catch { + case e: Exception => +logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + writer.map(_.closeWriter()) + if (syncToYarn) { +moveToYarnAppDir() + } +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + try { + JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile()) + } catch { +case e: Exception => + logError(s"Error in deleting local driver log dir", e) + } +} + } + + private def moveToYarnAppDir(): Unit = { +try { + val appId = Utils.sanitizeDirName(conf.getAppId)
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r225343405 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs" + + override def beforeAll(): Unit = { +FileUtils.forceMkdir(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT)) + } + + override def afterAll(): Unit = { +JavaUtils.deleteRecursively(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT)) + } + + test("driver logs are persisted") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs") +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals("driver.log")) + +sc.stop() +// On application end, file is moved to Hdfs (which is a local dir for this test) +assert(!driverLogsDir.exists()) +val hdfsDir = FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id) +assert(hdfsDir.exists()) +val hdfsFiles = hdfsDir.listFiles() +assert(hdfsFiles.length > 0) +JavaUtils.deleteRecursively(hdfsDir) +assert(!hdfsDir.exists()) + } + + test("driver logs are synced to hdfs continuously") { +val sc = getSparkContext() --- End diff -- I changed the test to just check the final output, so it does not rely on loop logging anymore. That way, I don't need to expose `SparkContext.driverLogger` as well --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r225310694

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,6 +117,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Visible for testing
   private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)

+  private[history] val driverLogFs: Option[FileSystem] =
--- End diff --

Just `private`.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r225312559 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + // Visible for testing + private[spark] var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch 
{
+      case e: Exception =>
+        logError(s"Could not sync driver logs to spark dfs", e)
+    }
+  }
+
+  def stop(): Unit = {
+    try {
+      LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+      writer.foreach(_.closeWriter())
+    } catch {
+      case e: Exception =>
+        logError(s"Error in persisting driver logs", e)
+    } finally {
+      Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+        FileUtils.getFile(localLogFile).getParentFile()))
+    }
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable
+    with Logging {
+
+    private var streamClosed = false
+    private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+    private val dfsLogFile: String = {
+      val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get.split(",").head
--- End diff --

Why are you calling `split` here?
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r225314910 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs" + + override def beforeAll(): Unit = { +FileUtils.forceMkdir(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT)) + } + + override def afterAll(): Unit = { +JavaUtils.deleteRecursively(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT)) + } + + test("driver logs are persisted") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs") +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals("driver.log")) + +sc.stop() +// On application end, file is moved to Hdfs (which is a local dir for this test) +assert(!driverLogsDir.exists()) +val hdfsDir = FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id) +assert(hdfsDir.exists()) +val hdfsFiles = hdfsDir.listFiles() +assert(hdfsFiles.length > 0) +JavaUtils.deleteRecursively(hdfsDir) +assert(!hdfsDir.exists()) + } + + test("driver logs are synced to hdfs continuously") { +val sc = getSparkContext() --- End diff -- I'm not a fan of what you did, exposing `SparkContext.driverLogger` for tests. It should be possible to write this test without using `SparkContext` at all. Just create a `DriverLogger` and control it from the test. Also, you did not address all of my previous feedback. 
e.g., the loop logging a bunch of things should not be needed. You should have some explicit action that causes the flush instead of relying on side effects of other calls.
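The shape of test being asked for here (construct the component directly, then trigger the sync with an explicit call) can be sketched without Spark at all. The block below is only a stand-alone model built on `java.nio`: `IncrementalCopier` is a hypothetical stand-in for `DfsAsyncWriter`, and none of these names come from the PR.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical stand-in for DfsAsyncWriter: each run() appends to `target`
// whatever bytes were added to `source` since the previous run.
class IncrementalCopier(source: Path, target: Path) extends Runnable {
  private var copied = 0L // number of source bytes already written to target

  override def run(): Unit = {
    val all = Files.readAllBytes(source)
    if (all.length > copied) {
      val fresh = java.util.Arrays.copyOfRange(all, copied.toInt, all.length)
      Files.write(target, fresh, StandardOpenOption.CREATE, StandardOpenOption.APPEND)
      copied = all.length.toLong
    }
  }
}

object IncrementalCopierDemo {
  // Writes two batches to a local file and syncs explicitly after each one.
  // Returns the target's content after the first and after the second sync.
  def roundTrip(): (String, String) = {
    val dir = Files.createTempDirectory("copier-demo")
    val source = dir.resolve("driver.log")
    val target = dir.resolve("synced.log")
    val copier = new IncrementalCopier(source, target)

    Files.write(source, "first\n".getBytes(StandardCharsets.UTF_8))
    copier.run() // explicit sync: no scheduler, no sleeping, no filler log lines
    val afterFirst = new String(Files.readAllBytes(target), StandardCharsets.UTF_8)

    Files.write(source, "second\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND)
    copier.run()
    val afterSecond = new String(Files.readAllBytes(target), StandardCharsets.UTF_8)

    // Clean up the scratch files before returning.
    Files.delete(source)
    Files.delete(target)
    Files.delete(dir)
    (afterFirst, afterSecond)
  }
}
```

Because the test owns the `Runnable`, it decides exactly when a sync happens and can assert on the target right after each call.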
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r225311036

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -281,6 +288,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       pool.scheduleWithFixedDelay(
         getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
     }
+
+    driverLogFs.foreach { _ =>
+      if (conf.get(DRIVER_LOG_CLEANER_ENABLED).getOrElse(
+          conf.getBoolean("spark.history.fs.cleaner.enabled", false))) {
--- End diff --

Turn `spark.history.fs.cleaner.enabled` into a config constant and use `fallbackConf`.
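To make the intended lookup order concrete, here is a small stand-alone model of a typed config entry that falls back to another entry when its own key is unset. It is deliberately not Spark's `ConfigBuilder`/`ConfigEntry` API: `Entry`, `DefaultEntry`, `FallbackEntry` and `CleanerConfig` are hypothetical names, and settings are modelled as a plain `Map[String, String]`.

```scala
// Hypothetical, simplified model of a typed config entry with a fallback.
// It only illustrates the lookup order; it is not Spark's ConfigEntry.
sealed trait Entry[T] {
  def key: String
  def read(settings: Map[String, String]): T
}

// An entry that carries its own default value.
final case class DefaultEntry[T](key: String, default: T, parse: String => T)
  extends Entry[T] {
  def read(settings: Map[String, String]): T =
    settings.get(key).map(parse).getOrElse(default)
}

// An entry that defers to another entry when its own key is not set.
final case class FallbackEntry[T](key: String, fallback: Entry[T], parse: String => T)
  extends Entry[T] {
  def read(settings: Map[String, String]): T =
    settings.get(key).map(parse).getOrElse(fallback.read(settings))
}

object CleanerConfig {
  // The existing setting, expressed once as a constant.
  val CleanerEnabled: Entry[Boolean] =
    DefaultEntry[Boolean]("spark.history.fs.cleaner.enabled", false, _.toBoolean)

  // The driver-log setting, which uses the existing one when unset.
  val DriverLogCleanerEnabled: Entry[Boolean] =
    FallbackEntry[Boolean](
      "spark.history.fs.driverlog.cleaner.enabled", CleanerEnabled, _.toBoolean)
}
```

With the fallback encoded in the entry itself, call sites read one constant and the `getOrElse(conf.getBoolean("...", false))` chain in the diff is no longer needed.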
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r225315220 --- Diff: core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io.{BufferedInputStream, FileInputStream} + +import org.apache.commons.io.FileUtils + +import org.apache.spark._ +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.Utils + +class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { + + private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs" + + override def beforeAll(): Unit = { +FileUtils.forceMkdir(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT)) + } + + override def afterAll(): Unit = { +JavaUtils.deleteRecursively(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT)) + } + + test("driver logs are persisted") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs") +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals("driver.log")) + +sc.stop() +// On application end, file is moved to Hdfs (which is a local dir for this test) +assert(!driverLogsDir.exists()) +val hdfsDir = FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id) +assert(hdfsDir.exists()) +val hdfsFiles = hdfsDir.listFiles() +assert(hdfsFiles.length > 0) +JavaUtils.deleteRecursively(hdfsDir) +assert(!hdfsDir.exists()) + } + + test("driver logs are synced to hdfs continuously") { +val sc = getSparkContext() + +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs") +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() 
+assert(files.length === 1) +assert(files(0).getName.equals("driver.log")) +for (i <- 1 to 1000) { + logInfo("Log enough data to log file so that it can be flushed") +} + +// Sync the driver logs manually instead of waiting for scheduler +sc._driverLogger.foreach(_.writer.foreach(_.run())) +val hdfsDir = FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id) +assert(hdfsDir.exists()) +val hdfsFiles = hdfsDir.listFiles() +assert(hdfsFiles.length > 0) +val driverLogFile = hdfsFiles.filter(f => f.getName.equals("driver.log")).head +val hdfsIS = new BufferedInputStream(new FileInputStream(driverLogFile)) +assert(hdfsIS.available() > 0) + +sc.stop() +// Ensure that the local file is deleted on application end +assert(!driverLogsDir.exists()) +JavaUtils.deleteRecursively(hdfsDir) +assert(!hdfsDir.exists()) + } + + private def getSparkContext(): SparkContext = { +val conf = new SparkConf() +conf.set("spark.local.dir", "/tmp") --- End diff -- Don't do this. Tests run with a pre-configured temp directory, and should not write to `/tmp`. If you need to create a temp directory, do so explicitly using the existing `Utils` API. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.a
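As an illustration of the "create a temp directory explicitly" advice, the pattern looks roughly like the sketch below. It uses only the JDK so that it runs on its own; `TempDirs.withTempDir` is a hypothetical helper written for this example, and inside Spark the test should go through the existing `Utils` API that the comment refers to.

```scala
import java.io.File
import java.nio.file.Files

object TempDirs {
  // Recursively delete a file or a directory tree.
  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for plain files, hence the Option wrapper.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Hypothetical helper: create a fresh, uniquely named directory under the
  // JVM's configured temp dir, hand it to `body`, and always clean it up.
  def withTempDir[T](prefix: String)(body: File => T): T = {
    val dir = Files.createTempDirectory(prefix).toFile
    try body(dir) finally deleteRecursively(dir)
  }
}
```

A test would then pass `dir.getAbsolutePath` as the log directory instead of hard-coding `/tmp`, so parallel runs cannot collide and nothing is left behind.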
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r225312422 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + // Visible for testing + private[spark] var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch 
{
+      case e: Exception =>
+        logError(s"Could not sync driver logs to spark dfs", e)
+    }
+  }
+
+  def stop(): Unit = {
+    try {
+      LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
--- End diff --

Shouldn't you close the appender you added, so the underlying file is closed / flushed too? Or is that done implicitly by this call somehow?
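The detach-then-close pattern behind this question can be shown with the JDK's own `java.util.logging`, used here as a stand-in because it needs no extra dependency. This is an analogy, not log4j code: in `java.util.logging`, `Logger.removeHandler` only detaches the handler, and the file is released by a separate `close()` call. `DetachAndClose` and the logger name are made up for the example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.logging.{FileHandler, Logger, SimpleFormatter}

object DetachAndClose {
  // Logs one message to `file` through a temporary handler, then detaches and
  // closes that handler, and returns what ended up in the file.
  def logOnce(file: Path, message: String): String = {
    val logger = Logger.getLogger("detach-and-close-demo")
    logger.setUseParentHandlers(false) // keep the demo output off the console
    val handler = new FileHandler(file.toString)
    handler.setFormatter(new SimpleFormatter())
    logger.addHandler(handler)
    try {
      logger.info(message)
    } finally {
      logger.removeHandler(handler) // detaches only; the file is still open
      handler.close()               // flushes and releases the underlying file
    }
    new String(Files.readAllBytes(file), StandardCharsets.UTF_8)
  }
}
```

Keeping a reference to the handler (or looking it up by name before removing it) is what makes the explicit `close()` possible.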
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r225310830

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,6 +117,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Visible for testing
   private[history] val fs: FileSystem = new Path(logDir).getFileSystem(hadoopConf)

+  private[history] val driverLogFs: Option[FileSystem] =
+    if (conf.getOption("spark.driver.log.dfsDir").isDefined) {
--- End diff --

`spark.driver.log.dfsDir` is a config constant. Use the constant.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r225311476

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     stale.foreach { log =>
       if (log.appId.isEmpty) {
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-        deleteLog(new Path(log.logPath))
+        deleteLog(fs, new Path(log.logPath))
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
     // Clean the blacklist from the expired entries.
     clearBlacklist(CLEAN_INTERVAL_S)
   }

+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+    val driverLogDir = conf.getOption("spark.driver.log.dfsDir")
+    driverLogDir.foreach { dl =>
+      val maxTime = clock.getTimeMillis() -
--- End diff --

Create a config constant for the existing config.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r225313815 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + // Visible for testing + private[spark] var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch 
{ + case e: Exception => +logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get.split(",").head + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogFile)) + outputStream = fileSystem.create(new Path(dfsLogFile), true) +
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r225311218

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -281,6 +288,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       pool.scheduleWithFixedDelay(
         getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
     }
+
+    driverLogFs.foreach { _ =>
+      if (conf.get(DRIVER_LOG_CLEANER_ENABLED).getOrElse(
+          conf.getBoolean("spark.history.fs.cleaner.enabled", false))) {
+        pool.scheduleWithFixedDelay(getRunner(() => cleanDriverLogs()),
+          0,
+          conf.get(DRIVER_LOG_CLEANER_INTERVAL).getOrElse(CLEAN_INTERVAL_S),
--- End diff --

Create config constant for existing config, use `fallbackConf`.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r225313921 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + // Visible for testing + private[spark] var writer: Option[DfsAsyncWriter] = None + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} else if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + val appId = Utils.sanitizeDirName(conf.getAppId) + writer = Some(new DfsAsyncWriter(appId, hadoopConf)) +} catch 
{ + case e: Exception => +logError(s"Could not sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME) + writer.foreach(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + Utils.tryLogNonFatalError(JavaUtils.deleteRecursively( +FileUtils.getFile(localLogFile).getParentFile())) +} + } + + // Visible for testing + private[spark] class DfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val dfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get.split(",").head + if (!fileSystem.exists(new Path(rootDir))) { +throw new RuntimeException(s"${rootDir} does not exist." + + s" Please create this dir in order to sync driver logs") + } + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var inStream: InputStream = null +private var outputStream: FSDataOutputStream = null +try { + inStream = new BufferedInputStream(new FileInputStream(localLogFile)) + outputStream = fileSystem.create(new Path(dfsLogFile), true) +
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r225311355

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         stale.foreach { log =>
           if (log.appId.isEmpty) {
             logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
    -        deleteLog(new Path(log.logPath))
    +        deleteLog(fs, new Path(log.logPath))
             listing.delete(classOf[LogInfo], log.logPath)
           }
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
       }

    +  /**
    +   * Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +   */
    +  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
    +    val driverLogDir = conf.getOption("spark.driver.log.dfsDir")
    --- End diff --

    Use the config constant...
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r224189470

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
    +
    +    // Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +    try {
    +      val hdfsDir = conf.get("spark.driver.log.dfsDir")
    +      val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
    +      while (appDirs.hasNext()) {
    +        val appDirStatus = appDirs.next()
    +        if (appDirStatus.getModificationTime() < maxTime) {
    +          logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}")
    +          deleteLog(appDirStatus.getPath())
    --- End diff --

    Added the new configurations, with fallback option to existing ones.
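The "fallback option" mentioned above could look roughly like the sketch below. It is only an illustration under stated assumptions: a plain `Map` stands in for the Spark configuration, `driverLogMaxAge` is a hypothetical helper, and the assumption that the driver-log key falls back to the existing event-log cleaner key is inferred from the comment, not confirmed by the quoted diff.

```scala
object FallbackConfSketch {
  // Driver-log cleaner key (named in the docs diff for this PR).
  val DriverLogMaxAge = "spark.history.fs.driverlog.cleaner.maxAge"
  // Existing history-server cleaner key, assumed here to be the fallback.
  val EventLogMaxAge = "spark.history.fs.cleaner.maxAge"

  /** Prefer the driver-log setting, then the existing one, then a default. */
  def driverLogMaxAge(settings: Map[String, String], default: String): String =
    settings.get(DriverLogMaxAge)
      .orElse(settings.get(EventLogMaxAge))
      .getOrElse(default)

  def main(args: Array[String]): Unit = {
    // Only the existing key is set, so its value is used.
    println(driverLogMaxAge(Map(EventLogMaxAge -> "7d"), "1d"))
  }
}
```

With this shape, deployments that only set the existing history-server key keep their current retention, while a driver-log-specific value overrides it when present.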
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r223902134 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[HdfsAsyncWriter] = None + private val syncToDfs: Boolean = conf.get(DRIVER_LOG_SYNCTODFS) + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + if (syncToDfs) { +val appId = Utils.sanitizeDirName(conf.getAppId) +writer = Some(new HdfsAsyncWriter(appId, hadoopConf)) + } +} catch { + case e: Exception => +logError(s"Could not 
sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + writer.map(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + try { + JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile()) + } catch { +case e: Exception => + logError(s"Error in deleting local driver log dir", e) + } +} + } + + private class HdfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val inStream = new BufferedInputStream(new FileInputStream(localLogFile)) +private var hdfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR.key, "/tmp/driver_logs").split(",").head + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val outputStream: FSDataOutputStream = fileSystem.create(new Path(hdfsLogFile), true) +fileSystem.setPermission(new Path(hdfsLogFile), LOG_FILE_PERMISSIONS) +private val tmpBuffer = new Array[Byte](UPLOAD_CHUNK_SIZE) +private val threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("dfsSyncThread") +threadpool.scheduleWithFixedDelay(this, UPLOAD_INTERVAL_IN_SECS, UP
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223902004

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
    +
    +    // Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +    try {
    +      val hdfsDir = conf.get("spark.driver.log.dfsDir")
    +      val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
    +      while (appDirs.hasNext()) {
    +        val appDirStatus = appDirs.next()
    +        if (appDirStatus.getModificationTime() < maxTime) {
    +          logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}")
    +          deleteLog(appDirStatus.getPath())
    --- End diff --

    It's pretty trivial to create a different task to run this code you're adding...
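A separate task of the kind suggested here might be sketched as follows. This is a hedged illustration, not the code that was merged: `LogEntry`, `expired`, and `schedule` are hypothetical names, and the entries stand in for the `FileStatus` objects the real cleaner would list from the driver log directory.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object DriverLogCleanerSketch {
  // Hypothetical record; the real code would read these fields from a FileStatus.
  final case class LogEntry(path: String, modificationTimeMs: Long)

  /** Entries last modified before (nowMs - maxAgeMs). */
  def expired(entries: Seq[LogEntry], nowMs: Long, maxAgeMs: Long): Seq[LogEntry] = {
    val cutoff = nowMs - maxAgeMs
    entries.filter(_.modificationTimeMs < cutoff)
  }

  /** Runs `task` on its own single-thread executor, with its own interval. */
  def schedule(intervalSecs: Long)(task: () => Unit): ScheduledExecutorService = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    val runnable = new Runnable { override def run(): Unit = task() }
    pool.scheduleWithFixedDelay(runnable, 0L, intervalSecs, TimeUnit.SECONDS)
    pool // the caller is responsible for calling shutdown()
  }
}
```

Keeping the expiry check as a pure function makes the max-age logic testable without a file system, and the dedicated executor lets the driver log cleaner use an interval that differs from the event log cleaner's.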
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request: https://github.com/apache/spark/pull/22504#discussion_r223901876 --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.io._ +import java.util.concurrent.TimeUnit + +import scala.util.{Failure, Try} + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.log4j.{FileAppender => Log4jFileAppender, _} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.util.{ThreadUtils, Utils} + +private[spark] class DriverLogger(conf: SparkConf) extends Logging { + + private val UPLOAD_CHUNK_SIZE = 1024 * 1024 + private val UPLOAD_INTERVAL_IN_SECS = 5 + private val DRIVER_LOG_FILE = "driver.log" + private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) + + private var localLogFile: String = FileUtils.getFile( +Utils.getLocalDir(conf), "driver_logs", DRIVER_LOG_FILE).getAbsolutePath() + private var writer: Option[HdfsAsyncWriter] = None + private val syncToDfs: Boolean = conf.get(DRIVER_LOG_SYNCTODFS) + + addLogAppender() + + private def addLogAppender(): Unit = { +val appenders = LogManager.getRootLogger().getAllAppenders() +val layout = if (appenders.hasMoreElements()) { + appenders.nextElement().asInstanceOf[Appender].getLayout() +} else { + new PatternLayout(conf.get(DRIVER_LOG_LAYOUT)) +} +val fa = new Log4jFileAppender(layout, localLogFile) +fa.setName(DriverLogger.APPENDER_NAME) +LogManager.getRootLogger().addAppender(fa) +logInfo(s"Added a local log appender at: ${localLogFile}") + } + + def startSync(hadoopConf: Configuration): Unit = { +try { + // Setup a writer which moves the local file to hdfs continuously + if (syncToDfs) { +val appId = Utils.sanitizeDirName(conf.getAppId) +writer = Some(new HdfsAsyncWriter(appId, hadoopConf)) + } +} catch { + case e: Exception => +logError(s"Could not 
sync driver logs to spark dfs", e) +} + } + + def stop(): Unit = { +try { + writer.map(_.closeWriter()) +} catch { + case e: Exception => +logError(s"Error in persisting driver logs", e) +} finally { + try { + JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile()) + } catch { +case e: Exception => + logError(s"Error in deleting local driver log dir", e) + } +} + } + + private class HdfsAsyncWriter(appId: String, hadoopConf: Configuration) extends Runnable +with Logging { + +private var streamClosed = false +private val inStream = new BufferedInputStream(new FileInputStream(localLogFile)) +private var hdfsLogFile: String = { + val rootDir = conf.get(DRIVER_LOG_DFS_DIR.key, "/tmp/driver_logs").split(",").head + FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath() +} +private var fileSystem: FileSystem = FileSystem.get(hadoopConf) +private val outputStream: FSDataOutputStream = fileSystem.create(new Path(hdfsLogFile), true) +fileSystem.setPermission(new Path(hdfsLogFile), LOG_FILE_PERMISSIONS) +private val tmpBuffer = new Array[Byte](UPLOAD_CHUNK_SIZE) +private val threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("dfsSyncThread") +threadpool.scheduleWithFixedDelay(this, UPLOAD_INTERVAL_IN_SECS,
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223901520

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
    +
    +    // Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +    try {
    +      val hdfsDir = conf.get("spark.driver.log.dfsDir")
    +      val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
    +      while (appDirs.hasNext()) {
    +        val appDirStatus = appDirs.next()
    +        if (appDirStatus.getModificationTime() < maxTime) {
    +          logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}")
    +          deleteLog(appDirStatus.getPath())
    --- End diff --

    I added the documentation but I am still relying on the existing history server configurations. This is because I am using an existing cleaner. Otherwise, I may have to create another cleaner for this, which can run at a different interval and has a different configured max age. Please let me know if I should create another cleaner for this or just provide a configuration for a different max age.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223900576

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
    +
    +    // Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +    try {
    +      val hdfsDir = conf.get("spark.driver.log.dfsDir")
    +      val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
    +      while (appDirs.hasNext()) {
    --- End diff --

    Could be. If it doesn't work this is fine.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223900426

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
    +
    +    // Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +    try {
    +      val hdfsDir = conf.get("spark.driver.log.dfsDir")
    +      val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
    +      while (appDirs.hasNext()) {
    --- End diff --

    Didn't work. Maybe because it is a RemoteIterator?
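On the RemoteIterator question: Hadoop's `RemoteIterator` is its own interface rather than a `java.util.Iterator`, so Scala collection methods and the standard Java converters do not apply to it directly, which is consistent with keeping the `while` loop. If Scala-style iteration were wanted, a small adapter is one option. The sketch below assumes a stand-in trait, `RemoteIteratorLike`, that mirrors the two methods of the Hadoop interface so the example compiles without Hadoop on the classpath; `asScala` and `fake` are hypothetical helpers.

```scala
object RemoteIteratorSketch {
  // Stand-in mirroring the hasNext()/next() pair of Hadoop's RemoteIterator.
  trait RemoteIteratorLike[E] {
    def hasNext(): Boolean
    def next(): E
  }

  /** Adapts the two-method interface to scala.collection.Iterator. */
  def asScala[E](remote: RemoteIteratorLike[E]): Iterator[E] = new Iterator[E] {
    override def hasNext: Boolean = remote.hasNext()
    override def next(): E = remote.next()
  }

  /** In-memory fake used only to demonstrate the adapter. */
  def fake[E](items: List[E]): RemoteIteratorLike[E] = new RemoteIteratorLike[E] {
    private var rest = items
    def hasNext(): Boolean = rest.nonEmpty
    def next(): E = { val head = rest.head; rest = rest.tail; head }
  }
}
```

Once wrapped, the listing can be filtered and traversed with `filter` and `foreach`. Exceptions thrown by the underlying calls still surface lazily, at the point where the wrapped iterator is consumed.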
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223871565

    --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.logging
    +
    +import java.io._
    +import java.util.concurrent.TimeUnit
    +
    +import scala.util.{Failure, Try}
    +
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +
    +private[spark] class DriverLogger(conf: SparkConf) extends Logging {
    +
    +  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
    +  private val UPLOAD_INTERVAL_IN_SECS = 5
    +  private val DRIVER_LOG_FILE = "driver.log"
    +  private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
    +
    +  private var localLogFile: String = FileUtils.getFile(
    --- End diff --

    Utils.getLocalDir also does that actually. Spark deletes the directory that it creates.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223866624

    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -48,6 +48,19 @@ package object config {
         .bytesConf(ByteUnit.MiB)
         .createOptional

    +  private[spark] val DRIVER_LOG_DFS_DIR =
    +    ConfigBuilder("spark.driver.log.dfsDir").stringConf.createOptional
    +
    +  private[spark] val DRIVER_LOG_LAYOUT =
    +    ConfigBuilder("spark.driver.log.layout")
    +      .stringConf
    +      .createWithDefault("%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n")
    +
    +  private[spark] val DRIVER_LOG_SYNCTODFS =
    --- End diff --

    Let's leave it. But there's some usage of the enabled flag in your code that you can still clean up.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223866023

    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -48,6 +48,19 @@ package object config {
         .bytesConf(ByteUnit.MiB)
         .createOptional

    +  private[spark] val DRIVER_LOG_DFS_DIR =
    +    ConfigBuilder("spark.driver.log.dfsDir").stringConf.createOptional
    +
    +  private[spark] val DRIVER_LOG_LAYOUT =
    +    ConfigBuilder("spark.driver.log.layout")
    +      .stringConf
    +      .createWithDefault("%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n")
    +
    +  private[spark] val DRIVER_LOG_SYNCTODFS =
    --- End diff --

    Yes, as you said, I prefer to keep the enable/disable switch separate from the log dir setting. But I do see that removing this reduces the number of configurations. I am OK either way. Please let me know if I should remove this.
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223864050

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
    +
    +    // Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +    try {
    +      val hdfsDir = conf.get("spark.driver.log.dfsDir")
    +      val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
    +      while (appDirs.hasNext()) {
    +        val appDirStatus = appDirs.next()
    +        if (appDirStatus.getModificationTime() < maxTime) {
    +          logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}")
    +          deleteLog(appDirStatus.getPath())
    +        }
    +      }
    +    } catch {
    +      case nse: NoSuchElementException => // no-op
    --- End diff --

    It's an optional config. If you use the config constant this will go away. (And even in the legacy case you should use `conf.getOption` in those cases.)
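The point about optional configs can be illustrated with a small sketch. It assumes a plain `Map` as a stand-in for the Spark configuration (where a string-keyed `get` on a missing key throws `NoSuchElementException`, much like applying a `Map` to a missing key), and `dirToClean` is a hypothetical helper, not part of the PR.

```scala
object OptionalConfSketch {
  // Key name taken from the quoted diff.
  val DfsDirKey = "spark.driver.log.dfsDir"

  /** Option-returning lookup: an unset key is an ordinary value, not an exception. */
  def dirToClean(settings: Map[String, String]): Option[String] =
    settings.get(DfsDirKey)

  def main(args: Array[String]): Unit = {
    // Unset: foreach simply does nothing, so no try/catch is needed.
    dirToClean(Map.empty).foreach(dir => println(s"would clean $dir"))
    // Set: the body runs once with the configured directory.
    dirToClean(Map(DfsDirKey -> "/user/spark/driverLogs"))
      .foreach(dir => println(s"would clean $dir"))
  }
}
```

Modeling the missing setting as `None` removes the need for the `NoSuchElementException` no-op handler, and it avoids accidentally swallowing the same exception type if it were thrown from somewhere else inside the `try` block.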
[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...
Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r223859868

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
    +
    +    // Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +    try {
    +      val hdfsDir = conf.get("spark.driver.log.dfsDir")
    +      val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
    +      while (appDirs.hasNext()) {
    +        val appDirStatus = appDirs.next()
    +        if (appDirStatus.getModificationTime() < maxTime) {
    +          logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}")
    +          deleteLog(appDirStatus.getPath())
    +        }
    +      }
    +    } catch {
    +      case nse: NoSuchElementException => // no-op
    --- End diff --

    conf.get("spark.driver.log.dfsDir")