Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22612#discussion_r235124501
  
    --- Diff: 
core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
    @@ -0,0 +1,228 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.executor
    +
    +import java.io._
    +import java.nio.charset.Charset
    +import java.nio.file.{Files, Paths}
    +import java.util.Locale
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.{SparkEnv, SparkException}
    +import org.apache.spark.internal.{config, Logging}
    +import org.apache.spark.util.Utils
    +
    +private[spark] case class ProcfsBasedSystemsMetrics(
    +    jvmVmemTotal: Long,
    +    jvmRSSTotal: Long,
    +    pythonVmemTotal: Long,
    +    pythonRSSTotal: Long,
    +    otherVmemTotal: Long,
    +    otherRSSTotal: Long)
    +
    +// Some of the ideas here are taken from the ProcfsBasedProcessTree class 
in hadoop
    +// project.
    +private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") 
extends Logging {
    +  val procfsStatFile = "stat"
    +  val testing = sys.env.contains("SPARK_TESTING") || 
sys.props.contains("spark.testing")
    +  var pageSize = computePageSize()
    +  var isAvailable: Boolean = isProcfsAvailable
    +  private val pid = computePid()
    +
    +  // var allMetrics: ProcfsBasedSystemsMetrics = 
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
    +
    +  computeProcessTree()
    +
    +  private lazy val isProcfsAvailable: Boolean = {
    +    if (testing) {
    +       true
    +    }
    +    else {
    +      var procDirExists = true
    +      try {
    +        if (!Files.exists(Paths.get(procfsDir))) {
    +          procDirExists = false
    +        }
    +      }
    +      catch {
    +        case f: IOException =>
    +          logWarning("It seems that procfs isn't supported", f)
    +          procDirExists = false
    +      }
    +      val shouldLogStageExecutorMetrics =
    +        SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
    +      val shouldLogStageExecutorProcessTreeMetrics =
    +        SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
    +      procDirExists && shouldLogStageExecutorProcessTreeMetrics && 
shouldLogStageExecutorMetrics
    +    }
    +  }
    +
    +  private def computePid(): Int = {
    +    if (!isAvailable || testing) {
    +      return -1;
    +    }
    +    try {
    +      // This can be simplified in java9:
    +      // 
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
    +      val cmd = Array("bash", "-c", "echo $PPID")
    +      val out2 = Utils.executeAndGetOutput(cmd)
    +      val pid = Integer.parseInt(out2.split("\n")(0))
    +      return pid;
    +    }
    +    catch {
    +      case e: SparkException =>
    +        logWarning("Exception when trying to compute process tree." +
    +        " As a result reporting of ProcessTree metrics is stopped", e)
    +        isAvailable = false
    +        return -1
    +    }
    +  }
    +
    +  private def computePageSize(): Long = {
    +    if (testing) {
    +      return 0;
    +    }
    +    try {
    +      val cmd = Array("getconf", "PAGESIZE")
    +      val out = Utils.executeAndGetOutput(cmd)
    +      return Integer.parseInt(out.split("\n")(0))
    +    } catch {
    +      case e: Exception => logWarning("Exception when trying to compute 
pagesize, as a" +
    +        " result reporting of ProcessTree metrics is stopped")
    +        isAvailable = false
    +        return 0
    +    }
    +  }
    +
    +  private def computeProcessTree(): Set[Int] = {
    +    if (!isAvailable || testing) {
    +      return Set()
    +    }
    +    var ptree: Set[Int] = Set()
    +    ptree += pid
    +    val queue = mutable.Queue.empty[Int]
    +    queue += pid
    +    while( !queue.isEmpty ) {
    +      val p = queue.dequeue()
    +      val c = getChildPids(p)
    +      if(!c.isEmpty) {
    +        queue ++= c
    +        ptree ++= c.toSet
    +      }
    +    }
    +    ptree
    +  }
    +
    +  private def getChildPids(pid: Int): ArrayBuffer[Int] = {
    +    try {
    +      // val cmd = Array("pgrep", "-P", pid.toString)
    +      val builder = new ProcessBuilder("pgrep", "-P", pid.toString)
    +      val process = builder.start()
    +      // val output = new StringBuilder()
    +      val threadName = "read stdout for " + "pgrep"
    +      val childPidsInInt = mutable.ArrayBuffer.empty[Int]
    +      def appendChildPid(s: String): Unit = {
    +        if (s != "") {
    +          logDebug("Found a child pid:" + s)
    +          childPidsInInt += Integer.parseInt(s)
    +        }
    +      }
    +      val stdoutThread = Utils.processStreamByLine(threadName,
    +        process.getInputStream, appendChildPid)
    +      val exitCode = process.waitFor()
    +      stdoutThread.join()
    +      // pgrep will have exit code of 1 if there are more than one child 
process
    +      // and it will have a exit code of 2 if there is no child process
    +      if (exitCode != 0 && exitCode > 2) {
    +        val cmd = builder.command().toArray.mkString(" ")
    +        logWarning(s"Process $cmd" +
    +          s" exited with code $exitCode, with stderr:" + 
s"${process.getErrorStream} ")
    +        throw new SparkException(s"Process $cmd exited with code 
$exitCode")
    +      }
    +      childPidsInInt
    +    } catch {
    +      case e: Exception => logWarning("Exception when trying to compute 
process tree." +
    +        " As a result reporting of ProcessTree metrics is stopped.", e)
    +        isAvailable = false
    +        return mutable.ArrayBuffer.empty[Int]
    +    }
    +  }
    +
    +  def computeProcessInfo(allMetrics: ProcfsBasedSystemsMetrics, pid: Int):
    +  ProcfsBasedSystemsMetrics = {
    +  /*
    +   * Hadoop ProcfsBasedProcessTree class used regex and pattern matching 
to retrive the memory
    +   * info. I tried that but found it not correct during tests, so I used 
normal string analysis
    +   * instead. The computation of RSS and Vmem are based on proc(5):
    +   * http://man7.org/linux/man-pages/man5/proc.5.html
    +   */
    --- End diff --
    
    I mean the comment should be indented the same as the rest of the body of 
method.  Eg the very first '/' needs to be indented to the same level as the 
'try' just after this comment.  And the rest of the comment should be indented 
like that slash.
    
    Alternatively, just prefix each line with '//'.  Same indentation rule, but 
indentation might look better, and I guess is actually what is done more in 
spark code ( eg. 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala#L47-L51)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to