Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22612#discussion_r223766597

--- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+import java.util.Locale
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+private[spark] case class ProcfsBasedSystemsMetrics(
+    jvmVmemTotal: Long,
+    jvmRSSTotal: Long,
+    pythonVmemTotal: Long,
+    pythonRSSTotal: Long,
+    otherVmemTotal: Long,
+    otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging {
+  val procfsStatFile = "stat"
+  val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  var pageSize = computePageSize()
+  var isAvailable: Boolean = isProcfsAvailable
+  private val pid = computePid()
+  private val ptree = mutable.Map[ Int, Set[Int]]()
+
+  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
+  private var latestJVMVmemTotal = 0L
+  private var latestJVMRSSTotal = 0L
+  private var latestPythonVmemTotal = 0L
+  private var latestPythonRSSTotal = 0L
+  private var latestOtherVmemTotal = 0L
+  private var latestOtherRSSTotal = 0L
+
+  computeProcessTree()
+
+  private def isProcfsAvailable: Boolean = {
+    if (testing) {
+      return true
+    }
+    try {
+      if (!Files.exists(Paths.get(procfsDir))) {
+        return false
+      }
+    }
+    catch {
+      case f: FileNotFoundException => return false
+    }
+    val shouldLogStageExecutorMetrics =
+      SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+    val shouldLogStageExecutorProcessTreeMetrics =
+      SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+    shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
+  }
+
+  private def computePid(): Int = {
+    if (!isAvailable || testing) {
+      return -1;
+    }
+    try {
+      // This can be simplified in java9:
+      // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+      val cmd = Array("bash", "-c", "echo $PPID")
+      val length = 10
+      val out2 = Utils.executeAndGetOutput(cmd)
+      val pid = Integer.parseInt(out2.split("\n")(0))
+      return pid;
+    }
+    catch {
+      case e: SparkException => logDebug("IO Exception when trying to compute process tree." +
+        " As a result reporting of ProcessTree metrics is stopped", e)
+        isAvailable = false
+        return -1
+    }
+  }
+
+  private def computePageSize(): Long = {
+    if (testing) {
+      return 0;
+    }
+    val cmd = Array("getconf", "PAGESIZE")
+    val out2 = Utils.executeAndGetOutput(cmd)
+    return Integer.parseInt(out2.split("\n")(0))
+  }
+
+  private def computeProcessTree(): Unit = {
+    if (!isAvailable || testing) {
+      return
+    }
+    val queue = mutable.Queue.empty[Int]
+    queue += pid
+    while( !queue.isEmpty ) {
+      val p = queue.dequeue()
+      val c = getChildPids(p)
+      if(!c.isEmpty) {
+        queue ++= c
+        ptree += (p -> c.toSet)
+      }
+      else {
+        ptree += (p -> Set[Int]())
+      }
+    }
+  }
+
+  private def getChildPids(pid: Int): ArrayBuffer[Int] = {
+    try {
+      val cmd = Array("pgrep", "-P", pid.toString)
+      val builder = new ProcessBuilder("pgrep", "-P", pid.toString)
+      val process = builder.start()
+      val output = new StringBuilder()
+      val threadName = "read stdout for " + "pgrep"
+      def appendToOutput(s: String): Unit = output.append(s).append("\n")
+      val stdoutThread = Utils.processStreamByLine(threadName,
+        process.getInputStream, appendToOutput)
+      val exitCode = process.waitFor()
+      stdoutThread.join()
+      // pgrep will have exit code of 1 if there are more than one child process
+      // and it will have a exit code of 2 if there is no child process
+      if (exitCode != 0 && exitCode > 2) {
+        logError(s"Process $cmd exited with code $exitCode: $output")
+        throw new SparkException(s"Process $cmd exited with code $exitCode")
+      }
+      val childPids = output.toString.split("\n")
+      val childPidsInInt = mutable.ArrayBuffer.empty[Int]
+      for (p <- childPids) {
+        if (p != "") {
+          logDebug("Found a child pid: " + p)
+          childPidsInInt += Integer.parseInt(p)
+        }
+      }
+      childPidsInInt
+    } catch {
+      case e: IOException => logDebug("IO Exception when trying to compute process tree." +
+        " As a result reporting of ProcessTree metrics is stopped", e)
+        isAvailable = false
+        return mutable.ArrayBuffer.empty[Int]
+    }
+  }
+
+  def computeProcessInfo(pid: Int): Unit = {
+    /*
+     * Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrive the memory
+     * info. I tried that but found it not correct during tests, so I used normal string analysis
+     * instead. The computation of RSS and Vmem are based on proc(5):
+     * http://man7.org/linux/man-pages/man5/proc.5.html
+     */
+    try {
+      val pidDir = new File(procfsDir, pid.toString)
+      Utils.tryWithResource( new InputStreamReader(
+        new FileInputStream(
+          new File(pidDir, procfsStatFile)), Charset.forName("UTF-8"))) { fReader =>
+        Utils.tryWithResource( new BufferedReader(fReader)) { in =>
+          val procInfo = in.readLine
+          val procInfoSplit = procInfo.split(" ")
+          if (procInfoSplit != null) {
+            val vmem = procInfoSplit(22).toLong
+            val rssPages = procInfoSplit(23).toLong
+            if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
--- End diff --

We don't have much pyspark ourselves, but yes, it seems useful to have the breakdown, and it's easy to sum the values for the total.