[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r239185785 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging { + private val procfsStatFile = "stat" + private val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + private val pageSize = computePageSize() + private var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; --- End diff -- super nit: no need for explicit return and to create `val pid`, just have the final statement be `Integer.parseInt(out.split("\n")(0))` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238739905 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 4096; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while ( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if (!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgrep", "-P", pid.toString) + val process = b
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238731915 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { --- End diff -- Oh ok. I didn't know that you don't need to mention val if you want a val parameter. Not related here, but I think you can have a var parameter if you mention var in the parameter declaration. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238424342 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 4096; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while ( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if (!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgrep", "-P", pid.toString) + val process = bui
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238388633 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { --- End diff -- no, not a var -- on a constructor argument, you don't need to put in either `val` or `var`. 
https://stackoverflow.com/questions/14694712/do-scala-constructor-parameters-default-to-private-val --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238380513 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 4096; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while ( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if (!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgrep", "-P", pid.toString) + val process = b
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238377911 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable --- End diff -- the class is private[spark], but sure I will make them private to limit more --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238377498 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { --- End diff -- I think there were some review comments a few months ago asking to make it a val? So you think a var is better? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238371374 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 4096; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while ( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if (!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgrep", "-P", pid.toString) + val process = bui
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238363825 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable --- End diff -- I think these can all be private. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238366634 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 4096; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while ( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if (!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgrep", "-P", pid.toString) + val process = bui
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r238363142 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging { --- End diff -- `procfsDir` doesn't need to be a `val` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237993972 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pg
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237992560 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237990856 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pg
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237982013 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237975861 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -138,19 +138,22 @@ private[spark] class ProcfsMetricsGetter( } val stdoutThread = Utils.processStreamByLine("read stdout for pgrep", process.getInputStream, appendChildPid) - val error = process.getErrorStream - var errorString = "" - (0 until error.available()).foreach { i => -errorString += error.read() - } + val errorStringBuilder = new StringBuilder() + val stdErrThread = Utils.processStreamByLine( +"stderr for pgrep", +process.getErrorStream, +{ line => +errorStringBuilder.append(line) + }) --- End diff -- nit: if the line-handling closure is multiline, you should indent the body more. but I'd just put it on one line ```scala val stdErrThread = Utils.processStreamByLine( "stderr for pgrep", process.getErrorStream, line => errorStringBuilder.append(line)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237981353 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237978529 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { --- End diff -- I think its pretty weird having this constructor argument only used for tests. 
I'd either (a) always use this argument (and make the page size computed in the default constructor), or (b) don't make this a constructor argument at all, and just hardcode the value in `computePageSize` if testing (you only set it to one value during testing, we don't need it parameterizable more than that currently). (b) should be pretty easy. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237977304 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -49,14 +47,14 @@ class ExecutorMetrics private[spark] extends Serializable { } /** - * Constructor: create the ExecutorMetrics with the values specified. + * Constructor: create the ExecutorMetrics with using a given map. * * @param executorMetrics map of executor metric name to value */ private[spark] def this(executorMetrics: Map[String, Long]) { this() -(0 until ExecutorMetricType.values.length).foreach { idx => - metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +ExecutorMetricType.metricToOffset.map { m => + metrics(m._2) = executorMetrics.getOrElse(m._1, 0L) --- End diff -- you can use pattern matching here. Also you're not returning anything from that loop, so `foreach` is more appropriate than `map`. ```scala .foreach { case(name, idx) => metrics(idx) = executorsMetrics.getOrElse(name, 0L) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237982177 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237979881 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { --- End diff -- nit: space after `if` and `while`, before the `(` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional comman
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237737064 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgrep
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237736770 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgrep
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r237161182 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pg
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236863378 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -28,16 +28,14 @@ import org.apache.spark.metrics.ExecutorMetricType @DeveloperApi class ExecutorMetrics private[spark] extends Serializable { - // Metrics are indexed by ExecutorMetricType.values - private val metrics = new Array[Long](ExecutorMetricType.values.length) - + private val metrics = new Array[Long](ExecutorMetricType.numMetrics) --- End diff -- I'd keep a comment here explaining this array, just update that its now indexed by `ExecutorMetricType.metricToOffset` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236856360 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236858045 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236856995 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236860940 --- Diff: core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.SparkFunSuite + + +class ProcfsMetricsGetterSuite extends SparkFunSuite { + + val p = new ProcfsMetricsGetter(getTestResourcePath("ProcessTree"), 4096L) --- End diff -- minor, can you rename this dir to "ProcfsMetrics"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236857465 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236855952 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236860104 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -95,10 +136,22 @@ private[spark] object ExecutorMetricType { OnHeapUnifiedMemory, OffHeapUnifiedMemory, DirectPoolMemory, -MappedPoolMemory +MappedPoolMemory, +ProcessTreeMetrics ) - // Map of executor metric type to its index in values. - val metricIdxMap = -Map[ExecutorMetricType, Int](ExecutorMetricType.values.zipWithIndex: _*) + + val (metricToOffset, numMetrics) = { +var numberOfMetrics = 0 +val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int] +metricGetters.foreach { m => + var metricInSet = 0 + while (metricInSet < m.names.length) { +definedMetricsAndOffset += (m.names(metricInSet) -> (metricInSet + numberOfMetrics)) +metricInSet += 1 + } --- End diff -- ```scala (0 until m.names.length).foreach { idx => definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236855451 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236848338 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236847493 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + + +private[spark] case class ProcfsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsMetricsGetter( +val procfsDir: String = "/proc/", +val pSizeForTest: Long = 0) extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + val pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { +case ioe: IOException => + logWarning("Exception checking for procfs dir", ioe) + false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ + " As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +-1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return pSizeForTest; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => +logWarning("Exception when trying to compute pagesize, as a" + + " result reporting of ProcessTree metrics is stopped") +isAvailable = false +0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c.toSet + } +} +ptree + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val builder = new ProcessBuilder("pgre
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236828402 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++=
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236769432 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -84,6 +122,8 @@ case object MappedPoolMemory extends MBeanExecutorMetricType( "java.nio:type=BufferPool,name=mapped") private[spark] object ExecutorMetricType { + final val pTreeInfo = new ProcfsBasedSystems --- End diff -- Normally having an object helps make it clear that there is a singleton; its easier to share properly and easier to figure out how to get a handle on it. Given that we'll have a class anyway, I don't think there is a ton of value in having there be a companion object. I do still think the instance you create here should go somewhere else. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236766605 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() --- End diff -- You can't put it as a default value, but if you make it a static method, then you can provide an overloaded method which uses it, see https://github.com/squito/spark/commit/cf008355e8b9ce9faeab267cd0763a3859a5ccc9 But, I think your other proposal is even better, if its testing just give it a fixed value (no need to even make it an argument to the constructor at all). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r236432819 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -84,6 +122,8 @@ case object MappedPoolMemory extends MBeanExecutorMetricType( "java.nio:type=BufferPool,name=mapped") private[spark] object ExecutorMetricType { + final val pTreeInfo = new ProcfsBasedSystems --- End diff -- Today I spent sometime on the companion object solution and figured out the problem that I was facing before and was able to fix it. I will send the updated pr sometime tonight or tomorrow. thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r235542735 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() --- End diff -- I think I can't call computePageSize() in the constructor signature to compute the default value. Another solution is to check for testing inside computePageSize and if we are testing assign a value to it that is provided in the constructor (default to 4096). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r235541704 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -84,6 +122,8 @@ case object MappedPoolMemory extends MBeanExecutorMetricType( "java.nio:type=BufferPool,name=mapped") private[spark] object ExecutorMetricType { + final val pTreeInfo = new ProcfsBasedSystems --- End diff -- What are the benefits of the companion object vs this current approach? I can revert to the companion object model and do testing again to see what was the problem before, but just wanted to understand the benefits of it before investing time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r235124501 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234815376 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++=
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234340260 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234340168 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 --- End diff -- return isn't necessary --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234344505 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234341965 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234344631 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -19,25 +19,43 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import scala.collection.mutable + +import org.apache.spark.executor.ProcfsBasedSystems import org.apache.spark.memory.MemoryManager /** * Executor metric types for executor-level metrics stored in ExecutorMetrics. */ sealed trait ExecutorMetricType { - private[spark] def getMetricValue(memoryManager: MemoryManager): Long - private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last + private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = { +new Array[Long](0) + } + private[spark] def names: Seq[String] = Seq() --- End diff -- neither of these methods should have default implementations. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234347780 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -84,6 +122,8 @@ case object MappedPoolMemory extends MBeanExecutorMetricType( "java.nio:type=BufferPool,name=mapped") private[spark] object ExecutorMetricType { + final val pTreeInfo = new ProcfsBasedSystems --- End diff -- this is a weird place to keep this, unless there is some really good reason for it. I think it should go inside `ProcessTreeMetrics`. also I'm not sure what the problem was with making it an object. Seems to work for me. its a bit different now as there are arguments to the constructor for testing -- but you could still have an object which extends the class ```scala private[spark] object ProcfsBasedSystems extends ProcfsBasedSystems("/proc/") ``` though doesn't really seem to have much value. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234343451 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234342703 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234337409 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() --- End diff -- not needed anymore --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234334930 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -28,16 +28,14 @@ import org.apache.spark.metrics.ExecutorMetricType @DeveloperApi class ExecutorMetrics private[spark] extends Serializable { - // Metrics are indexed by ExecutorMetricType.values - private val metrics = new Array[Long](ExecutorMetricType.values.length) - + private val metrics = new Array[Long](ExecutorMetricType.numMetrics) // the first element is initialized to -1, indicating that the values for the array // haven't been set yet. metrics(0) = -1 - /** Returns the value for the specified metricType. */ - def getMetricValue(metricType: ExecutorMetricType): Long = { -metrics(ExecutorMetricType.metricIdxMap(metricType)) + /** Returns the value for the specified metric. */ + def getMetricValue(metricName: String): Long = { +metrics(ExecutorMetricType.metricToOffset.get(metricName).get) --- End diff -- no point in `metrics.get(...).get()`. if its OK for this to throw an exception for a missing key, then just do `metrics(...)`, like before. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234339484 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } --- End diff -- this can be simplified to ```scala val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover { case ioe: IOException => logWarning("Exception checking for procfs dir", ioe) false } ``` or maybe even ditch that warning msg completely ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234340136 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) --- End diff -- return isn't necessary here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234343988 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics(jvmVmemTotal: Long, + jvmRSSTotal: Long, + pythonVmemTotal: Long, + pythonRSSTotal: Long, + otherVmemTotal: Long, + otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems extends Logging { + var procfsDir = "/proc/" + val procfsStatFile = "stat" + var pageSize = 0 + var isAvailable: Boolean = isItProcfsBased + private val pid: Int = computePid() + private val ptree: scala.collection.mutable.Map[ Int, Set[Int]] = +scala.collection.mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal: Long = 0 + private var latestJVMRSSTotal: Long = 0 + private var latestPythonVmemTotal: Long = 0 + private var latestPythonRSSTotal: Long = 0 + private var latestOtherVmemTotal: Long = 0 + private var latestOtherRSSTotal: Long = 0 + + computeProcessTree() + + private def isItProcfsBased: Boolean = { +val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out: Array[Byte] = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) + val pid = Integer.parseInt(new String(out, "UTF-8").trim) + return pid; +} +catch { + case e: IOException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped") +isAvailable = false +return -1 + case _ => logDebug("Some exception occurred when trying to compute process tree. " + +"As a result reporting of ProcessTree metrics is stopped") +isAvailable = false +return -1 +} + } + + private def computePageSize(): Unit = { +val cmd = Array("getconf", "PAGESIZE") +val out: Array[Byte] = Array.fill[Byte](10)(0) +Runtime.getRuntime.exec(cmd).getInputStream.read(out) +pageSize = Integer.parseInt(new String(out, "UTF-8").trim) + } + + privat
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234337238 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -69,9 +67,8 @@ class ExecutorMetrics private[spark] extends Serializable { */ private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { var updated = false - -(0 until ExecutorMetricType.values.length).foreach { idx => - if (executorMetrics.metrics(idx) > metrics(idx)) { +ExecutorMetricType.metricToOffset.map { case (_, idx) => --- End diff -- Minor, but I think foreach on a range is both clearer and more efficient here. You could do `(0 until ExecutorMetricType.numMetrics).foreach` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234339856 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." + +" As a result reporting of ProcessTree metrics is stopped", e) --- End diff -- nit: indent the 2nd line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234344821 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -19,25 +19,43 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import scala.collection.mutable + +import org.apache.spark.executor.ProcfsBasedSystems import org.apache.spark.memory.MemoryManager /** * Executor metric types for executor-level metrics stored in ExecutorMetrics. */ sealed trait ExecutorMetricType { - private[spark] def getMetricValue(memoryManager: MemoryManager): Long - private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last + private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = { +new Array[Long](0) + } + private[spark] def names: Seq[String] = Seq() +} + +sealed trait SingleValueExecutorMetricType extends ExecutorMetricType { + override private[spark] def names = Seq(getClass().getName(). +stripSuffix("$").split("""\.""").last) + + override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = { +val metrics = new Array[Long](1) +metrics(0) = getMetricValue(memoryManager) +metrics + } + + private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0 --- End diff -- no default implementation here either --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234342892 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234338058 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() --- End diff -- `pageSize` is only a `var` for testing -- instead just optionally pass it in to the constructor also I think all of these can be `private`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234334582 --- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala --- @@ -60,11 +60,16 @@ private[spark] class Heartbeater( } /** - * Get the current executor level metrics. These are returned as an array, with the index - * determined by ExecutorMetricType.values + * Get the current executor level metrics. These are returned as an array */ def getCurrentMetrics(): ExecutorMetrics = { -val metrics = ExecutorMetricType.values.map(_.getMetricValue(memoryManager)).toArray +val metrics = new Array[Long](ExecutorMetricType.numMetrics) +var offset = 0 +ExecutorMetricType.metricGetters.foreach { metric => + val newSetOfMetrics = metric.getMetricValues(memoryManager) --- End diff -- nit: lets avoid "set", since order matters, you can just use `newMetrics` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234339667 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) --- End diff -- can be `out` instead of `out2` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234348523 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { --- End diff -- I don't love these names, though I also suck at coming up with good ones. I think in particular the "Systems" part is too ambiguous to be useful. 
how about `ProcfsBasedSystemsMetrics` -> `ProcfsMetrics` `ProcfsBasedSystems` -> `ProcfsMetricsGetter` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234342770 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234343203 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234344763 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -19,25 +19,43 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import scala.collection.mutable + +import org.apache.spark.executor.ProcfsBasedSystems import org.apache.spark.memory.MemoryManager /** * Executor metric types for executor-level metrics stored in ExecutorMetrics. */ sealed trait ExecutorMetricType { - private[spark] def getMetricValue(memoryManager: MemoryManager): Long - private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last + private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = { +new Array[Long](0) + } + private[spark] def names: Seq[String] = Seq() +} + +sealed trait SingleValueExecutorMetricType extends ExecutorMetricType { + override private[spark] def names = Seq(getClass().getName(). +stripSuffix("$").split("""\.""").last) --- End diff -- nit: if the method def is multiline, enclose in braces, and the body should start on a newline. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234347011 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -47,18 +65,39 @@ private[spark] abstract class MBeanExecutorMetricType(mBeanName: String) } } -case object JVMHeapMemory extends ExecutorMetricType { +case object JVMHeapMemory extends SingleValueExecutorMetricType { override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed() } } -case object JVMOffHeapMemory extends ExecutorMetricType { +case object JVMOffHeapMemory extends SingleValueExecutorMetricType { override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed() } } +case object ProcessTreeMetrics extends ExecutorMetricType { + override val names = Seq( +"ProcessTreeJVMVMemory", +"ProcessTreeJVMRSSMemory", +"ProcessTreePythonVMemory", +"ProcessTreePythonRSSMemory", +"ProcessTreeOtherVMemory", +"ProcessTreeOtherRSSMemory") + override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = { --- End diff -- nit: blank line before this method --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234340033 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") --- End diff -- nit: nothing else on line after `=>`, and extra indent for second line of log msg --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234337384 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) --- End diff -- delete --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r234342135 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + + // var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private lazy val isProcfsAvailable: Boolean = { +if (testing) { + true +} +else { + var procDirExists = true + try { +if (!Files.exists(Paths.get(procfsDir))) { + procDirExists = false +} + } + catch { +case f: IOException => + logWarning("It seems that procfs isn't supported", f) + procDirExists = false + } + val shouldLogStageExecutorMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) + val shouldLogStageExecutorProcessTreeMetrics = +SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) + procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics +} + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => +logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Set[Int] = { +if (!isAvailable || testing) { + return Set() +} +var ptree: Set[Int] = Set() +ptree += pid +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree ++= c
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r233255758 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -19,18 +19,31 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import scala.collection.mutable + +import org.apache.spark.executor.ProcfsBasedSystems import org.apache.spark.memory.MemoryManager /** * Executor metric types for executor-level metrics stored in ExecutorMetrics. */ sealed trait ExecutorMetricType { - private[spark] def getMetricValue(memoryManager: MemoryManager): Long - private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last + private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0 + private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { +new Array[Long](0) + } + private[spark] def names = Seq(getClass().getName().stripSuffix("$").split("""\.""").last) --- End diff -- I think I'm suggesting something slightly different. There would be two traits, but they'd be related. `trait ExecutorMetricType` would allow multiple values, as you have it here. Everything which needed to interact with the metrics would use that interface. That would define a `getMetricsValues()` etc. (plural). Then you'd have another `trait SingleValueMetricType` (or something) which would have abstract methods like `getMetricValue()` singular, and it would provide a definition for the plural methods like ```scala def getMetricValues(): Array[Long] = { val metrics = new Array[Long](1) metrics(0) = getMetricValue() metrics } ``` in java terms, you could think of `ExecutorMetricType` as an interface and `SingleValueMetricType` as an abstract class. `SingleValueMetricType` is just a convenience for some implementations to take advantage of to avoid repeating code. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r233254497 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { --- End diff -- I think I misunderstood the comment. I was thinking that this is referring to "isAvailable". I will change isProcfsAvailable to lazy val. Sorry about my confusion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r233252552 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -19,18 +19,31 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import scala.collection.mutable + +import org.apache.spark.executor.ProcfsBasedSystems import org.apache.spark.memory.MemoryManager /** * Executor metric types for executor-level metrics stored in ExecutorMetrics. */ sealed trait ExecutorMetricType { - private[spark] def getMetricValue(memoryManager: MemoryManager): Long - private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last + private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0 + private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { +new Array[Long](0) + } + private[spark] def names = Seq(getClass().getName().stripSuffix("$").split("""\.""").last) --- End diff -- Thank you @squito for the comments. Better to finalize this before doing back and forth. So what you suggest is that having two traits: one for MetricTypes with one value (Let's call it SingleValueExecutorMetric) and one for MetricTypes with multiple values (Let's call it MultiValueExecutorMetric). This I think will force us to have two metricGetters in ExecutorMetricType object as well, since one will call the getMetricValue and the other will call getMetricSet (or getMetricValues). Just wondering if I understand your suggestion correctly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r233250143 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private var ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out2 = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out2.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Unit = { +if (!isAvailable || testing) { + return +} +ptree = mutable.Map[ Int, Set[Int]]() +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet) + } + else { +ptree += (p -> Set[Int]()) + } +} + } + + private def getC
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r231921807 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -69,11 +67,10 @@ class ExecutorMetrics private[spark] extends Serializable { */ private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { var updated = false - -(0 until ExecutorMetricType.values.length).foreach { idx => - if (executorMetrics.metrics(idx) > metrics(idx)) { +ExecutorMetricType.definedMetricsAndOffset.map {m => --- End diff -- space after `{`. also a bit clearer if you use pattern matching `.map { case (_, idx) =>` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r232224322 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -95,10 +148,18 @@ private[spark] object ExecutorMetricType { OnHeapUnifiedMemory, OffHeapUnifiedMemory, DirectPoolMemory, -MappedPoolMemory +MappedPoolMemory, +ProcessTreeMetrics ) - // Map of executor metric type to its index in values. - val metricIdxMap = -Map[ExecutorMetricType, Int](ExecutorMetricType.values.zipWithIndex: _*) + var definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int] + var numberOfMetrics = 0 --- End diff -- you only need this mutable during initialization, so to convey its eventually final value, you can move the mutability to inside a block: ```scala val (metricToOffset, numMetrics) = { var n = 0 val _metricToOffset = mutable.LinkedHashMap.empty[String, Int] ... (_metricToOffset, n) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r232221648 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -19,18 +19,31 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import scala.collection.mutable + +import org.apache.spark.executor.ProcfsBasedSystems import org.apache.spark.memory.MemoryManager /** * Executor metric types for executor-level metrics stored in ExecutorMetrics. */ sealed trait ExecutorMetricType { - private[spark] def getMetricValue(memoryManager: MemoryManager): Long - private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last + private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0 --- End diff -- this function is unused --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r232220878 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private var ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out2 = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out2.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Unit = { +if (!isAvailable || testing) { + return +} +ptree = mutable.Map[ Int, Set[Int]]() +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet) + } + else { +ptree += (p -> Set[Int]()) + } +} + } + + private def getChi
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r231980219 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private var ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logWarning("Exception when trying to compute process tree." + +" As a result reporting of ProcessTree metrics is stopped", e) --- End diff -- style nit: if the `case` block is multiline, don't put anything after the `=>`, move it to the next line (here and elsewhere) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r23535 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -19,18 +19,31 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import scala.collection.mutable + +import org.apache.spark.executor.ProcfsBasedSystems import org.apache.spark.memory.MemoryManager /** * Executor metric types for executor-level metrics stored in ExecutorMetrics. */ sealed trait ExecutorMetricType { - private[spark] def getMetricValue(memoryManager: MemoryManager): Long - private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last + private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0 + private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { +new Array[Long](0) + } + private[spark] def names = Seq(getClass().getName().stripSuffix("$").split("""\.""").last) --- End diff -- this is not a good default -- at least not on this class, where the interface allows returning multiple metrics. I do think it makes sense to have a child trait here just for all the MetricTypes which only have one value, which does have this default. Also that would be the place to have an abstract `getMetricValue()` (singular) method. That would avoid so many `new Array[Long](1)` etc. etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r23970 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -95,10 +148,18 @@ private[spark] object ExecutorMetricType { OnHeapUnifiedMemory, OffHeapUnifiedMemory, DirectPoolMemory, -MappedPoolMemory +MappedPoolMemory, +ProcessTreeMetrics ) - // Map of executor metric type to its index in values. - val metricIdxMap = -Map[ExecutorMetricType, Int](ExecutorMetricType.values.zipWithIndex: _*) + var definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int] --- End diff -- this can be a `val` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r231982148 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private var ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logWarning("Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out2 = Utils.executeAndGetOutput(cmd) + return Integer.parseInt(out2.split("\n")(0)) +} catch { + case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + +" result reporting of ProcessTree metrics is stopped") +isAvailable = false +return 0 +} + } + + private def computeProcessTree(): Unit = { +if (!isAvailable || testing) { + return +} +ptree = mutable.Map[ Int, Set[Int]]() +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet) + } + else { +ptree += (p -> Set[Int]()) + } +} + } + + private def getChi
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r231980858 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private var ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logWarning("Exception when trying to compute process tree." + +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +try { + val cmd = Array("getconf", "PAGESIZE") + val out2 = Utils.executeAndGetOutput(cmd) --- End diff -- can just be `out` instead of `out2` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r232224769 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -394,10 +394,13 @@ private[spark] object JsonProtocol { /** Convert executor metrics to JSON. */ def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = { -val metrics = ExecutorMetricType.values.map{ metricType => - JField(metricType.name, executorMetrics.getMetricValue(metricType)) - } -JObject(metrics: _*) +val metrics = for { + (m, _) <- ExecutorMetricType.definedMetricsAndOffset +} yield { --- End diff -- there's nothing wrong with this, but I think spark is biased towards using `foreach`, `map`, & `flatMap` over scala's `for / yield` unless there is a particular reason to use it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r231925282 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private var ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 --- End diff -- unused --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r232221783 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -19,18 +19,31 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import scala.collection.mutable + +import org.apache.spark.executor.ProcfsBasedSystems import org.apache.spark.memory.MemoryManager /** * Executor metric types for executor-level metrics stored in ExecutorMetrics. */ sealed trait ExecutorMetricType { - private[spark] def getMetricValue(memoryManager: MemoryManager): Long - private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last + private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0 + private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { +new Array[Long](0) --- End diff -- `Set` isn't a great name for this, as the order matters. how about `getMetricValues()`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r231926797 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.{config, Logging} + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + private var latestJVMVmemTotal = 0L + private var latestJVMRSSTotal = 0L + private var latestPythonVmemTotal = 0L + private var latestPythonRSSTotal = 0L + private var latestOtherVmemTotal = 0L + private var latestOtherRSSTotal = 0L + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { --- End diff -- I don't understand what you mean. Yes, there may be a problem later reading from this file, we have to be defensive about that, but you have to do that anyway in the call. I think of this specific check as just a first cut -- is procfs available at all? And that should not be changing at runtime. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r230864147 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -394,9 +394,15 @@ private[spark] object JsonProtocol { /** Convert executor metrics to JSON. */ def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = { -val metrics = ExecutorMetricType.values.map{ metricType => - JField(metricType.name, executorMetrics.getMetricValue(metricType)) - } +val metrics = for { + (m, _) <- ExecutorMetricType.definedMetricsAndOffset.toSeq --- End diff -- I fixed this in my last commit. Sorry again --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r230852432 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -394,9 +394,15 @@ private[spark] object JsonProtocol { /** Convert executor metrics to JSON. */ def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = { -val metrics = ExecutorMetricType.values.map{ metricType => - JField(metricType.name, executorMetrics.getMetricValue(metricType)) - } +val metrics = for { + (m, _) <- ExecutorMetricType.definedMetricsAndOffset.toSeq +} yield { + JField(m, executorMetrics.getMetricValue(m)) +} + + // ExecutorMetricType.definedMetricsAndOffset.foreach { case (m,offset) => --- End diff -- Sorry about these comments. will remove them soon in the next batch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r230858827 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -394,9 +394,15 @@ private[spark] object JsonProtocol { /** Convert executor metrics to JSON. */ def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = { -val metrics = ExecutorMetricType.values.map{ metricType => - JField(metricType.name, executorMetrics.getMetricValue(metricType)) - } +val metrics = for { + (m, _) <- ExecutorMetricType.definedMetricsAndOffset.toSeq --- End diff -- I am sorry it seems that there was a unit test failure after my latest change. I can produce it locally. Thank you for your report. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r230787617 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -394,9 +394,15 @@ private[spark] object JsonProtocol { /** Convert executor metrics to JSON. */ def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = { -val metrics = ExecutorMetricType.values.map{ metricType => - JField(metricType.name, executorMetrics.getMetricValue(metricType)) - } +val metrics = for { + (m, _) <- ExecutorMetricType.definedMetricsAndOffset.toSeq --- End diff -- Thanks for the comment. However the unit test passed even with this. The failure in the latest run wasn't related to this and it was just because of a termination. I can work on changing this though to make the order specific if that is desired. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user LantaoJin commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r228830146 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -394,9 +394,15 @@ private[spark] object JsonProtocol { /** Convert executor metrics to JSON. */ def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = { -val metrics = ExecutorMetricType.values.map{ metricType => - JField(metricType.name, executorMetrics.getMetricValue(metricType)) - } +val metrics = for { + (m, _) <- ExecutorMetricType.definedMetricsAndOffset.toSeq --- End diff -- `definedMetricsAndOffset.toSeq` causes generated JSON with a random order. Some UTs in `JsonProtocalSuite` will fail. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r22741 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -95,10 +135,29 @@ private[spark] object ExecutorMetricType { OnHeapUnifiedMemory, OffHeapUnifiedMemory, DirectPoolMemory, -MappedPoolMemory +MappedPoolMemory, +ProcessTreeMetrics + ) + // List of defined metrics + val definedMetrics = IndexedSeq( +"JVMHeapMemory", +"JVMOffHeapMemory", +"OnHeapExecutionMemory", +"OffHeapExecutionMemory", +"OnHeapStorageMemory", +"OffHeapStorageMemory", +"OnHeapUnifiedMemory", +"OffHeapUnifiedMemory", +"DirectPoolMemory", +"MappedPoolMemory", +"ProcessTreeJVMVMemory", +"ProcessTreeJVMRSSMemory", +"ProcessTreePythonVMemory", +"ProcessTreePythonRSSMemory", +"ProcessTreeOtherVMemory", +"ProcessTreeOtherRSSMemory" --- End diff -- I changed this in a way similar to what you suggested to avoid having separate names and also using arrays instead of maps --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r227082371 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logDebug("IO Exception when trying to compute process tree." + --- End diff -- found the old comment from @mccheah > Catching Throwable is generally scary, can this mask out of memory and errors of that sort? Can we scope down the exception type to handle here? I think this (partially) agrees with what I said above, we dont' want to catch `Throwable` because that can mask other stuff where the jvm is hosed. But I still think `Exception` is the right thing to catch. sound ok @mccheah ? 
if you really do want more specific exceptions, we should look through this more carefully to come up with a more exhaustive list, eg. I certainly don't want to fail the heartbeater because we don't get an int out of the external call for some reason. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r227080176 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logDebug("IO Exception when trying to compute process tree." + --- End diff -- there's a distinction between `Throwable` and `Exception` -- `Throwable` includes Errors which are fatal to the JVM, you probably can't do anything. In general its a good question whether you should catch specific exceptions or everything. Here, you're calling an external program, and I don't feel super confident that we know how it always behaves, so I think we should be a little extra cautious. An unhandled exception here would lead to not sending any heartbeats, which would be really bad. 
Except for JVM errors, I think we just want to turn off this particular metric and keep going. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r227058059 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logDebug("IO Exception when trying to compute process tree." + --- End diff -- At first I was getting all throwables. Then I thought it can be dangerous. There was also a review comment about that. So Not sure what is the correct way of handling this. Is it better to just take care of exceptions that we know can be thrown or catch all throwables? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r227054255 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logDebug("IO Exception when trying to compute process tree." 
+ +" As a result reporting of ProcessTree metrics is stopped", e) +isAvailable = false +return -1 +} + } + + private def computePageSize(): Long = { +if (testing) { + return 0; +} +val cmd = Array("getconf", "PAGESIZE") +val out2 = Utils.executeAndGetOutput(cmd) +return Integer.parseInt(out2.split("\n")(0)) + } + + private def computeProcessTree(): Unit = { +if (!isAvailable || testing) { + return +} +val queue = mutable.Queue.empty[Int] +queue += pid +while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPids(p) + if(!c.isEmpty) { +queue ++= c +ptree += (p -> c.toSet) + } + else { +ptree += (p -> Set[Int]()) + } +} + } + + private def getChildPids(pid: Int): ArrayBuffer[Int] = { +try { + val cmd = Array("pgrep", "-P", pid.toString) + val builder = new ProcessBuilder("pgrep", "-P", pid.toString) + val process = builder.start() + val output = new StringBuilder() + val threadName = "read stdout for " +
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22612#discussion_r227054038 --- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} +import java.util.Locale + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.internal.{config, Logging} +import org.apache.spark.util.Utils + +private[spark] case class ProcfsBasedSystemsMetrics( +jvmVmemTotal: Long, +jvmRSSTotal: Long, +pythonVmemTotal: Long, +pythonRSSTotal: Long, +otherVmemTotal: Long, +otherRSSTotal: Long) + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. 
+private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { + val procfsStatFile = "stat" + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + var pageSize = computePageSize() + var isAvailable: Boolean = isProcfsAvailable + private val pid = computePid() + private val ptree = mutable.Map[ Int, Set[Int]]() + + var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) + + computeProcessTree() + + private def isProcfsAvailable: Boolean = { +if (testing) { + return true +} +try { + if (!Files.exists(Paths.get(procfsDir))) { +return false + } +} +catch { + case f: FileNotFoundException => return false +} +val shouldLogStageExecutorMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS) +val shouldLogStageExecutorProcessTreeMetrics = + SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS) +shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics + } + + private def computePid(): Int = { +if (!isAvailable || testing) { + return -1; +} +try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + val out2 = Utils.executeAndGetOutput(cmd) + val pid = Integer.parseInt(out2.split("\n")(0)) + return pid; +} +catch { + case e: SparkException => logDebug("IO Exception when trying to compute process tree." + --- End diff -- well, executeAndGetOutput might throw a SparkException ... but are you sure nothing else will get thrown? Eg. what if you get some weird output and then the `Integer.parseInt` failse? Is there some reason you wouldn't want the same error handling for any exception here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org