[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22381
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225721809 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import AppStatusSource.getCounter +import com.codahale.metrics.{Counter, Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override implicit val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = getCounter("stages", "failedStages") + + val SKIPPED_STAGES = getCounter("stages", "skippedStages") + + val COMPLETED_STAGES = getCounter("stages", "completedStages") + + val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs") + + val FAILED_JOBS = getCounter("jobs", "failedJobs") + + val COMPLETED_TASKS = getCounter("tasks", "completedTasks") + + val FAILED_TASKS = getCounter("tasks", "failedTasks") + + val KILLED_TASKS = getCounter("tasks", "killedTasks") + + val SKIPPED_TASKS = getCounter("tasks", "skippedTasks") + + val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors") + + val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors") +} + +private[spark] object AppStatusSource { + + def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = { +metricRegistry.counter (MetricRegistry.name (prefix, name) ) --- End diff -- nit: no space before `(`
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225721730 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -17,6 +17,7 @@ package org.apache.spark.status +import java.time.Duration --- End diff -- Unused?
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225339134 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +391,38 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +source <- appStatusSource +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { + val localSubmissionTime = +LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) + val localCompletionTime = +LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) + val duration = Duration.between(localSubmissionTime, localCompletionTime) --- End diff -- Yeah, I just went with the Java 8 approach; I guess it was overkill. Will fix.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225339052 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +391,38 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +source <- appStatusSource +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { + val localSubmissionTime = +LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) + val localCompletionTime = +LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) + val duration = Duration.between(localSubmissionTime, localCompletionTime) + source.JOB_DURATION.value.set(duration.toMillis) + } + + // update global app status counters + appStatusSource.foreach(_.COMPLETED_STAGES.inc(job.completedStages.size)) --- End diff -- OK, will fix.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225339164 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import AppStatusSource.getCounter +import com.codahale.metrics.{Counter, Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override implicit val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = getCounter("stages", "failedStages") + + val SKIPPED_STAGES = getCounter("stages", "skippedStages") + + val COMPLETED_STAGES = getCounter("stages", "completedStages") + + val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs") + + val FAILED_JOBS = getCounter("jobs", "failedJobs") + + val COMPLETED_TASKS = getCounter("tasks", "completedTasks") + + val FAILED_TASKS = getCounter("tasks", "failedTasks") + + val KILLED_TASKS = getCounter("tasks", "killedTasks") + + val SKIPPED_TASKS = getCounter("tasks", "skippedTasks") + + val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors") + + val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors") +} + +private[spark] object AppStatusSource { + + def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = { +metricRegistry.counter (MetricRegistry.name (prefix, name) ) + } + + def createSource(conf: SparkConf): Option[AppStatusSource] = { +Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED)) + .filter(identity) + .map(_ => new AppStatusSource()) --- End diff -- ok :)
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225303236 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import AppStatusSource.getCounter +import com.codahale.metrics.{Counter, Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override implicit val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = getCounter("stages", "failedStages") + + val SKIPPED_STAGES = getCounter("stages", "skippedStages") + + val COMPLETED_STAGES = getCounter("stages", "completedStages") + + val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs") + + val FAILED_JOBS = getCounter("jobs", "failedJobs") + + val COMPLETED_TASKS = getCounter("tasks", "completedTasks") + + val FAILED_TASKS = getCounter("tasks", "failedTasks") + + val KILLED_TASKS = getCounter("tasks", "killedTasks") + + val SKIPPED_TASKS = getCounter("tasks", "skippedTasks") + + val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors") + + val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors") +} + +private[spark] object AppStatusSource { + + def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = { +metricRegistry.counter (MetricRegistry.name (prefix, name) ) + } + + def createSource(conf: SparkConf): Option[AppStatusSource] = { +Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED)) + .filter(identity) + .map(_ => new AppStatusSource()) --- End diff -- `.map { foo => blah }`
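For illustration, a minimal, self-contained sketch of the brace style vanzin is asking for. The class name `DemoStatusSource` and the config key are stand-ins, not the PR's actual definitions:

```scala
import org.apache.spark.SparkConf

// Hypothetical stand-in for the real source class, just so the sketch compiles.
class DemoStatusSource

// Sketch of the Option-based factory with the `.map { foo => ... }` brace style;
// "spark.app.status.metrics.enabled" is an assumed key, not necessarily the PR's.
def createSource(conf: SparkConf): Option[DemoStatusSource] = {
  Option(conf.getBoolean("spark.app.status.metrics.enabled", defaultValue = false))
    .filter(identity)
    .map { _ => new DemoStatusSource() }
}
```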
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225300578 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { --- End diff -- This doesn't need `yield`, though, since you're not really using the result. It can just be `for { ... } { ... }`.
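For readers less familiar with the idiom under discussion, a minimal sketch (values are hypothetical): a Scala for-comprehension over Options without `yield` desugars into nested `foreach` calls, so the body runs only when every Option is defined and no result value is built.

```scala
val submissionTime: Option[Long] = Some(1000L)  // hypothetical epoch millis
val completionTime: Option[Long] = Some(4500L)

// No `yield`: equivalent to submissionTime.foreach { s => completionTime.foreach { c => ... } }
for {
  submitted <- submissionTime
  completed <- completionTime
} {
  println(s"job took ${completed - submitted} ms")
}
```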
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225301006 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +391,38 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +source <- appStatusSource +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { + val localSubmissionTime = +LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) + val localCompletionTime = +LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) + val duration = Duration.between(localSubmissionTime, localCompletionTime) + source.JOB_DURATION.value.set(duration.toMillis) + } + + // update global app status counters + appStatusSource.foreach(_.COMPLETED_STAGES.inc(job.completedStages.size)) --- End diff -- Better to call `foreach` only once.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225302668 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +391,38 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +source <- appStatusSource +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { + val localSubmissionTime = +LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) + val localCompletionTime = +LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) + val duration = Duration.between(localSubmissionTime, localCompletionTime) --- End diff -- I'm a little confused about why all these calls are needed. Isn't duration `Duration.ofMillis(completionTime.getTime() - submissionTime.getTime())`? And since the metric is actually in milliseconds, isn't this whole block basically: ``` source.JOB_DURATION.value.set(completionTime.getTime() - submissionTime.getTime()) ```
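A small sketch of the simplification being proposed, with hypothetical timestamps: both values are `java.util.Date`, so the duration in milliseconds is just the difference of their epoch-millisecond values, and no `LocalDateTime`/`ZoneId` round trip is needed.

```scala
import java.util.Date

val submissionTime = new Date(1000L)  // hypothetical values
val completionTime = new Date(4500L)

// Same result as Duration.ofMillis(...).toMillis, without the time-zone conversions.
val durationMs = completionTime.getTime - submissionTime.getTime
println(durationMs)  // 3500
```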
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r224071384 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages")) --- End diff -- OK, that makes sense.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r224071170 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,11 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc(1)) --- End diff -- ok
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r224070656 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages")) + + val SKIPPED_STAGES = metricRegistry.counter(MetricRegistry.name("skippedStages")) + + val COMPLETED_STAGES = metricRegistry.counter(MetricRegistry.name("completedStages")) + + val SUCCEEDED_JOBS = metricRegistry.counter(MetricRegistry.name("succeededJobs")) + + val FAILED_JOBS = metricRegistry.counter(MetricRegistry.name("failedJobs")) + + val COMPLETED_TASKS = metricRegistry.counter(MetricRegistry.name("completedTasks")) + + val FAILED_TASKS = metricRegistry.counter(MetricRegistry.name("failedTasks")) + + val KILLED_TASKS = metricRegistry.counter(MetricRegistry.name("killedTasks")) + + val SKIPPED_TASKS = metricRegistry.counter(MetricRegistry.name("skippedTasks")) + + val BLACKLISTED_EXECUTORS = metricRegistry.counter(MetricRegistry.name("blackListedExecutors")) --- End diff -- +1
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user aditanase commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r223667312 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages")) --- End diff -- I would suggest creating groups of metrics by adding a prefix with the "context", e.g. `stages`, `jobs`, `tasks`. This is a best practice followed by other sources. See: - https://github.com/apache/spark/blob/9fed6abfdcb7afcf92be56e5ccbed6599fe66bc4/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala#L29 - https://github.com/apache/spark/blob/a4491626ed8169f0162a0dfb78736c9b9e7fb434/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala#L38
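As an illustration of the grouping suggested here (a sketch, not the PR's exact code): `MetricRegistry.name` joins its arguments with dots, so passing a context prefix such as `stages`, `jobs`, or `tasks` yields namespaced metric names once the source name is prepended by the metrics system.

```scala
import com.codahale.metrics.MetricRegistry

val registry = new MetricRegistry()

// Registers counters named "stages.failedStages", "jobs.failedJobs", "tasks.failedTasks".
val failedStages = registry.counter(MetricRegistry.name("stages", "failedStages"))
val failedJobs   = registry.counter(MetricRegistry.name("jobs", "failedJobs"))
val failedTasks  = registry.counter(MetricRegistry.name("tasks", "failedTasks"))
```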
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user aditanase commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r223664993 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,11 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc(1)) --- End diff -- You could just say `.inc()` - 1 is the default. Same everywhere else.
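A tiny self-contained sketch of that point about the default increment on a Dropwizard `Counter`:

```scala
import com.codahale.metrics.Counter

val blacklistedExecutors = new Counter()
blacklistedExecutors.inc()   // increments by 1, same effect as inc(1)
blacklistedExecutors.inc(1)
println(blacklistedExecutors.getCount)  // 2
```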
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user aditanase commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r223667852 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages")) + + val SKIPPED_STAGES = metricRegistry.counter(MetricRegistry.name("skippedStages")) + + val COMPLETED_STAGES = metricRegistry.counter(MetricRegistry.name("completedStages")) + + val SUCCEEDED_JOBS = metricRegistry.counter(MetricRegistry.name("succeededJobs")) + + val FAILED_JOBS = metricRegistry.counter(MetricRegistry.name("failedJobs")) + + val COMPLETED_TASKS = metricRegistry.counter(MetricRegistry.name("completedTasks")) + + val FAILED_TASKS = metricRegistry.counter(MetricRegistry.name("failedTasks")) + + val KILLED_TASKS = metricRegistry.counter(MetricRegistry.name("killedTasks")) + + val SKIPPED_TASKS = metricRegistry.counter(MetricRegistry.name("skippedTasks")) + + val BLACKLISTED_EXECUTORS = metricRegistry.counter(MetricRegistry.name("blackListedExecutors")) --- End diff -- Given that most metrics are counters, I would add a small inline function that reduces some of the repetition here, e.g. ```scala def getCounter(prefix: String, name: String): Counter = metricRegistry.counter(MetricRegistry.name(prefix, name)) ```
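A self-contained sketch of the proposed helper, together with the implicit-registry variant that the later revision of the patch ends up using; the metric names come from the diff, the rest is illustrative:

```scala
import com.codahale.metrics.{Counter, MetricRegistry}

implicit val metricRegistry: MetricRegistry = new MetricRegistry()

// One-liner that removes the MetricRegistry.name(...) repetition per counter.
def getCounter(prefix: String, name: String)(implicit registry: MetricRegistry): Counter =
  registry.counter(MetricRegistry.name(prefix, name))

val FAILED_STAGES  = getCounter("stages", "failedStages")
val SKIPPED_STAGES = getCounter("stages", "skippedStages")
val FAILED_JOBS    = getCounter("jobs", "failedJobs")
```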
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r218660634 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -503,9 +503,12 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. */ - def createLiveStore(conf: SparkConf): AppStatusStore = { + def createLiveStore( + conf: SparkConf, + appStatusSource: Option[AppStatusSource] = None): --- End diff -- Yep, sorry for the late reply. :(
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r217672239 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -503,9 +503,12 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. */ - def createLiveStore(conf: SparkConf): AppStatusStore = { + def createLiveStore( + conf: SparkConf, + appStatusSource: Option[AppStatusSource] = None): --- End diff -- OK, will do. Thanks.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r217510789 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -503,9 +503,12 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. */ - def createLiveStore(conf: SparkConf): AppStatusStore = { + def createLiveStore( + conf: SparkConf, + appStatusSource: Option[AppStatusSource] = None): --- End diff -- I think the comment may be that you can move `AppStatusStore = {` onto the same line.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216496620 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { --- End diff -- @srowen regarding multiple Options, it's a valid technique: https://alvinalexander.com/scala/how-to-use-multiple-options-for-loop-comprehension Anyway, I can keep it simple and just use isDefined and then get the values; I liked the former though, as it's more idiomatic.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216476831 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { +val localSubmissionTime = + LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) +val localCompletionTime = + LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) +val duration = Duration.between(localCompletionTime, localSubmissionTime) +appStatusSource.foreach{_.JOB_DURATION.update(duration.toMillis)} --- End diff -- Actually, I will transform it to a gauge; that makes more sense.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216472252 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { --- End diff -- It should be possible; I just tried the elegant Java way: https://stackoverflow.com/questions/4927856/how-to-calculate-time-difference-in-java :)
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216470277 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class AppStatusSource extends Source{ + + override val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val JOB_DURATION = metricRegistry.histogram(MetricRegistry.name("jobDuration")) + + val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages")) + + val SKIPPED_STAGES = metricRegistry.counter(MetricRegistry.name("skippedStages")) + + val COMPLETED_STAGES = metricRegistry.counter(MetricRegistry.name("completedStages")) + + val COMPLETED_JOBS = metricRegistry.counter(MetricRegistry.name("completedJobs")) --- End diff -- This is not used; it needs to be removed.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216469838 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { +val localSubmissionTime = + LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) +val localCompletionTime = + LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) +val duration = Duration.between(localCompletionTime, localSubmissionTime) +appStatusSource.foreach{_.JOB_DURATION.update(duration.toMillis)} --- End diff -- Ideally this should be a gauge due to this: https://prometheus.io/docs/instrumenting/writing_exporters/#drop-less-useful-statistics
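For context, a minimal sketch of the gauge-based alternative the thread settles on: a `Gauge` simply reports the last value written into an `AtomicLong`, which matches the linked Prometheus guidance about exporting raw values rather than exporter-side summaries. This mirrors the `JobDuration` class in the later diffs; the final value set below is hypothetical.

```scala
import java.util.concurrent.atomic.AtomicLong
import com.codahale.metrics.{Gauge, MetricRegistry}

// Gauge backed by an AtomicLong that the listener can overwrite on each job end.
class JobDuration(val value: AtomicLong) extends Gauge[Long] {
  override def getValue: Long = value.get()
}

val registry = new MetricRegistry()
val jobDuration = new JobDuration(new AtomicLong(0L))
registry.register(MetricRegistry.name("jobDuration"), jobDuration)

// Later, when a job completes, the listener records the duration in milliseconds:
jobDuration.value.set(4500L)
```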
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216465915 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { +val localSubmissionTime = + LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) +val localCompletionTime = + LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) +val duration = Duration.between(localCompletionTime, localSubmissionTime) --- End diff -- Need to reverse this; the arguments are swapped.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216433852 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { --- End diff -- `java.util.Date.getTime` gives you ms since the epoch directly, so I think you can subtract them and be done?
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216431046 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging { setupAndStartListenerBus() postEnvironmentUpdate() +_env.metricsSystem.registerSource(appStatusSource) --- End diff -- Yeah, I was wondering where that would be best to put. Putting it at the end makes sense; on the other hand, what if I want to have metrics as early as the app starts? Anyway, for now it's OK.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216430860 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,12 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)} + } + else { --- End diff -- ok
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216430819 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class AppStatusSource extends Source{ --- End diff -- :+1:
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216430755 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -503,9 +503,12 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. */ - def createLiveStore(conf: SparkConf): AppStatusStore = { + def createLiveStore( + conf: SparkConf, + appStatusSource: Option[AppStatusSource] = None): --- End diff -- appStatusSource needs to be passed here since it is created in the SparkContext; do you mean something else?
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216430264 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,12 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)} --- End diff -- Yeah, I thought IntelliJ complained; will fix.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216429755 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { --- End diff -- Yeah, I wanted to do this properly (Java 8-wise; I didn't think much of it). On one hand I have the timestamp, on the other hand I have the Date object, so it was straightforward to pick the Date; I didn't extract a timestamp out of the Date object.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216392303 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { --- End diff -- Nit: don't you technically need `foreach` here? I think it might be clearer to just wrap this in `if (job.submissionTime.isDefined && ...)` rather than the `for yield` that doesn't really loop. Below -- isn't this just trying to take the difference in time in milliseconds between two dates? If so, don't you just subtract their timestamps, or is it more subtle here?
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216391459 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,12 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)} --- End diff -- Nit: you don't need a block here, just parens. Same for several calls below.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377882 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class AppStatusSource extends Source{ --- End diff -- nit: Source {
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377621 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,12 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)} + } + else { --- End diff -- nit: } else {
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216378185 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala --- @@ -503,9 +503,12 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. */ - def createLiveStore(conf: SparkConf): AppStatusStore = { + def createLiveStore( + conf: SparkConf, + appStatusSource: Option[AppStatusSource] = None): --- End diff -- nit: `: AppStatusStore`?
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216377526 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging { setupAndStartListenerBus() postEnvironmentUpdate() +_env.metricsSystem.registerSource(appStatusSource) --- End diff -- Better to move this line to +574.