[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22504


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r231346067
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,40 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.persistToDfs.enabled
+is true. Within this base directory, each application logs the driver 
logs to an application specific file.
+Users may want to set this to a unified location like an HDFS 
directory so driver log files can be persisted
+for later usage. This directory should allow any Spark user to 
read/write files and the Spark History Server
+user to delete files. Additionally, older logs from this directory are 
cleaned by
+ 
Spark History Server  if
--- End diff --

remove space after `>`


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r231346390
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,40 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.persistToDfs.enabled
+is true. Within this base directory, each application logs the driver 
logs to an application specific file.
+Users may want to set this to a unified location like an HDFS 
directory so driver log files can be persisted
+for later usage. This directory should allow any Spark user to 
read/write files and the Spark History Server
+user to delete files. Additionally, older logs from this directory are 
cleaned by
+ 
Spark History Server  if
+spark.history.fs.driverlog.cleaner.enabled is true and, 
if they are older than max age configured
+at spark.history.fs.driverlog.cleaner.maxAge.
+  
+
+
+  spark.driver.log.persistToDfs.enabled
+  false
+  
+If true, spark application running in client mode will write driver 
logs to a persistent storage, configured
+in spark.driver.log.dfsDir. If 
spark.driver.log.dfsDir is not configured, driver logs
+will not be persisted. Additionally, enable the cleaner by setting 
spark.history.fs.driverlog.cleaner.enabled
+to true in  Spark 
History Server.
--- End diff --

no space after `>`


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r231346161
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,40 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.persistToDfs.enabled
+is true. Within this base directory, each application logs the driver 
logs to an application specific file.
+Users may want to set this to a unified location like an HDFS 
directory so driver log files can be persisted
+for later usage. This directory should allow any Spark user to 
read/write files and the Spark History Server
+user to delete files. Additionally, older logs from this directory are 
cleaned by
+ 
Spark History Server  if
+spark.history.fs.driverlog.cleaner.enabled is true and, 
if they are older than max age configured
+at spark.history.fs.driverlog.cleaner.maxAge.
--- End diff --

s/at/by setting


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r231346117
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,40 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.persistToDfs.enabled
+is true. Within this base directory, each application logs the driver 
logs to an application specific file.
+Users may want to set this to a unified location like an HDFS 
directory so driver log files can be persisted
+for later usage. This directory should allow any Spark user to 
read/write files and the Spark History Server
+user to delete files. Additionally, older logs from this directory are 
cleaned by
--- End diff --

...cleaned by the...


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r231346507
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,40 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.persistToDfs.enabled
+is true. Within this base directory, each application logs the driver 
logs to an application specific file.
+Users may want to set this to a unified location like an HDFS 
directory so driver log files can be persisted
+for later usage. This directory should allow any Spark user to 
read/write files and the Spark History Server
+user to delete files. Additionally, older logs from this directory are 
cleaned by
+ 
Spark History Server  if
+spark.history.fs.driverlog.cleaner.enabled is true and, 
if they are older than max age configured
+at spark.history.fs.driverlog.cleaner.maxAge.
+  
+
+
+  spark.driver.log.persistToDfs.enabled
+  false
+  
+If true, spark application running in client mode will write driver 
logs to a persistent storage, configured
+in spark.driver.log.dfsDir. If 
spark.driver.log.dfsDir is not configured, driver logs
+will not be persisted. Additionally, enable the cleaner by setting 
spark.history.fs.driverlog.cleaner.enabled
+to true in  Spark 
History Server.
+  
+
+
+  spark.driver.log.layout
+  %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+  
+The layout for the driver logs that are synced to 
spark.driver.log.dfsDir. If 
+spark.driver.log.persistToDfs.enabled is true and this 
configuration is used. If this is not configured,
--- End diff --

No need to mention the `enabled` option here.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r231346593
  
--- Diff: docs/monitoring.md ---
@@ -202,6 +202,28 @@ Security options for the Spark History Server are 
covered more detail in the
   applications that fail to rename their event logs listed as 
in-progress.
 
   
+  
+spark.history.fs.driverlog.cleaner.enabled
+spark.history.fs.cleaner.enabled
+
+  Specifies whether the History Server should periodically clean up 
driver logs from storage.
+
+  
+  
+spark.history.fs.driverlog.cleaner.interval
+spark.history.fs.cleaner.interval
+
+  How often the filesystem driver log history cleaner checks for files 
to delete.
--- End diff --

driver log cleaner


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r230464353
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -1033,6 +1102,7 @@ private[history] case class FsHistoryProviderMetadata(
 private[history] case class LogInfo(
 @KVIndexParam logPath: String,
 @KVIndexParam("lastProcessed") lastProcessed: Long,
+logType: LogType.Value,
--- End diff --

What happens here when you have an existing listing database where this 
field is not recorded?

Depending on what happens, you may need a default value here, or maybe 
handling nulls somewhere else, or even changing the DB version so old data is 
invalidated.
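
For reference, a minimal sketch of the "handle nulls somewhere else" option, assuming entries written by an older listing database come back with a null `logType` after deserialization (how the KVStore treats the missing field is an assumption here):

```scala
// Treat entries that predate the logType field as regular event logs;
// alternatively, bumping the listing DB version would simply discard them.
val stale = listing.view(classOf[LogInfo])
  .index("lastProcessed")
  .reverse()
  .first(maxTime)
  .asScala
  .filter { l => l.logType == null || l.logType == LogType.EventLogs }
  .toList
```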


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r230463338
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -812,18 +821,74 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   .reverse()
   .first(maxTime)
   .asScala
+  .filter(l => l.logType == LogType.EventLogs)
--- End diff --

`.filter { l => ... }`


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r230465268
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DEFAULT_LAYOUT = "%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: 
%m%n"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf),
+DriverLogger.DRIVER_LOG_DIR,
+DriverLogger.DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT).get)
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DEFAULT_LAYOUT)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not persist driver logs to dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError {
+
JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile())
+  }
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+  with Logging {
+
+private var streamClosed = false
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+private val tmpBuffer = new Array[Byte](UPLOAD_CHUNK_SIZE)
+private var threadpool: ScheduledExecutorService = _
+init()
+
+private def init(): Unit = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  val fileSystem: FileSystem = new 
Path(rootDir).getFileSystem(hadoopConf)
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to pe

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r230466789
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,41 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.persistToDfs.enabled
+is true. Within this base directory, Spark creates a sub-directory for 
each application, and logs the driver
--- End diff --

The sub-directory part is not true anymore, right?


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r230463888
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -812,18 +821,74 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   .reverse()
   .first(maxTime)
   .asScala
+  .filter(l => l.logType == LogType.EventLogs)
   .toList
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
+val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf)
+val currentTime = clock.getTimeMillis()
+val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir))
+while (logFiles.hasNext()) {
+  val f = logFiles.next()
+  // Do not rely on 'modtime' as it is not updated for all filesystems 
when files are written to
+  val deleteFile =
+try {
+  val info = listing.read(classOf[LogInfo], f.getPath().toString())
+  // Update the lastprocessedtime of file if it's length or 
modification time has changed
+  if (info.fileSize < f.getLen() || info.lastProcessed < 
f.getModificationTime()) {
+listing.write(
+  info.copy(lastProcessed = currentTime, fileSize = 
f.getLen()))
+false
+  } else if (info.lastProcessed > maxTime) {
+false
+  } else {
+true
+  }
+} catch {
+  case e: NoSuchElementException =>
+// For every new driver log file discovered, create a new 
entry in listing
+listing.write(LogInfo(f.getPath().toString(), currentTime, 
LogType.DriverLogs, None,
+  None, f.getLen()))
+  false
+}
+  if (deleteFile) {
+logInfo(s"Deleting expired driver log for: 
${f.getPath().getName()}")
+listing.delete(classOf[LogInfo], f.getPath().toString())
+deleteLog(driverLogFs, f.getPath())
+  }
+}
+
+// Delete driver log file entries that exceed the configured max age 
and
+// may have been deleted on filesystem externally.
+val stale = listing.view(classOf[LogInfo])
+  .index("lastProcessed")
+  .reverse()
+  .first(maxTime)
+  .asScala
+  .filter(i => i.logType == LogType.DriverLogs)
--- End diff --

`.filter { i => ... }`


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r230467097
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,41 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.persistToDfs.enabled
+is true. Within this base directory, Spark creates a sub-directory for 
each application, and logs the driver
+logs specific to the application in this directory. Users may want to 
set this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any Spark
+user to read/write files and the Spark History Server user to delete 
files. Additionally, older logs from
+this directory are cleaned by Spark History Server if 
spark.history.fs.driverlog.cleaner.enabled
+is true or if not configured, falling back to 
spark.history.fs.cleaner.enabled. They are cleaned
+if they are older than max age configured at 
spark.history.fs.driverlog.cleaner.maxAge or if not
+configured, falling back to 
spark.history.fs.cleaner.maxAge.
+  
+
+
+  spark.driver.log.persistToDfs.enabled
+  false
+  
+If true, spark application running in client mode will write driver 
logs to a persistent storage, configured
+in spark.driver.log.dfsDir. If 
spark.driver.log.dfsDir is not configured, driver logs
+will not be persisted. Additionally, enable the cleaner by setting 
spark.history.fs.driverlog.cleaner.enabled
--- End diff --

Instead of mentioning the cleaner config, I'd add a link to the SHS config 
page. That makes it clear that the cleaner is not part of the application.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r230466226
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -413,6 +417,66 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 }
   }
 
+  test("driver log cleaner") {
+val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
+val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
+val maxAge = TimeUnit.SECONDS.toSeconds(40)
+val clock = new ManualClock(0)
+val testConf = new SparkConf()
+testConf.set("spark.history.fs.logDirectory",
+  Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
+testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
+testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
+testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge)
+val provider = new FsHistoryProvider(testConf, clock)
+
+val log1 = FileUtils.getFile(testDir, "1" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log1)
+clock.setTime(firstFileModifiedTime)
+log1.setLastModified(clock.getTimeMillis())
+provider.cleanDriverLogs()
+
+val log2 = FileUtils.getFile(testDir, "2" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log2)
+val log3 = FileUtils.getFile(testDir, "3" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log3)
+clock.setTime(secondFileModifiedTime)
+log2.setLastModified(clock.getTimeMillis())
+log3.setLastModified(clock.getTimeMillis())
+provider.cleanDriverLogs()
+
+// This should not trigger any cleanup
+provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size 
should be(3)
+
+// Should trigger cleanup for first file but not second one
+clock.setTime(firstFileModifiedTime + 
TimeUnit.SECONDS.toMillis(maxAge) + 1)
+provider.cleanDriverLogs()
+provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size 
should be(2)
+assert(!log1.exists())
+assert(log2.exists())
+assert(log3.exists())
+
+// Update the third file length while keeping the original modified 
time
+Utils.tryLogNonFatalError {
--- End diff --

You don't want to catch the error, do you? Otherwise, if it happens, the 
test won't be doing what you want it to do.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-11-02 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r230465790
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -413,6 +417,66 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 }
   }
 
+  test("driver log cleaner") {
+val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
+val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
+val maxAge = TimeUnit.SECONDS.toSeconds(40)
+val clock = new ManualClock(0)
+val testConf = new SparkConf()
+testConf.set("spark.history.fs.logDirectory",
+  Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
+testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
+testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
+testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge)
+val provider = new FsHistoryProvider(testConf, clock)
+
+val log1 = FileUtils.getFile(testDir, "1" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log1)
+clock.setTime(firstFileModifiedTime)
+log1.setLastModified(clock.getTimeMillis())
+provider.cleanDriverLogs()
+
+val log2 = FileUtils.getFile(testDir, "2" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log2)
+val log3 = FileUtils.getFile(testDir, "3" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log3)
+clock.setTime(secondFileModifiedTime)
+log2.setLastModified(clock.getTimeMillis())
+log3.setLastModified(clock.getTimeMillis())
+provider.cleanDriverLogs()
+
+// This should not trigger any cleanup
--- End diff --

"This" is the call above this comment, isn't it? Seems like the comment is 
in the wrong spot.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r228671856
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2328,7 +2328,14 @@ private[spark] object Utils extends Logging {
* configure a new log4j level
*/
   def setLogLevel(l: org.apache.log4j.Level) {
-org.apache.log4j.Logger.getRootLogger().setLevel(l)
+val rootLogger = org.apache.log4j.Logger.getRootLogger()
+rootLogger.setLevel(l)
+rootLogger.getAllAppenders().asScala.foreach { tmp =>
+  tmp match {
--- End diff --

You can have the cases directly in the body of the foreach.
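
A sketch of that shape, with the pattern cases written directly as the foreach body (the specific case bodies are not shown in the quoted diff, so the ones below are illustrative only):

```scala
rootLogger.getAllAppenders().asScala.foreach {
  // The cases go directly in the foreach body instead of a nested match.
  case appender: org.apache.log4j.AppenderSkeleton => appender.setThreshold(l)
  case _ => // other appender types are left untouched
}
```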


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r228669828
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -796,16 +806,57 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   .asScala
   .toList
 stale.foreach { log =>
-  if (log.appId.isEmpty) {
+  if (log.appId.isEmpty &&
+  (!conf.get(DRIVER_LOG_DFS_DIR).isDefined ||
--- End diff --

This check is kinda awkward. How about a new property in `LogInfo` with the 
type of the log?

You could then also just filter those out above before `.toList`, since the 
`clearBlacklist` call is unrelated to the logs you're adding.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r228673330
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -413,6 +417,68 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 }
   }
 
+  test("driver log cleaner") {
+val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
+val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
+val maxAge = TimeUnit.SECONDS.toSeconds(40)
+val clock = new ManualClock(0)
+val testConf = new SparkConf()
+testConf.set("spark.history.fs.logDirectory",
+  Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
+testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
+testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
+testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge)
+val provider = new FsHistoryProvider(testConf, clock)
+
+val log1 = FileUtils.getFile(testDir, "1" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log1)
+val modTime1 = System.currentTimeMillis()
+
+clock.setTime(modTime1 + firstFileModifiedTime)
+provider.cleanDriverLogs()
+
+val log2 = FileUtils.getFile(testDir, "2" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log2)
+val log3 = FileUtils.getFile(testDir, "3" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log3)
+val modTime2 = System.currentTimeMillis()
+
+clock.setTime(modTime1 + secondFileModifiedTime)
+provider.cleanDriverLogs()
+
+// This should not trigger any cleanup
+provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size 
should be(3)
+
+// Should trigger cleanup for first file but not second one
+clock.setTime(modTime1 + firstFileModifiedTime + 
TimeUnit.SECONDS.toMillis(maxAge) + 1)
+provider.cleanDriverLogs()
+provider.listing.view(classOf[LogInfo]).iterator().asScala.toSeq.size 
should be(2)
+assert(!log1.exists())
+assert(log2.exists())
+assert(log3.exists())
+
+// Should cleanup the second file but not the third file, as 
filelength changed.
+val writer = new OutputStreamWriter(new BufferedOutputStream(new 
FileOutputStream(log3)))
--- End diff --

`Files.write` is shorter and nicer.
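
Depending on whether this refers to Guava's `Files` or `java.nio.file.Files`, the NIO variant might look roughly like this (the written content is a placeholder):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Grow log3 so its length differs from what the listing recorded.
Files.write(log3.toPath, "extra content".getBytes(StandardCharsets.UTF_8))
```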


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r228673299
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -413,6 +417,68 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 }
   }
 
+  test("driver log cleaner") {
+val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
+val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)
+val maxAge = TimeUnit.SECONDS.toSeconds(40)
+val clock = new ManualClock(0)
+val testConf = new SparkConf()
+testConf.set("spark.history.fs.logDirectory",
+  Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
+testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
+testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
+testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge)
+val provider = new FsHistoryProvider(testConf, clock)
+
+val log1 = FileUtils.getFile(testDir, "1" + 
DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+createEmptyFile(log1)
+val modTime1 = System.currentTimeMillis()
--- End diff --

You're mixing a manual clock with system time. Why not use 
`File.setLastModified` like other tests?
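
The later revision of this test (quoted earlier in this thread) does exactly that, pinning the modification time to the manual clock:

```scala
clock.setTime(firstFileModifiedTime)
log1.setLastModified(clock.getTimeMillis())
```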


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r228672761
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DEFAULT_LAYOUT = "%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: 
%m%n"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf),
+DriverLogger.DRIVER_LOG_DIR,
+DriverLogger.DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.get(DRIVER_LOG_LAYOUT).isDefined) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT).get)
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DEFAULT_LAYOUT)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not persist driver logs to dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError {
+
JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile())
+  }
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+  with Logging {
+
+private var streamClosed = false
+private var fileSystem: FileSystem = _
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  fileSystem = new Path(rootDir).getFileSystem(hadoopConf)
--- End diff --

This is actually super weird, initializing another field in the initializer 
of this field.

It's looking like you should just have an `init()` function, or maybe have 
`rootDir` as a field so you can initialize `fileSystem` more directly...

(Sometimes I really miss Java constructors.)
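
A rough sketch of the "`rootDir` as a field" alternative (names taken from the quoted diff; the later revision quoted earlier in this thread takes the `init()` route instead):

```scala
// Both fields are initialized top to bottom; nothing is assigned
// from inside another field's initializer.
private val rootDir: String = conf.get(DRIVER_LOG_DFS_DIR).get
private val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf)
```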


---


[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r228671145
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -796,16 +806,57 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   .asScala
   .toList
 stale.foreach { log =>
-  if (log.appId.isEmpty) {
+  if (log.appId.isEmpty &&
+  (!conf.get(DRIVER_LOG_DFS_DIR).isDefined ||
+  !log.logPath.startsWith(new 
Path(conf.get(DRIVER_LOG_DFS_DIR).get).toString( {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
+val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf)
+val currentTime = clock.getTimeMillis()
+val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+val logFiles = driverLogFs.listLocatedStatus(new Path(driverLogDir))
+while (logFiles.hasNext()) {
--- End diff --

One issue here is that since you're basing this on the file system's 
contents, if these files are deleted outside of the SHS then you'll accumulate 
`LogInfo` entries for those deleted files.

The event log cleaner avoids that by basing this logic on the SHS's view of 
the file system, although I don't know if that same logic can be applied here.




---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r228669149
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -274,11 +275,20 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   pool.scheduleWithFixedDelay(
 getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, 
TimeUnit.SECONDS)
 
-  if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
+  if (conf.get(CLEANER_ENABLED)) {
 // A task that periodically cleans event logs on disk.
 pool.scheduleWithFixedDelay(
   getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, 
TimeUnit.SECONDS)
   }
+
+  conf.get(DRIVER_LOG_DFS_DIR).foreach { _ =>
--- End diff --

`if (conf.contains(DRIVER_LOG_DFS_DIR) && 
conf.get(DRIVER_LOG_CLEANER_ENABLED))`

Don't use `foreach` as a boolean check.
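
As a sketch, the suggested check in context (the scheduling body is elided in the quoted diff, so the interval constant below is only a placeholder):

```scala
if (conf.contains(DRIVER_LOG_DFS_DIR) && conf.get(DRIVER_LOG_CLEANER_ENABLED)) {
  // A task that periodically cleans driver logs from the configured DFS dir.
  pool.scheduleWithFixedDelay(
    getRunner(() => cleanDriverLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
```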


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-22 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r227036915
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,41 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.persistToDfs.enabled
+is true. Within this base directory, Spark creates a sub-directory for 
each application, and logs the driver
+logs specific to the application in this directory. Users may want to 
set this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any Spark
+user to read/write files and the Spark History Server user to delete 
files. Additionally, older logs from
--- End diff --

we should add something about this to the security doc with specific 
information on permissions, like for event logging: 
https://spark.apache.org/docs/latest/security.html


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226767375
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -798,14 +815,52 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
+val currentTime = clock.getTimeMillis()
+val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+val appDirs = driverLogFs.get.listLocatedStatus(new Path(driverLogDir))
+while (appDirs.hasNext()) {
+  val appDirStatus = appDirs.next()
+  if (appDirStatus.isDirectory()) {
+val logFiles = driverLogFs.get.listStatus(appDirStatus.getPath())
+var deleteDir = true
+logFiles.foreach { f =>
+  try {
+val info = listing.read(classOf[LogInfo], 
f.getPath().toString())
--- End diff --

Oh, an you probably want unit tests for this new code.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226740562
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2863,6 +2870,14 @@ private[spark] object Utils extends Logging {
   def stringHalfWidth(str: String): Int = {
 if (str == null) 0 else str.length + fullWidthRegex.findAllIn(str).size
   }
+
+  private[spark] def sanitizeDirName(str: String): String = {
--- End diff --

This class is already `private[spark]` so the modifier is redundant.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226737371
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -116,6 +117,13 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   // Visible for testing
   private[history] val fs: FileSystem = new 
Path(logDir).getFileSystem(hadoopConf)
 
+  private val driverLogFs: Option[FileSystem] =
+if (conf.get(DRIVER_LOG_DFS_DIR).isDefined) {
--- End diff --

Use `conf.get(DRIVER_LOG_DFS_DIR).map`.
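
That is, roughly (a sketch based on the quoted field definition):

```scala
private val driverLogFs: Option[FileSystem] =
  conf.get(DRIVER_LOG_DFS_DIR).map { dir => new Path(dir).getFileSystem(hadoopConf) }
```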


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226739229
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -798,14 +815,52 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
+val currentTime = clock.getTimeMillis()
+val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+val appDirs = driverLogFs.get.listLocatedStatus(new Path(driverLogDir))
+while (appDirs.hasNext()) {
+  val appDirStatus = appDirs.next()
+  if (appDirStatus.isDirectory()) {
+val logFiles = driverLogFs.get.listStatus(appDirStatus.getPath())
--- End diff --

Not sure how I missed this before, but why are you storing log files in a 
subdirectory? What's the advantage? You'll have a ton of directories with a 
single log file. Instead, just write the log files directly into the parent 
directory... that also makes this code simpler / faster (fewer RPCs).


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226737882
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -274,11 +282,20 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   pool.scheduleWithFixedDelay(
 getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, 
TimeUnit.SECONDS)
 
-  if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
+  if (conf.get(CLEANER_ENABLED)) {
 // A task that periodically cleans event logs on disk.
 pool.scheduleWithFixedDelay(
   getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, 
TimeUnit.SECONDS)
   }
+
+  driverLogFs.foreach { _ =>
--- End diff --

Here it's clearer to use `if (driverLogFs.isDefined && ...)`.

But really this is a bit confusing and I think my previous suggestion is 
clearer: don't keep `driverLogFs` in a field, just get it when running the 
cleaner task. And do this initialization based on the config entries, not 
`driverLogFs`.
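
The later revisions of `cleanDriverLogs()` quoted earlier in this thread do resolve the filesystem inside the cleaner task rather than keeping it in a field, roughly:

```scala
private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
  val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
  val driverLogFs = new Path(driverLogDir).getFileSystem(hadoopConf)
  // ... list and delete expired driver log files
}
```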


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226740098
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -798,14 +815,52 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
+val currentTime = clock.getTimeMillis()
+val maxTime = currentTime - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+val appDirs = driverLogFs.get.listLocatedStatus(new Path(driverLogDir))
+while (appDirs.hasNext()) {
+  val appDirStatus = appDirs.next()
+  if (appDirStatus.isDirectory()) {
+val logFiles = driverLogFs.get.listStatus(appDirStatus.getPath())
+var deleteDir = true
+logFiles.foreach { f =>
+  try {
+val info = listing.read(classOf[LogInfo], 
f.getPath().toString())
--- End diff --

This block probably deserves a comment.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226741280
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, File, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private var rootDfsDir : File = _
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+rootDfsDir = Utils.createTempDir(namePrefix = "dfs_logs")
+  }
+
+  override def afterAll(): Unit = {
+super.afterAll()
+JavaUtils.deleteRecursively(rootDfsDir)
+  }
+
+  test("driver logs are persisted locally and synced to dfs") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, 
DriverLogger.DRIVER_LOG_DIR)
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals(DriverLogger.DRIVER_LOG_FILE))
+
+sc.stop()
+assert(!driverLogsDir.exists())
+val dfsDir = FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, 
app_id)
+assert(dfsDir.exists())
+val dfsFiles = dfsDir.listFiles()
+dfsFiles.exists{ f => f.getName().equals(DriverLogger.DRIVER_LOG_FILE) 
&& f.length() > 0 }
--- End diff --

space before `{`

You also need to assert the condition...
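
With the space added and the result actually asserted, the check would read something like:

```scala
assert(dfsFiles.exists { f =>
  f.getName().equals(DriverLogger.DRIVER_LOG_FILE) && f.length() > 0
})
```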


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226718233
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
+driverLogDir.foreach { dl =>
+  val maxTime = clock.getTimeMillis() -
+conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+  val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(driverLogFs.get, appDirStatus.getPath())
--- End diff --

Ohh right, the fix is similar to what exists currently. I just wanted to 
add the note that logs will be deleted after that time period.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226716880
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
+driverLogDir.foreach { dl =>
+  val maxTime = clock.getTimeMillis() -
+conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+  val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(driverLogFs.get, appDirStatus.getPath())
--- End diff --

So you *are* doing what the current code does (I haven't reviewed the code 
yet). Your comment made it sound like you were just waiting longer to delete 
the log.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226710526
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
+driverLogDir.foreach { dl =>
+  val maxTime = clock.getTimeMillis() -
+conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+  val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(driverLogFs.get, appDirStatus.getPath())
--- End diff --

Sorry, I don't understand your comment. Can you elaborate please?

The problem that you described was that the old cleaner logic may delete (or try 
to delete) a log for an active app. With the solution that I added, I am 
tracking the fileLength and lastProcessedTime. If the fileLength has changed, I 
do not delete the file; I update the lastProcessedTime instead. I then rely on 
lastProcessedTime to determine whether it is time to delete the file.

Please let me know if there is an issue in the logic.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226706867
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
+driverLogDir.foreach { dl =>
+  val maxTime = clock.getTimeMillis() -
+conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+  val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(driverLogFs.get, appDirStatus.getPath())
--- End diff --

Isn't that just delaying the problem I described?

If you look at how this code handles that for event logs, it's way more 
complicated than that.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226505024
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
--- End diff --

```
Utils.tryLogNonFatalError {
   // code goes here
}
```
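
Applied to the `finally` block quoted in the diff above, the suggestion would look roughly like this (a sketch, reusing the names from the diff):

```scala
Utils.tryLogNonFatalError {
  // Remove the local staging directory once the appender and writer are closed.
  JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile())
}
```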


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226504971
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
+user to read/write files and the spark history server user to delete 
files. Additionally, older logs from
+this directory are cleaned by Spark History Server if 
spark.history.fs.driverlog.cleaner.enabled is true.
--- End diff --

That documentation is not user visible.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226492916
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
+user to read/write files and the spark history server user to delete 
files. Additionally, older logs from
+this directory are cleaned by Spark History Server if 
spark.history.fs.driverlog.cleaner.enabled is true.
--- End diff --

I provided the information that the driverlog cleaner uses a fallback option in its own documentation.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226492665
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
--- End diff --

I did not understand this comment.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226492521
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
+driverLogDir.foreach { dl =>
+  val maxTime = clock.getTimeMillis() -
+conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+  val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(driverLogFs.get, appDirStatus.getPath())
--- End diff --

Thanks for this. I did not realize that `modtime` is not always updated. I added a fix: the logs are now cleaned only when they are older than max(2 * cleaner_interval, max_age).
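
In other words, roughly (names here are illustrative only):

```scala
// The retention window implied by the fix described above.
def effectiveMaxAgeMs(cleanerIntervalMs: Long, maxDriverLogAgeMs: Long): Long =
  math.max(2 * cleanerIntervalMs, maxDriverLogAgeMs)
```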


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226472013
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not persist driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+  with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
--- End diff --

See my previous comment for why this call is not right.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226464199
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, File, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private var rootHdfsDir : File = _
--- End diff --

s/hdfs/dfs in all this file.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226466182
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
+user to read/write files and the spark history server user to delete 
files. Additionally, older logs from
--- End diff --

Spark History Server


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226464800
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
--- End diff --

I'd really be more comfortable with using a temp dir rather than a 
hardcoded directory name...

Or at least use `__driver_logs__` since that syntax conventionally means 
things that user applications should not use.

And use a constant so you don't have to hardcode the value in the test.
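
A sketch of that suggestion; the constant name and value below are illustrative, not the PR's final choice:

```scala
// Extending the existing DriverLogger companion object with shared constants.
private[spark] object DriverLogger {
  // Local staging sub-directory; the double underscores mark it as Spark-internal.
  val DRIVER_LOG_DIR = "__driver_logs__"
  val DRIVER_LOG_FILE = "driver.log"
}

// Both DriverLogger and the test would then build the path from the constants, e.g.:
//   FileUtils.getFile(Utils.getLocalDir(conf),
//     DriverLogger.DRIVER_LOG_DIR, DriverLogger.DRIVER_LOG_FILE)
```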


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226468029
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, File, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private var rootHdfsDir : File = _
+
+  override def beforeAll(): Unit = {
+rootHdfsDir = Utils.createTempDir(namePrefix = "hdfs_logs")
+  }
+
+  override def afterAll(): Unit = {
+JavaUtils.deleteRecursively(rootHdfsDir)
+  }
+
+  test("driver logs are persisted locally and synced to hdfs") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs")
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals("driver.log"))
+
+sc.stop()
+// File is continuously synced to Hdfs (which is a local dir for this 
test)
+assert(!driverLogsDir.exists())
+val hdfsDir = 
FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id)
+assert(hdfsDir.exists())
+val hdfsFiles = hdfsDir.listFiles()
+assert(hdfsFiles.length > 0)
+val driverLogFile = hdfsFiles.filter(f => 
f.getName.equals("driver.log")).head
+val hdfsIS = new BufferedInputStream(new 
FileInputStream(driverLogFile))
+assert(hdfsIS.available() > 0)
+JavaUtils.deleteRecursively(hdfsDir)
--- End diff --

This line and the next are unnecessary.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226461965
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
--- End diff --

Use the nicer closure syntax.

```
blah {
  do stuff
}
```


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226464013
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not persist driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+  with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to persist driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(loca

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226462792
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+  with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogFi

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226468219
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, File, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private var rootHdfsDir : File = _
+
+  override def beforeAll(): Unit = {
+rootHdfsDir = Utils.createTempDir(namePrefix = "hdfs_logs")
+  }
+
+  override def afterAll(): Unit = {
+JavaUtils.deleteRecursively(rootHdfsDir)
+  }
+
+  test("driver logs are persisted locally and synced to hdfs") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs")
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals("driver.log"))
+
+sc.stop()
+// File is continuously synced to Hdfs (which is a local dir for this 
test)
--- End diff --

Comment is unrelated to any of the code around it.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226460844
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
--- End diff --

This is not what I suggested, and your way is actually very confusing for the user, especially since you say this is the default in the configuration, whereas in 99.9% of cases it won't be, since the user will have at least one appender in the log4j conf.

If you want this config to have a default value, then you have to use it; it should have higher priority than the first appender's layout.

What I actually suggested was for this config to *not* have a default. That solves the confusion: if you set it, it's used; otherwise, it defaults to the first appender's layout (or the default log4j layout).

You could probably do that with `createWithDefaultFunction`, but `createOptional` + some code here also works.
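
A rough sketch of the `createOptional` route; the config key string and the default pattern below are assumptions, not the PR's actual values:

```scala
// In the config definitions (no default value):
//   val DRIVER_LOG_LAYOUT = ConfigBuilder("spark.driver.log.layout")
//     .stringConf
//     .createOptional

// In DriverLogger.addLogAppender():
val DEFAULT_DRIVER_LOG_LAYOUT = "%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n"  // assumed default
val layout = conf.get(DRIVER_LOG_LAYOUT)                       // Option[String]
  .map(pattern => new PatternLayout(pattern): Layout)          // explicit setting wins
  .orElse {
    // Otherwise inherit the layout of the first configured appender, if any.
    val appenders = LogManager.getRootLogger().getAllAppenders()
    if (appenders.hasMoreElements()) {
      Option(appenders.nextElement().asInstanceOf[Appender].getLayout())
    } else {
      None
    }
  }
  .getOrElse(new PatternLayout(DEFAULT_DRIVER_LOG_LAYOUT))     // last resort
```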


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226465465
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
--- End diff --

Spark


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226456519
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,6 +118,13 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   // Visible for testing
   private[history] val fs: FileSystem = new 
Path(logDir).getFileSystem(hadoopConf)
 
+  private val driverLogFs: Option[FileSystem] =
+if (conf.get(DRIVER_LOG_DFS_DIR).isDefined) {
+  Some(FileSystem.get(hadoopConf))
--- End diff --

This is not right. It assumes the configured directory is in the defaultFS. See the call right above this one for what you should do here.

I also don't see a good reason to keep this in a field. Just get the FS 
reference when the log cleaner runs.
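
i.e., something along these lines (a sketch mirroring the `fs` field quoted at the top of the diff):

```scala
// Resolve the filesystem from the configured driver-log path rather than
// assuming the default FS, and only when the cleaner actually runs.
private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
  conf.get(DRIVER_LOG_DFS_DIR).foreach { dir =>
    val driverLogPath = new Path(dir)
    val driverLogFs = driverLogPath.getFileSystem(hadoopConf)
    // ... list, age-check, and delete expired logs using driverLogFs ...
  }
}
```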


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226456102
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -2421,11 +2425,13 @@ class SparkContext(config: SparkConf) extends 
Logging {
 // the cluster manager to get an application ID (in case the cluster 
manager provides one).
 listenerBus.post(SparkListenerApplicationStart(appName, 
Some(applicationId),
   startTime, sparkUser, applicationAttemptId, 
schedulerBackend.getDriverLogUrls))
+_driverLogger.foreach(_.startSync(_hadoopConfiguration))
   }
 
   /** Post the application end event */
   private def postApplicationEnd() {
 listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+_driverLogger.foreach(_.stop())
--- End diff --

I think it may be better to call this in `stop()`. Stopping that logger is not part of posting an event to the listener bus.
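
e.g., roughly (placement is illustrative only):

```scala
// In SparkContext.stop(), alongside the other teardown steps:
Utils.tryLogNonFatalError {
  _driverLogger.foreach(_.stop())
}
```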


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226458986
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
+driverLogDir.foreach { dl =>
+  val maxTime = clock.getTimeMillis() -
+conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
+  val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(driverLogFs.get, appDirStatus.getPath())
--- End diff --

How are you differentiating logs that are being actively written to? 
There's this comment at the top of this class:

"some filesystems do not appear to update the `modtime` value whenever data 
is flushed to an open file output stream"

That was added because event logs actively being written to don't have 
their mtime updated. There's a lot of code in this class to deal with that.

In that situation you may run into issues here: either deleting a log for an active app, which would then cause lots of errors in that app's logs, or spamming the SHS logs with errors that the log cannot be deleted (I'm not sure what HDFS's behavior is, but I believe it's the former).


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226462505
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
--- End diff --

"spark dfs"?


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226467039
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
--- End diff --

spark.driver.log.syncToDfs.enabled does not exist.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226468828
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, File, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private var rootHdfsDir : File = _
+
+  override def beforeAll(): Unit = {
+rootHdfsDir = Utils.createTempDir(namePrefix = "hdfs_logs")
+  }
+
+  override def afterAll(): Unit = {
+JavaUtils.deleteRecursively(rootHdfsDir)
+  }
+
+  test("driver logs are persisted locally and synced to hdfs") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs")
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals("driver.log"))
+
+sc.stop()
+// File is continuously synced to Hdfs (which is a local dir for this 
test)
+assert(!driverLogsDir.exists())
+val hdfsDir = 
FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id)
+assert(hdfsDir.exists())
+val hdfsFiles = hdfsDir.listFiles()
--- End diff --

All the checks below are basically:

```
hdfsFiles.exists { f => f.getName() == DRIVER_LOG_FILE && f.length() > 0 }
```
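
Spelled out against the quoted test, the collapsed assertion might look like the sketch below. Note that `DRIVER_LOG_FILE` is a private val on `DriverLogger` in this revision, so a test following this suggestion would presumably either hard-code `"driver.log"` or expose the constant:

```
// One assertion instead of the separate listing/name/size checks (sketch only).
assert(hdfsFiles.exists { f => f.getName() == "driver.log" && f.length() > 0 })
```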


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226457291
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
+driverLogDir.foreach { dl =>
+  val maxTime = clock.getTimeMillis() -
+conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
--- End diff --

Fits in previous line.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226466609
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
+user to read/write files and the spark history server user to delete 
files. Additionally, older logs from
+this directory are cleaned by Spark History Server if 
spark.history.fs.driverlog.cleaner.enabled is true.
--- End diff --

Put config names between ``.

This is also wrong, since you do not have an explanation of this option in
the documentation. It will also be true if you just enable the normal SHS
cleaner. You should explain that option explicitly and say its default value is
the value of the other config.

The SHS options are in monitoring.md.
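
On the config-definition side, "its default value is the value of the other config" could be expressed with Spark's `fallbackConf` pattern, roughly as sketched below; `CLEANER_ENABLED` and `MAX_LOG_AGE_S` are assumed names for the existing history-server cleaner entries, used only for illustration, and the resulting options would then be documented in monitoring.md:

```
// Hedged sketch: the driver-log cleaner settings default to the values of the
// regular SHS cleaner settings via fallbackConf. The fallback constant names
// are assumptions, not the actual Spark identifiers.
val DRIVER_LOG_CLEANER_ENABLED =
  ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")
    .fallbackConf(CLEANER_ENABLED)

val MAX_DRIVER_LOG_AGE_S =
  ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
    .fallbackConf(MAX_LOG_AGE_S)
```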


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226463571
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not persist driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+  with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to persist driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(loca

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226457468
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
--- End diff --

If you're calling this you know the config is set. So you can avoid the 
extra indentation by just calling `.get`.
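
A minimal sketch of the suggested shape, assuming the caller only invokes this method when the config is present (illustrative, not the final patch):

```
private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
  // The caller guarantees spark.driver.log.dfsDir is set, so .get is safe and
  // the foreach-induced indentation goes away.
  val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR).get
  val maxTime = clock.getTimeMillis() - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
  // ... delete driver log files under driverLogDir older than maxTime ...
}
```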


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226463678
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
+user to read/write files and the spark history server user to delete 
files. Additionally, older logs from
+this directory are cleaned by Spark History Server if 
spark.history.fs.driverlog.cleaner.enabled is true.
+They are cleaned if they are older than max age configured at 
spark.history.fs.driverlog.cleaner.maxAge.
+  
+
+
+  spark.driver.log.syncToDfs.enabled
+  false
+  
+If true, spark application running in client mode will sync driver 
logs to a persistent storage, configured
--- End diff --

Makes sense. Updated the config name and the wording.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226463522
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogF

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226454499
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogFile

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226453812
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
+user to read/write files and the spark history server user to delete 
files. Additionally, older logs from
+this directory are cleaned by Spark History Server if 
spark.history.fs.driverlog.cleaner.enabled is true.
+They are cleaned if they are older than max age configured at 
spark.history.fs.driverlog.cleaner.maxAge.
+  
+
+
+  spark.driver.log.syncToDfs.enabled
+  false
+  
+If true, spark application running in client mode will sync driver 
logs to a persistent storage, configured
--- End diff --

I guess I just don't like using the word "sync" here; it makes me think the 
logs are still getting stored somewhere, just not synced to persistent storage, 
even when this is false.

How about renaming the conf to "spark.driver.log.persistToDfs.enabled" and 
rewording this to

If true, spark application running in client mode will write driver logs to 
a persistent storage, configured in spark.driver.log.dfsDir.  If 
spark.driver.log.dfsDir is not configured, driver logs will not be stored to 
persistent storage.  Additionally, enable the cleaner by setting 
spark.history.fs.driverlog.cleaner.enabled to true.

?
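
On the config side, the rename might be declared roughly as below, using Spark's ConfigBuilder pattern; the constant name and doc string are assumptions for illustration:

```
// Hypothetical declaration of the renamed flag (constant name assumed).
private[spark] val DRIVER_LOG_PERSIST_TO_DFS =
  ConfigBuilder("spark.driver.log.persistToDfs.enabled")
    .doc("If true, client-mode applications write driver logs to the " +
      "persistent storage configured by spark.driver.log.dfsDir.")
    .booleanConf
    .createWithDefault(false)
```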


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226421974
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogF

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226421401
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
+user to read/write files and the spark history server user to delete 
files. Additionally, older logs from
+this directory are cleaned by Spark History Server if 
spark.history.fs.driverlog.cleaner.enabled is true.
+They are cleaned if they are older than max age configured at 
spark.history.fs.driverlog.cleaner.maxAge.
+  
+
+
+  spark.driver.log.syncToDfs.enabled
+  false
+  
+If true, spark application running in client mode will sync driver 
logs to a persistent storage, configured
--- End diff --

I guess what I meant to say here is that this feature enables syncing to 
dfs. We do not support syncing only to local disk; the local write is just an 
implementation detail, not a feature.

I am good with using any alternate wording, if that will represent the 
intent better.


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226418231
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogF

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226395358
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogFile

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226400385
  
--- Diff: docs/configuration.md ---
@@ -266,6 +266,37 @@ of the most common options to set are:
 Only has effect in Spark standalone mode or Mesos cluster deploy mode.
   
 
+
+  spark.driver.log.dfsDir
+  (none)
+  
+Base directory in which Spark driver logs are synced, if 
spark.driver.log.syncToDfs.enabled is true.
+Within this base directory, Spark creates a sub-directory for each 
application, and logs the driver logs
+specific to the application in this directory. Users may want to set 
this to a unified location like an
+HDFS directory so driver log files can be persisted for later usage. 
This directory should allow any spark
+user to read/write files and the spark history server user to delete 
files. Additionally, older logs from
+this directory are cleaned by Spark History Server if 
spark.history.fs.driverlog.cleaner.enabled is true.
+They are cleaned if they are older than max age configured at 
spark.history.fs.driverlog.cleaner.maxAge.
+  
+
+
+  spark.driver.log.syncToDfs.enabled
+  false
+  
+If true, spark application running in client mode will sync driver 
logs to a persistent storage, configured
--- End diff --

it seems like there is a mismatch between what is described here and what 
is implemented.  Do you intend to support a configuration where you are only 
logging to local disk, and not syncing to dfs?  It doesn't seem like 
`DriverLogger.apply` allows that.  And if you do intend to support that, and 
want it controlled by this configuration, then I'd remove the "syncToDfs" from 
this name.
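
For context, the gating being referred to is roughly of the shape sketched below; the flag constant is taken from an earlier revision quoted in this thread, and error handling is elided, so this is not the actual method body:

```
object DriverLogger {
  // Sketch: a logger is only created when syncing is enabled, so a
  // "log locally but don't sync to DFS" configuration is not reachable here.
  def apply(conf: SparkConf): Option[DriverLogger] = {
    if (conf.get(DRIVER_LOG_SYNCTODFS)) Some(new DriverLogger(conf)) else None
  }
}
```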


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226394446
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogFile

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226391941
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  val fa = 
LogManager.getRootLogger.getAppender(DriverLogger.APPENDER_NAME)
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  Utils.tryLogNonFatalError(fa.close())
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
--- End diff --

nit: double indent


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226396957
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs"
--- End diff --

use a relative dir, not an absolute one, for testing data.  (what if a 
developer happened to have real data at /tmp/hdfs_logs before running this?)
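
The revised DriverLoggerSuite quoted earlier in this digest does exactly this, swapping the fixed path for a per-run temp dir:

```
// Use a per-run temp dir so the test can never touch real data under /tmp.
private var rootHdfsDir: File = _

override def beforeAll(): Unit = {
  rootHdfsDir = Utils.createTempDir(namePrefix = "hdfs_logs")
}

override def afterAll(): Unit = {
  JavaUtils.deleteRecursively(rootHdfsDir)
}
```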


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226397391
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs"
+
+  override def beforeAll(): Unit = {
+FileUtils.forceMkdir(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT))
+  }
+
+  override def afterAll(): Unit = {
+JavaUtils.deleteRecursively(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT))
+  }
+
+  test("driver logs are persisted locally and synced to hdfs") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs")
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals("driver.log"))
+
+sc.stop()
+// On application end, file is moved to Hdfs (which is a local dir for 
this test)
--- End diff --

The comment is a bit misleading (or maybe out of date) -- it's synced 
continuously, right?


---




[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r226395070
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.security.{AccessControlException, 
UserGroupInformation}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf)
+extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var hdfsLogFile: String = _
+  private var writer: Option[HdfsAsyncWriter] = None
+  private var hadoopConfiguration: Configuration = _
+  private var fileSystem: FileSystem = _
+
+  private val syncToDfs: Boolean = conf.get(DRIVER_LOG_SYNCTODFS)
+  private val syncToYarn: Boolean = conf.get(DRIVER_LOG_SYNCTOYARN)
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  hdfsLogFile = {
+val rootDir = conf.get(DRIVER_LOG_DFS_DIR.key,
+  "/tmp/driver_logs").split(",").head
+FileUtils.getFile(rootDir, appId, 
DRIVER_LOG_FILE).getAbsolutePath()
+  }
+  hadoopConfiguration = hadoopConf
+  fileSystem = FileSystem.get(hadoopConf)
+
+  // Setup a writer which moves the local file to hdfs continuously
+  if (syncToDfs) {
+writer = Some(new HdfsAsyncWriter())
+logInfo(s"The local driver log file is being synced to spark dfs 
at: ${hdfsLogFile}")
+  }
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  writer.map(_.closeWriter())
+  if (syncToYarn) {
+moveToYarnAppDir()
+  }
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  try {
+
JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile())
+  } catch {
+case e: Exception =>
+  logError(s"Error in deleting local driver log dir", e)
+  }
+}
+  }
+
+  private def moveToYarnAppDir(): Unit = {
+try {
+  val appId = Utils.sanitizeDirName(conf.getAppId)
   

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225343405
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs"
+
+  override def beforeAll(): Unit = {
+FileUtils.forceMkdir(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT))
+  }
+
+  override def afterAll(): Unit = {
+JavaUtils.deleteRecursively(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT))
+  }
+
+  test("driver logs are persisted") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs")
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals("driver.log"))
+
+sc.stop()
+// On application end, file is moved to Hdfs (which is a local dir for 
this test)
+assert(!driverLogsDir.exists())
+val hdfsDir = 
FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id)
+assert(hdfsDir.exists())
+val hdfsFiles = hdfsDir.listFiles()
+assert(hdfsFiles.length > 0)
+JavaUtils.deleteRecursively(hdfsDir)
+assert(!hdfsDir.exists())
+  }
+
+  test("driver logs are synced to hdfs continuously") {
+val sc = getSparkContext()
--- End diff --

I changed the test to just check the final output, so it no longer relies on 
the loop logging. That way, I don't need to expose 
`SparkContext.driverLogger` either.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225310694
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,6 +117,13 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   // Visible for testing
   private[history] val fs: FileSystem = new 
Path(logDir).getFileSystem(hadoopConf)
 
+  private[history] val driverLogFs: Option[FileSystem] =
--- End diff --

Just `private`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225312559
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  // Visible for testing
+  private[spark] var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get.split(",").head
--- End diff --

Why are you calling `split` here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225314910
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs"
+
+  override def beforeAll(): Unit = {
+FileUtils.forceMkdir(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT))
+  }
+
+  override def afterAll(): Unit = {
+JavaUtils.deleteRecursively(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT))
+  }
+
+  test("driver logs are persisted") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs")
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals("driver.log"))
+
+sc.stop()
+// On application end, file is moved to Hdfs (which is a local dir for 
this test)
+assert(!driverLogsDir.exists())
+val hdfsDir = 
FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id)
+assert(hdfsDir.exists())
+val hdfsFiles = hdfsDir.listFiles()
+assert(hdfsFiles.length > 0)
+JavaUtils.deleteRecursively(hdfsDir)
+assert(!hdfsDir.exists())
+  }
+
+  test("driver logs are synced to hdfs continuously") {
+val sc = getSparkContext()
--- End diff --

I'm not a fan of what you did, exposing `SparkContext.driverLogger` for 
tests.

It should be possible to write this test without using `SparkContext` at 
all. Just create a `DriverLogger` and control it from the test.

Also, you did not address all of my previous feedback. e.g., the loop 
logging a bunch of things should not be needed. You should have some explicit 
action that causes the flush instead of relying on side effects of other calls.
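
For illustration, a test written that way could look roughly like the sketch
below. This is not the PR's actual test; it only illustrates "create a
`DriverLogger` and control it from the test", and it assumes the
`DriverLogger`/`DfsAsyncWriter` shape shown in the diffs above (the constructor
adds the appender, `startSync()` creates the writer, `writer.run()` flushes):

    // Sketch only: an explicit flush instead of relying on the scheduled thread.
    test("driver logs are synced to dfs") {
      val dfsDir = Utils.createTempDir()
      val sparkConf = new SparkConf()
        .set("spark.app.id", "test-app")
        .set(DRIVER_LOG_DFS_DIR.key, dfsDir.getAbsolutePath())
      val driverLogger = new DriverLogger(sparkConf)
      driverLogger.startSync(new Configuration())

      logInfo("some output that should reach the synced driver log")
      driverLogger.writer.foreach(_.run())  // explicit action that causes the flush

      val syncedFile = FileUtils.getFile(dfsDir, "test-app", "driver.log")
      assert(syncedFile.exists() && syncedFile.length() > 0)
      driverLogger.stop()
    }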


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225311036
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -281,6 +288,16 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 pool.scheduleWithFixedDelay(
   getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, 
TimeUnit.SECONDS)
   }
+
+  driverLogFs.foreach { _ =>
+if (conf.get(DRIVER_LOG_CLEANER_ENABLED).getOrElse(
+conf.getBoolean("spark.history.fs.cleaner.enabled", false))) {
--- End diff --

Turn `spark.history.fs.cleaner.enabled` into a config constant and use 
`fallbackConf`.
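
A sketch of that suggestion (the constant names and where they would live are
assumptions; `fallbackConf` is the existing `ConfigBuilder` facility for
deferring to another entry):

    private[spark] val CLEANER_ENABLED =
      ConfigBuilder("spark.history.fs.cleaner.enabled")
        .booleanConf
        .createWithDefault(false)

    // The driver-log cleaner flag falls back to the generic history cleaner flag.
    private[spark] val DRIVER_LOG_CLEANER_ENABLED =
      ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")
        .fallbackConf(CLEANER_ENABLED)

With that, the call site becomes `conf.get(DRIVER_LOG_CLEANER_ENABLED)` with
no `getOrElse`/`getBoolean` needed.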


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225315220
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{BufferedInputStream, FileInputStream}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
+
+  private val DRIVER_LOG_DIR_DEFAULT = "/tmp/hdfs_logs"
+
+  override def beforeAll(): Unit = {
+FileUtils.forceMkdir(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT))
+  }
+
+  override def afterAll(): Unit = {
+JavaUtils.deleteRecursively(FileUtils.getFile(DRIVER_LOG_DIR_DEFAULT))
+  }
+
+  test("driver logs are persisted") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs")
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals("driver.log"))
+
+sc.stop()
+// On application end, file is moved to Hdfs (which is a local dir for 
this test)
+assert(!driverLogsDir.exists())
+val hdfsDir = 
FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id)
+assert(hdfsDir.exists())
+val hdfsFiles = hdfsDir.listFiles()
+assert(hdfsFiles.length > 0)
+JavaUtils.deleteRecursively(hdfsDir)
+assert(!hdfsDir.exists())
+  }
+
+  test("driver logs are synced to hdfs continuously") {
+val sc = getSparkContext()
+
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, "driver_logs")
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals("driver.log"))
+for (i <- 1 to 1000) {
+  logInfo("Log enough data to log file so that it can be flushed")
+}
+
+// Sync the driver logs manually instead of waiting for scheduler
+sc._driverLogger.foreach(_.writer.foreach(_.run()))
+val hdfsDir = 
FileUtils.getFile(sc.getConf.get(DRIVER_LOG_DFS_DIR).get, app_id)
+assert(hdfsDir.exists())
+val hdfsFiles = hdfsDir.listFiles()
+assert(hdfsFiles.length > 0)
+val driverLogFile = hdfsFiles.filter(f => 
f.getName.equals("driver.log")).head
+val hdfsIS = new BufferedInputStream(new 
FileInputStream(driverLogFile))
+assert(hdfsIS.available() > 0)
+
+sc.stop()
+// Ensure that the local file is deleted on application end
+assert(!driverLogsDir.exists())
+JavaUtils.deleteRecursively(hdfsDir)
+assert(!hdfsDir.exists())
+  }
+
+  private def getSparkContext(): SparkContext = {
+val conf = new SparkConf()
+conf.set("spark.local.dir", "/tmp")
--- End diff --

Don't do this. Tests run with a pre-configured temp directory, and should 
not write to `/tmp`.

If you need to create a temp directory, do so explicitly using the existing 
`Utils` API.
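
A sketch of what that could look like in the suite (assuming the standard
`Utils.createTempDir()` helper; the rest of the suite's setup stays as in the
diff):

    private var testDfsDir: File = _

    override def beforeAll(): Unit = {
      testDfsDir = Utils.createTempDir(namePrefix = "driver_logs")
    }

    override def afterAll(): Unit = {
      JavaUtils.deleteRecursively(testDfsDir)
    }

    private def getSparkContext(): SparkContext = {
      val conf = new SparkConf()
        .set(DRIVER_LOG_DFS_DIR.key, testDfsDir.getAbsolutePath())
      // no spark.local.dir override; tests already run with a temp local dir
      new SparkContext("local", "DriverLoggerSuite", conf)
    }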


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.a

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225312422
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  // Visible for testing
+  private[spark] var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
--- End diff --

Shouldn't you close the appender you added, so the underlying file is 
closed / flushed too?

Or is that done implicitly by this call somehow?
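
For reference, explicitly closing it would look roughly like this (a sketch;
in log4j 1.x, `Appender.close()` flushes and closes the underlying file):

    def stop(): Unit = {
      try {
        val rootLogger = LogManager.getRootLogger()
        val fa = rootLogger.getAppender(DriverLogger.APPENDER_NAME)
        if (fa != null) {
          rootLogger.removeAppender(fa)
          fa.close()  // flush/close driver.log before the final dfs sync
        }
        writer.foreach(_.closeWriter())
      } catch {
        case e: Exception =>
          logError("Error in persisting driver logs", e)
      } finally {
        Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
          FileUtils.getFile(localLogFile).getParentFile()))
      }
    }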


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225310830
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,6 +117,13 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   // Visible for testing
   private[history] val fs: FileSystem = new 
Path(logDir).getFileSystem(hadoopConf)
 
+  private[history] val driverLogFs: Option[FileSystem] =
+if (conf.getOption("spark.driver.log.dfsDir").isDefined) {
--- End diff --

`spark.driver.log.dfsDir` is a config constant. Use the constant.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225311476
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.getOption("spark.driver.log.dfsDir")
+driverLogDir.foreach { dl =>
+  val maxTime = clock.getTimeMillis() -
--- End diff --

Create a config constant for the existing config.
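
A sketch of the constants involved (the existing key is
`spark.history.fs.cleaner.maxAge`, default `7d`; the new driver-log key falls
back to it):

    private[spark] val MAX_LOG_AGE_S =
      ConfigBuilder("spark.history.fs.cleaner.maxAge")
        .timeConf(TimeUnit.SECONDS)
        .createWithDefaultString("7d")

    private[spark] val MAX_DRIVER_LOG_AGE_S =
      ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
        .fallbackConf(MAX_LOG_AGE_S)

so the cleaner can compute
`val maxTime = clock.getTimeMillis() - conf.get(MAX_DRIVER_LOG_AGE_S) * 1000`.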


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225313815
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  // Visible for testing
+  private[spark] var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get.split(",").head
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogFile))
+  outputStream = fileSystem.create(new Path(dfsLogFile), true)
+  

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225311218
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -281,6 +288,16 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 pool.scheduleWithFixedDelay(
   getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, 
TimeUnit.SECONDS)
   }
+
+  driverLogFs.foreach { _ =>
+if (conf.get(DRIVER_LOG_CLEANER_ENABLED).getOrElse(
+conf.getBoolean("spark.history.fs.cleaner.enabled", false))) {
+  pool.scheduleWithFixedDelay(getRunner(() => cleanDriverLogs()),
+0,
+
conf.get(DRIVER_LOG_CLEANER_INTERVAL).getOrElse(CLEAN_INTERVAL_S),
--- End diff --

Create config constant for existing config, use `fallbackConf`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225313921
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  // Visible for testing
+  private[spark] var writer: Option[DfsAsyncWriter] = None
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (conf.contains(DRIVER_LOG_LAYOUT)) {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+} else if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(DRIVER_LOG_LAYOUT.defaultValueString)
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  val appId = Utils.sanitizeDirName(conf.getAppId)
+  writer = Some(new DfsAsyncWriter(appId, hadoopConf))
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  LogManager.getRootLogger().removeAppender(DriverLogger.APPENDER_NAME)
+  writer.foreach(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  Utils.tryLogNonFatalError(JavaUtils.deleteRecursively(
+FileUtils.getFile(localLogFile).getParentFile()))
+}
+  }
+
+  // Visible for testing
+  private[spark] class DfsAsyncWriter(appId: String, hadoopConf: 
Configuration) extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val dfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR).get.split(",").head
+  if (!fileSystem.exists(new Path(rootDir))) {
+throw new RuntimeException(s"${rootDir} does not exist." +
+  s" Please create this dir in order to sync driver logs")
+  }
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var inStream: InputStream = null
+private var outputStream: FSDataOutputStream = null
+try {
+  inStream = new BufferedInputStream(new FileInputStream(localLogFile))
+  outputStream = fileSystem.create(new Path(dfsLogFile), true)
+  

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r225311355
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 stale.foreach { log =>
   if (log.appId.isEmpty) {
 logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
-deleteLog(new Path(log.logPath))
+deleteLog(fs, new Path(log.logPath))
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  /**
+   * Delete driver logs from the configured spark dfs dir that exceed the 
configured max age
+   */
+  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
+val driverLogDir = conf.getOption("spark.driver.log.dfsDir")
--- End diff --

Use the config constant...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-10 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r224189470
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
+
+// Delete driver logs from the configured spark dfs dir that exceed 
the configured max age
+try {
+  val hdfsDir = conf.get("spark.driver.log.dfsDir")
+  val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(appDirStatus.getPath())
--- End diff --

Added the new configurations, with a fallback to the existing ones.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223902134
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[HdfsAsyncWriter] = None
+  private val syncToDfs: Boolean = conf.get(DRIVER_LOG_SYNCTODFS)
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  if (syncToDfs) {
+val appId = Utils.sanitizeDirName(conf.getAppId)
+writer = Some(new HdfsAsyncWriter(appId, hadoopConf))
+  }
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  writer.map(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  try {
+
JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile())
+  } catch {
+case e: Exception =>
+  logError(s"Error in deleting local driver log dir", e)
+  }
+}
+  }
+
+  private class HdfsAsyncWriter(appId: String, hadoopConf: Configuration) 
extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val inStream = new BufferedInputStream(new 
FileInputStream(localLogFile))
+private var hdfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR.key, 
"/tmp/driver_logs").split(",").head
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val outputStream: FSDataOutputStream = fileSystem.create(new 
Path(hdfsLogFile), true)
+fileSystem.setPermission(new Path(hdfsLogFile), LOG_FILE_PERMISSIONS)
+private val tmpBuffer = new Array[Byte](UPLOAD_CHUNK_SIZE)
+private val threadpool = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dfsSyncThread")
+threadpool.scheduleWithFixedDelay(this, UPLOAD_INTERVAL_IN_SECS, 
UP

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223902004
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
+
+// Delete driver logs from the configured spark dfs dir that exceed 
the configured max age
+try {
+  val hdfsDir = conf.get("spark.driver.log.dfsDir")
+  val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(appDirStatus.getPath())
--- End diff --

It's pretty trivial to create a different task to run this code you're 
adding...
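
i.e., roughly the following, scheduled next to the existing `cleanLogs()`
task (a sketch; it assumes the driver-log cleaner settings discussed later in
this thread resolve to concrete values, e.g. via `fallbackConf`):

    // A separate scheduled task just for driver logs.
    if (conf.get(DRIVER_LOG_CLEANER_ENABLED)) {
      pool.scheduleWithFixedDelay(
        getRunner(() => cleanDriverLogs()),
        0, conf.get(DRIVER_LOG_CLEANER_INTERVAL), TimeUnit.SECONDS)
    }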


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223901876
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
+Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
+  private var writer: Option[HdfsAsyncWriter] = None
+  private val syncToDfs: Boolean = conf.get(DRIVER_LOG_SYNCTODFS)
+
+  addLogAppender()
+
+  private def addLogAppender(): Unit = {
+val appenders = LogManager.getRootLogger().getAllAppenders()
+val layout = if (appenders.hasMoreElements()) {
+  appenders.nextElement().asInstanceOf[Appender].getLayout()
+} else {
+  new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
+}
+val fa = new Log4jFileAppender(layout, localLogFile)
+fa.setName(DriverLogger.APPENDER_NAME)
+LogManager.getRootLogger().addAppender(fa)
+logInfo(s"Added a local log appender at: ${localLogFile}")
+  }
+
+  def startSync(hadoopConf: Configuration): Unit = {
+try {
+  // Setup a writer which moves the local file to hdfs continuously
+  if (syncToDfs) {
+val appId = Utils.sanitizeDirName(conf.getAppId)
+writer = Some(new HdfsAsyncWriter(appId, hadoopConf))
+  }
+} catch {
+  case e: Exception =>
+logError(s"Could not sync driver logs to spark dfs", e)
+}
+  }
+
+  def stop(): Unit = {
+try {
+  writer.map(_.closeWriter())
+} catch {
+  case e: Exception =>
+logError(s"Error in persisting driver logs", e)
+} finally {
+  try {
+
JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile())
+  } catch {
+case e: Exception =>
+  logError(s"Error in deleting local driver log dir", e)
+  }
+}
+  }
+
+  private class HdfsAsyncWriter(appId: String, hadoopConf: Configuration) 
extends Runnable
+with Logging {
+
+private var streamClosed = false
+private val inStream = new BufferedInputStream(new 
FileInputStream(localLogFile))
+private var hdfsLogFile: String = {
+  val rootDir = conf.get(DRIVER_LOG_DFS_DIR.key, 
"/tmp/driver_logs").split(",").head
+  FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
+}
+private var fileSystem: FileSystem = FileSystem.get(hadoopConf)
+private val outputStream: FSDataOutputStream = fileSystem.create(new 
Path(hdfsLogFile), true)
+fileSystem.setPermission(new Path(hdfsLogFile), LOG_FILE_PERMISSIONS)
+private val tmpBuffer = new Array[Byte](UPLOAD_CHUNK_SIZE)
+private val threadpool = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dfsSyncThread")
+threadpool.scheduleWithFixedDelay(this, UPLOAD_INTERVAL_IN_SECS, 

[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223901520
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
+
+// Delete driver logs from the configured spark dfs dir that exceed 
the configured max age
+try {
+  val hdfsDir = conf.get("spark.driver.log.dfsDir")
+  val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(appDirStatus.getPath())
--- End diff --

I added the documentation, but I am still relying on the existing history 
server configurations, because I am reusing the existing cleaner. Otherwise, 
I would have to create another cleaner for this, one that can run at a 
different interval and with a different configured max age.

Please let me know if I should create another cleaner for this or just 
provide a configuration for a different max age.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223900576
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
+
+// Delete driver logs from the configured spark dfs dir that exceed 
the configured max age
+try {
+  val hdfsDir = conf.get("spark.driver.log.dfsDir")
+  val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
+  while (appDirs.hasNext()) {
--- End diff --

Could be. If it doesn't work this is fine.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223900426
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
+
+// Delete driver logs from the configured spark dfs dir that exceed 
the configured max age
+try {
+  val hdfsDir = conf.get("spark.driver.log.dfsDir")
+  val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
+  while (appDirs.hasNext()) {
--- End diff --

Didn't work. Maybe because it is a RemoteIterator?
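
(Hadoop's `RemoteIterator` is indeed a separate interface, not a
`scala.collection.Iterator`, so Scala's `foreach`/for-comprehension syntax
does not apply to it directly; the `while` loop in the diff works, or a small
adapter like the hypothetical one below could be used.)

    import org.apache.hadoop.fs.RemoteIterator

    // Hypothetical helper, not part of the PR: expose a RemoteIterator as a
    // Scala Iterator so collection methods like foreach become available.
    def asScalaIterator[T](it: RemoteIterator[T]): Iterator[T] =
      new Iterator[T] {
        override def hasNext: Boolean = it.hasNext()
        override def next(): T = it.next()
      }

    // asScalaIterator(fs.listLocatedStatus(new Path(hdfsDir))).foreach { status =>
    //   ...
    // }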


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223871565
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io._
+import java.util.concurrent.TimeUnit
+
+import scala.util.{Failure, Try}
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class DriverLogger(conf: SparkConf) extends Logging {
+
+  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
+  private val UPLOAD_INTERVAL_IN_SECS = 5
+  private val DRIVER_LOG_FILE = "driver.log"
+  private val LOG_FILE_PERMISSIONS = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+
+  private var localLogFile: String = FileUtils.getFile(
--- End diff --

Utils.getLocalDir also does that actually. Spark deletes the directory that 
it creates.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223866624
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -48,6 +48,19 @@ package object config {
 .bytesConf(ByteUnit.MiB)
 .createOptional
 
+  private[spark] val DRIVER_LOG_DFS_DIR =
+ConfigBuilder("spark.driver.log.dfsDir").stringConf.createOptional
+
+  private[spark] val DRIVER_LOG_LAYOUT =
+ConfigBuilder("spark.driver.log.layout")
+  .stringConf
+  .createWithDefault("%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n")
+
+  private[spark] val DRIVER_LOG_SYNCTODFS =
--- End diff --

Let's leave it. But there's some usage of the enabled flag in your code 
that you can still clean up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223866023
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -48,6 +48,19 @@ package object config {
 .bytesConf(ByteUnit.MiB)
 .createOptional
 
+  private[spark] val DRIVER_LOG_DFS_DIR =
+ConfigBuilder("spark.driver.log.dfsDir").stringConf.createOptional
+
+  private[spark] val DRIVER_LOG_LAYOUT =
+ConfigBuilder("spark.driver.log.layout")
+  .stringConf
+  .createWithDefault("%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n")
+
+  private[spark] val DRIVER_LOG_SYNCTODFS =
--- End diff --

Yes, as you said, I prefer to keep the enable/disable functionality separate 
from the log dir setting. But I do see that removing it reduces the number of 
configurations.

I am ok either way. Please let me know if I should remove this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223864050
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
+
+// Delete driver logs from the configured spark dfs dir that exceed 
the configured max age
+try {
+  val hdfsDir = conf.get("spark.driver.log.dfsDir")
+  val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(appDirStatus.getPath())
+}
+  }
+} catch {
+  case nse: NoSuchElementException => // no-op
--- End diff --

It's an optional config. If you use the config constant this will go away. 
(And even in the legacy case you should use `conf.getOption` in those cases.)
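
With the optional config constant, the lookup returns an `Option` and the
`NoSuchElementException` handling disappears; a sketch:

    conf.get(DRIVER_LOG_DFS_DIR).foreach { dl =>
      val appDirs = fs.listLocatedStatus(new Path(dl))
      while (appDirs.hasNext()) {
        val appDirStatus = appDirs.next()
        if (appDirStatus.getModificationTime() < maxTime) {
          logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}")
          deleteLog(appDirStatus.getPath())
        }
      }
    }

(or `conf.getOption("spark.driver.log.dfsDir")` in the legacy-string case).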


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22504: [SPARK-25118][Submit] Persist Driver Logs in Clie...

2018-10-09 Thread ankuriitg
Github user ankuriitg commented on a diff in the pull request:

https://github.com/apache/spark/pull/22504#discussion_r223859868
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +806,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 // Clean the blacklist from the expired entries.
 clearBlacklist(CLEAN_INTERVAL_S)
+
+// Delete driver logs from the configured spark dfs dir that exceed 
the configured max age
+try {
+  val hdfsDir = conf.get("spark.driver.log.dfsDir")
+  val appDirs = fs.listLocatedStatus(new Path(hdfsDir))
+  while (appDirs.hasNext()) {
+val appDirStatus = appDirs.next()
+if (appDirStatus.getModificationTime() < maxTime) {
+  logInfo(s"Deleting expired driver log for: 
${appDirStatus.getPath().getName()}")
+  deleteLog(appDirStatus.getPath())
+}
+  }
+} catch {
+  case nse: NoSuchElementException => // no-op
--- End diff --

conf.get("spark.driver.log.dfsDir")


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org