Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r220016445
  
    --- Diff: core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.logging
    +
    +import java.io._
    +import java.util.concurrent.TimeUnit
    +
    +import scala.util.{Failure, Try}
    +
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.hadoop.security.{AccessControlException, UserGroupInformation}
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +import org.apache.log4j.{FileAppender => Log4jFileAppender, _}
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +
    +private[spark] class DriverLogger(conf: SparkConf)
    +    extends Logging {
    +
    +  private val UPLOAD_CHUNK_SIZE = 1024 * 1024
    +  private val UPLOAD_INTERVAL_IN_SECS = 5
    +  private val DRIVER_LOG_FILE = "driver.log"
    +  private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
    +
    +  private var localLogFile: String = FileUtils.getFile(
    +    Utils.getLocalDir(conf), "driver_logs", 
DRIVER_LOG_FILE).getAbsolutePath()
    +  private var hdfsLogFile: String = _
    +  private var writer: Option[HdfsAsyncWriter] = None
    +  private var hadoopConfiguration: Configuration = _
    +  private var fileSystem: FileSystem = _
    +
    +  private val syncToDfs: Boolean = conf.get(DRIVER_LOG_SYNCTODFS)
    +  private val syncToYarn: Boolean = conf.get(DRIVER_LOG_SYNCTOYARN)
    +
    +  addLogAppender()
    +
    +  private def addLogAppender(): Unit = {
    +    val appenders = LogManager.getRootLogger().getAllAppenders()
    +    val layout = if (appenders.hasMoreElements()) {
    +      appenders.nextElement().asInstanceOf[Appender].getLayout()
    +    } else {
    +      new PatternLayout(conf.get(DRIVER_LOG_LAYOUT))
    +    }
    +    val fa = new Log4jFileAppender(layout, localLogFile)
    +    fa.setName(DriverLogger.APPENDER_NAME)
    +    LogManager.getRootLogger().addAppender(fa)
    +    logInfo(s"Added a local log appender at: ${localLogFile}")
    +  }
    +
    +  def startSync(hadoopConf: Configuration): Unit = {
    +    try {
    +      val appId = Utils.sanitizeDirName(conf.getAppId)
    +      hdfsLogFile = {
    +        val rootDir = conf.get(DRIVER_LOG_DFS_DIR.key,
    +          "/tmp/driver_logs").split(",").head
    +        FileUtils.getFile(rootDir, appId, DRIVER_LOG_FILE).getAbsolutePath()
    +      }
    +      hadoopConfiguration = hadoopConf
    +      fileSystem = FileSystem.get(hadoopConf)
    +
    +      // Setup a writer which moves the local file to hdfs continuously
    +      if (syncToDfs) {
    +        writer = Some(new HdfsAsyncWriter())
    +        logInfo(s"The local driver log file is being synced to spark dfs 
at: ${hdfsLogFile}")
    +      }
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Could not sync driver logs to spark dfs", e)
    +    }
    +  }
    +
    +  def stop(): Unit = {
    +    try {
    +      writer.map(_.closeWriter())
    +      if (syncToYarn) {
    +        moveToYarnAppDir()
    +      }
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error in persisting driver logs", e)
    +    } finally {
    +      try {
    +        JavaUtils.deleteRecursively(FileUtils.getFile(localLogFile).getParentFile())
    +      } catch {
    +        case e: Exception =>
    +          logError(s"Error in deleting local driver log dir", e)
    +      }
    +    }
    +  }
    +
    +  private def moveToYarnAppDir(): Unit = {
    +    try {
    +      val appId = Utils.sanitizeDirName(conf.getAppId)
    +      val yarnConf = new YarnConfiguration(hadoopConfiguration)
    +      val rootDir = yarnConf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)
    +      if (rootDir != null && !rootDir.isEmpty()) {
    +        var parentDir = if (UserGroupInformation.getCurrentUser() != null) {
    +          FileUtils.getFile(
    --- End diff --
    
    Will this parent dir already exist at the time this code runs?
    
    I'd be more comfortable asserting that it already exists (in which case it would have been created by YARN itself) than automatically creating it by calling one of the `FileSystem` methods later in this method...
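    
    For illustration, a minimal sketch of that assertion (untested; `fileSystem` and `rootDir` are the values already in scope in the diff above, and the error message is just a placeholder):
    
    ```scala
    // Fail fast if YARN has not already created the remote app log dir,
    // instead of silently creating it ourselves via a FileSystem call.
    val rootPath = new Path(rootDir)
    require(fileSystem.exists(rootPath),
      s"Expected YARN remote app log dir $rootPath to exist (created by YARN)")
    ```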

