[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22381


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225721809
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override implicit val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+.register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = getCounter("stages", "failedStages")
+
+  val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+  val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+  val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+  val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+  val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+  val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+  val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+  val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+  val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+  val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+  def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
+metricRegistry.counter (MetricRegistry.name (prefix, name) )
--- End diff --

nit: no space before `(`
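With the nit applied, the helper reads as below. This is a minimal sketch, not the PR's code: `SketchRegistry` and `metricName` are hypothetical stand-ins for Dropwizard's `MetricRegistry.counter` and `MetricRegistry.name` (using `AtomicLong` in place of `Counter`), so the example runs without Spark or Dropwizard on the classpath. The implicit-parameter shape mirrors `AppStatusSource.getCounter` in the diff.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical stand-in for com.codahale.metrics.MetricRegistry: counters are
// AtomicLongs keyed by name, so the sketch needs no Dropwizard dependency.
final class SketchRegistry {
  private val counters = mutable.Map.empty[String, AtomicLong]

  // Like MetricRegistry.counter(name): return the existing counter or register a new one.
  def counter(name: String): AtomicLong = synchronized {
    counters.getOrElseUpdate(name, new AtomicLong(0L))
  }
}

object SketchStatusSource {
  // Hypothetical stand-in for MetricRegistry.name: joins the non-empty parts with ".".
  def metricName(parts: String*): String = parts.filter(_.nonEmpty).mkString(".")

  // Same shape as the reviewed helper, with no space before "(".
  def getCounter(prefix: String, name: String)(implicit registry: SketchRegistry): AtomicLong =
    registry.counter(metricName(prefix, name))
}
```

Because the registry is an implicit parameter, a class that declares an implicit registry (as `AppStatusSource` does with `override implicit val metricRegistry`) can call `getCounter("stages", "failedStages")` without passing it explicitly.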





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225721730
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -17,6 +17,7 @@
 
 package org.apache.spark.status
 
+import java.time.Duration
--- End diff --

Unused?





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-15 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225339134
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +391,38 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+source <- appStatusSource
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
+  val localSubmissionTime =
+LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+  val localCompletionTime =
+LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+  val duration = Duration.between(localSubmissionTime, localCompletionTime)
--- End diff --

Yeah, I just did the Java 8 thing; I guess it was too much. Will fix.





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-15 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225339052
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +391,38 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+source <- appStatusSource
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
+  val localSubmissionTime =
+LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+  val localCompletionTime =
+LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+  val duration = Duration.between(localSubmissionTime, localCompletionTime)
+  source.JOB_DURATION.value.set(duration.toMillis)
+  }
+
+  // update global app status counters
+  appStatusSource.foreach(_.COMPLETED_STAGES.inc(job.completedStages.size))
--- End diff --

ok will fix.





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-15 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225339164
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override implicit val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+.register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = getCounter("stages", "failedStages")
+
+  val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+  val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+  val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+  val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+  val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+  val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+  val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+  val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+  val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+  val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+  def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
+metricRegistry.counter (MetricRegistry.name (prefix, name) )
+  }
+
+  def createSource(conf: SparkConf): Option[AppStatusSource] = {
+Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
+  .filter(identity)
+  .map(_ => new AppStatusSource())
--- End diff --

ok :)





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225303236
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override implicit val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+.register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = getCounter("stages", "failedStages")
+
+  val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+  val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+  val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+  val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+  val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+  val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+  val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+  val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+  val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+  val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+  def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
+metricRegistry.counter (MetricRegistry.name (prefix, name) )
+  }
+
+  def createSource(conf: SparkConf): Option[AppStatusSource] = {
+Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
+  .filter(identity)
+  .map(_ => new AppStatusSource())
--- End diff --

`.map { foo => blah }`
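A sketch of `createSource` in the suggested brace style, assuming a plain `Boolean` in place of `conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED)` so it runs without Spark; `SketchSource` and `SourceFactory` are hypothetical names.

```scala
// Hypothetical stand-in for AppStatusSource.
final class SketchSource

object SourceFactory {
  // `enabled` stands in for conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED).
  def createSource(enabled: Boolean): Option[SketchSource] = {
    Option(enabled)
      .filter(identity)
      .map { _ => new SketchSource() }
  }
}
```

An equivalent and arguably more direct form is `if (enabled) Some(new SketchSource()) else None`.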





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225300578
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
--- End diff --

This doesn't need `yield`, though, since you're not really using the 
result. It can just be `for { ... } { ... }`.
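A minimal sketch of the `for { ... } { ... }` form, using a hypothetical `JobTimes` holder in place of the listener's job data and a plain `var` in place of the gauge. Without `yield`, the comprehension desugars to nested `foreach` calls, so the body runs only when every generator is `Some`, and no `Option[Unit]` result is built and thrown away.

```scala
import java.util.Date

object ForWithoutYield {
  // Hypothetical holder for the two optional timestamps tracked per job.
  final case class JobTimes(submissionTime: Option[Date], completionTime: Option[Date])

  // Stand-in for the gauge's backing value; -1 means "never set".
  var lastDurationMs: Long = -1L

  def record(job: JobTimes): Unit = {
    // No `yield`: equivalent to submissionTime.foreach(s => completionTime.foreach(c => ...)).
    for {
      submitted <- job.submissionTime
      completed <- job.completionTime
    } {
      lastDurationMs = completed.getTime - submitted.getTime
    }
  }
}
```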





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225301006
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +391,38 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+source <- appStatusSource
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
+  val localSubmissionTime =
+LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+  val localCompletionTime =
+LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+  val duration = Duration.between(localSubmissionTime, localCompletionTime)
+  source.JOB_DURATION.value.set(duration.toMillis)
+  }
+
+  // update global app status counters
+  appStatusSource.foreach(_.COMPLETED_STAGES.inc(job.completedStages.size))
--- End diff --

Better to call `foreach` only once.
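One way to read this: unwrap the `Option` once and update every counter inside a single block. In the sketch below, `Source` is a hypothetical stand-in holding `AtomicLong`s rather than Dropwizard counters, and the failed/skipped stage counts are assumed inputs; only the completed-stages update appears in the quoted diff.

```scala
import java.util.concurrent.atomic.AtomicLong

object SingleForeach {
  // Hypothetical stand-in for AppStatusSource, with AtomicLongs instead of Counters.
  final class Source {
    val completedStages = new AtomicLong(0L)
    val failedStages = new AtomicLong(0L)
    val skippedStages = new AtomicLong(0L)
  }

  // Unwrap the Option once, then update every counter inside the same block.
  def updateStageCounters(
      source: Option[Source],
      completed: Int,
      failed: Int,
      skipped: Int): Unit = {
    source.foreach { s =>
      s.completedStages.addAndGet(completed)
      s.failedStages.addAndGet(failed)
      s.skippedStages.addAndGet(skipped)
    }
  }
}
```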





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r225302668
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +391,38 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+source <- appStatusSource
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
+  val localSubmissionTime =
+LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+  val localCompletionTime =
+LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+  val duration = Duration.between(localSubmissionTime, localCompletionTime)
--- End diff --

I'm a little confused about why all these calls are needed. Isn't duration 
`Duration.ofMillis(completionTime.getTime() - submissionTime.getTime())`?

And since the metric is actually in milliseconds, isn't this whole block 
basically:

```
source.JOB_DURATION.value.set(completionTime.getTime() - submissionTime.getTime())
```
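A sketch of the suggested simplification, assuming `submissionTime` and `completionTime` are `java.util.Date` values as in the listener, and using a bare `AtomicLong` in place of `source.JOB_DURATION.value`. `viaJavaTime` is included only to show that the `java.time` route, when done on `Instant`s, yields the same number.

```scala
import java.time.Duration
import java.util.Date
import java.util.concurrent.atomic.AtomicLong

object JobDurationSketch {
  // Stand-in for source.JOB_DURATION.value, the AtomicLong behind the JobDuration gauge.
  val jobDurationMs = new AtomicLong(0L)

  // The suggested form: Date.getTime is already epoch milliseconds.
  def record(submissionTime: Date, completionTime: Date): Unit =
    jobDurationMs.set(completionTime.getTime - submissionTime.getTime)

  // The same quantity via java.time, working on Instants rather than LocalDateTimes.
  def viaJavaTime(submissionTime: Date, completionTime: Date): Long =
    Duration.between(submissionTime.toInstant, completionTime.toInstant).toMillis
}
```

Going through `LocalDateTime` with `ZoneId.systemDefault` also discards the UTC offset, so a job that spans a daylight-saving transition could be reported an hour off; subtracting epoch milliseconds sidesteps that.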





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r224071384
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+.register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages"))
--- End diff --

ok makes sense





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r224071170
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -280,6 +284,11 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
 liveExecutors.get(execId).foreach { exec =>
   exec.isBlacklisted = blacklisted
+  if (blacklisted) {
+appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc(1))
--- End diff --

ok





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r224070656
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+.register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages"))
+
+  val SKIPPED_STAGES = metricRegistry.counter(MetricRegistry.name("skippedStages"))
+
+  val COMPLETED_STAGES = metricRegistry.counter(MetricRegistry.name("completedStages"))
+
+  val SUCCEEDED_JOBS = metricRegistry.counter(MetricRegistry.name("succeededJobs"))
+
+  val FAILED_JOBS = metricRegistry.counter(MetricRegistry.name("failedJobs"))
+
+  val COMPLETED_TASKS = metricRegistry.counter(MetricRegistry.name("completedTasks"))
+
+  val FAILED_TASKS = metricRegistry.counter(MetricRegistry.name("failedTasks"))
+
+  val KILLED_TASKS = metricRegistry.counter(MetricRegistry.name("killedTasks"))
+
+  val SKIPPED_TASKS = metricRegistry.counter(MetricRegistry.name("skippedTasks"))
+
+  val BLACKLISTED_EXECUTORS = metricRegistry.counter(MetricRegistry.name("blackListedExecutors"))
--- End diff --

+1





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-09 Thread aditanase
Github user aditanase commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r223667312
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+.register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages"))
--- End diff --

I would suggest creating groups of metrics by adding a prefix with the 
"context", e.g. `stages`, `jobs`, `tasks`.
This is a best practice followed by other sources. See:
- https://github.com/apache/spark/blob/9fed6abfdcb7afcf92be56e5ccbed6599fe66bc4/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala#L29
- https://github.com/apache/spark/blob/a4491626ed8169f0162a0dfb78736c9b9e7fb434/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala#L38
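To illustrate the suggested grouping, the sketch below builds dotted names from a context prefix and a metric name. `MetricNames.name` is a hypothetical helper that approximates what `com.codahale.metrics.MetricRegistry.name(String, String...)` does (joining the non-empty parts with `.`), so it runs without the Dropwizard dependency.

```scala
object MetricNames {
  // Hypothetical helper approximating MetricRegistry.name:
  // non-null, non-empty parts joined with ".".
  def name(parts: String*): String =
    parts.filter(p => p != null && p.nonEmpty).mkString(".")

  // Counters grouped by context, as suggested: stages.*, jobs.*, tasks.*
  val grouped: Seq[String] = Seq(
    name("stages", "failedStages"),
    name("jobs", "succeededJobs"),
    name("tasks", "killedTasks"))
}
```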





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-09 Thread aditanase
Github user aditanase commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r223664993
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -280,6 +284,11 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
 liveExecutors.get(execId).foreach { exec =>
   exec.isBlacklisted = blacklisted
+  if (blacklisted) {
+appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc(1))
--- End diff --

You could just write `.inc()`; 1 is the default. Same everywhere else.
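A sketch of the counter updates with the argument dropped. `SketchCounter` is a hypothetical stand-in shaped like Dropwizard's `Counter` (where `inc()` adds 1 and `inc(n)` adds `n`), and the `unblacklisted` branch is an assumption, since the quoted diff only shows the `blacklisted` case.

```scala
import java.util.concurrent.atomic.AtomicLong

object BlacklistCounters {
  // Hypothetical minimal counter shaped like Dropwizard's Counter.
  final class SketchCounter {
    private val count = new AtomicLong(0L)
    def inc(): Unit = inc(1L) // no-argument form adds 1
    def inc(n: Long): Unit = { count.addAndGet(n); () }
    def getCount: Long = count.get()
  }

  final class Source {
    val blacklistedExecutors = new SketchCounter
    val unblacklistedExecutors = new SketchCounter
  }

  // The else-branch is assumed; the diff only shows the blacklisted case.
  def updateBlacklistStatus(source: Option[Source], blacklisted: Boolean): Unit =
    source.foreach { s =>
      if (blacklisted) s.blacklistedExecutors.inc() else s.unblacklistedExecutors.inc()
    }
}
```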





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-10-09 Thread aditanase
Github user aditanase commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r223667852
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+.register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages"))
+
+  val SKIPPED_STAGES = metricRegistry.counter(MetricRegistry.name("skippedStages"))
+
+  val COMPLETED_STAGES = metricRegistry.counter(MetricRegistry.name("completedStages"))
+
+  val SUCCEEDED_JOBS = metricRegistry.counter(MetricRegistry.name("succeededJobs"))
+
+  val FAILED_JOBS = metricRegistry.counter(MetricRegistry.name("failedJobs"))
+
+  val COMPLETED_TASKS = metricRegistry.counter(MetricRegistry.name("completedTasks"))
+
+  val FAILED_TASKS = metricRegistry.counter(MetricRegistry.name("failedTasks"))
+
+  val KILLED_TASKS = metricRegistry.counter(MetricRegistry.name("killedTasks"))
+
+  val SKIPPED_TASKS = metricRegistry.counter(MetricRegistry.name("skippedTasks"))
+
+  val BLACKLISTED_EXECUTORS = metricRegistry.counter(MetricRegistry.name("blackListedExecutors"))
--- End diff --

Given that most metrics are counters, I would add a small inline function 
that reduces some of the repetition here, e.g.
```scala
def getCounter(prefix: String, name: String): Counter =
  metricRegistry.counter(MetricRegistry.name(prefix, name))
```





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-18 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r218660634
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
* Create an in-memory store for a live application.
*/
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+  conf: SparkConf,
+  appStatusSource: Option[AppStatusSource] = None):
--- End diff --

Yep, sorry for the late reply. :(





[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r217672239
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
* Create an in-memory store for a live application.
*/
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+  conf: SparkConf,
+  appStatusSource: Option[AppStatusSource] = None):
--- End diff --

OK, will do, thanks.


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r217510789
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
* Create an in-memory store for a live application.
*/
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+  conf: SparkConf,
+  appStatusSource: Option[AppStatusSource] = None):
--- End diff --

I think the comment may be that you can move `AppStatusStore = {` onto the same line.
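For illustration, the layout being described would look roughly like this: parameters on their own lines with four-space indentation, and the return type plus opening brace on the line that closes the parameter list. `Conf`, `Source` and `Store` are hypothetical stand-ins for `SparkConf`, `AppStatusSource` and `AppStatusStore` so the snippet compiles on its own, and the body is a placeholder.

```scala
object StoreFactorySketch {
  // Hypothetical stand-ins for SparkConf, AppStatusSource and AppStatusStore.
  final case class Conf(settings: Map[String, String] = Map.empty)
  final class Source
  final case class Store(conf: Conf, source: Option[Source])

  /**
   * Create an in-memory store for a live application.
   */
  def createLiveStore(
      conf: Conf,
      appStatusSource: Option[Source] = None): Store = {
    // Placeholder body: the real method builds an in-memory store and a listener.
    Store(conf, appStatusSource)
  }
}
```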


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216496620
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
--- End diff --

@srowen regarding multiple options, it's a valid technique: 
https://alvinalexander.com/scala/how-to-use-multiple-options-for-loop-comprehension
Anyway, I can keep it simple and just use `isDefined` and then get the values, though I liked the former since it is more idiomatic.


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216476831
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
+val localSubmissionTime =
+  LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+val localCompletionTime =
+  LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+val duration = Duration.between(localCompletionTime, localSubmissionTime)
+appStatusSource.foreach{_.JOB_DURATION.update(duration.toMillis)}
--- End diff --

Actually, I will turn it into a gauge; that makes more sense.


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216472252
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
--- End diff --

It should be possible; I just tried the elegant Java way: 
https://stackoverflow.com/questions/4927856/how-to-calculate-time-difference-in-java
 :)


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216470277
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class AppStatusSource extends Source{
+
+  override val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val JOB_DURATION = metricRegistry.histogram(MetricRegistry.name("jobDuration"))
+
+  val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages"))
+
+  val SKIPPED_STAGES = metricRegistry.counter(MetricRegistry.name("skippedStages"))
+
+  val COMPLETED_STAGES = metricRegistry.counter(MetricRegistry.name("completedStages"))
+
+  val COMPLETED_JOBS = metricRegistry.counter(MetricRegistry.name("completedJobs"))
--- End diff --

This is not used; I need to remove it.


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216469838
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
+val localSubmissionTime =
+  LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+val localCompletionTime =
+  LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+val duration = Duration.between(localCompletionTime, localSubmissionTime)
+appStatusSource.foreach{_.JOB_DURATION.update(duration.toMillis)}
--- End diff --

Ideally this should be a gauge due to this: 
https://prometheus.io/docs/instrumenting/writing_exporters/#drop-less-useful-statistics
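A minimal sketch of the gauge approach: instead of feeding every job duration into a histogram, keep only the most recent value in an `AtomicLong` and expose it through a gauge. This is similar in shape to the `JobDuration` gauge that appears in a later revision of `AppStatusSource`. The `Gauge` trait below is a hypothetical stand-in for `com.codahale.metrics.Gauge` so the snippet runs without the Dropwizard dependency, and `onJobEnd` is an illustrative hook name.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for com.codahale.metrics.Gauge[T].
trait Gauge[T] {
  def getValue: T
}

// Reports the duration (ms) of the most recently completed job.
class JobDuration(val value: AtomicLong) extends Gauge[Long] {
  override def getValue: Long = value.get()
}

object JobDurationSketch {
  val lastJobDuration = new AtomicLong(0L)
  val gauge: Gauge[Long] = new JobDuration(lastJobDuration)

  // Called on job end: overwrite the last value rather than accumulate a distribution.
  def onJobEnd(submissionMs: Long, completionMs: Long): Unit =
    lastJobDuration.set(completionMs - submissionMs)
}
```

The trade-off is that a scrape only sees the latest job; the aggregation is left to the monitoring system, which is what the linked exporter guidance recommends.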



---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216465915
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
+val localSubmissionTime =
+  LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+val localCompletionTime =
+  LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+val duration = Duration.between(localCompletionTime, localSubmissionTime)
--- End diff --

Need to reverse the arguments here.
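`java.time.Duration.between(start, end)` returns `end - start`, so passing the completion time first yields a negative duration. A small sketch of the corrected order, working on the `Instant`s directly; this also skips the round trip through `LocalDateTime` and the system time zone, which could shift the result across a daylight-saving change. The object and method names are hypothetical.

```scala
import java.time.Duration
import java.util.Date

object DurationOrderSketch {
  // Duration.between(start, end) computes end - start, so submission goes first.
  def jobDurationMillis(submission: Date, completion: Date): Long =
    Duration.between(submission.toInstant, completion.toInstant).toMillis
}
```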


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216433852
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
--- End diff --

`java.util.Date.getTime` gives you ms since the epoch directly, so I think 
you can subtract them and be done?
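A sketch of the subtraction being suggested. `Date.getTime` returns milliseconds since the epoch, so the duration is the difference of the two values. Returning an `Option` keeps the case where either time is missing explicit; the helper name is hypothetical.

```scala
import java.util.Date

object GetTimeSketch {
  // Elapsed milliseconds, or None if either timestamp is unknown.
  def durationMillis(submission: Option[Date], completion: Option[Date]): Option[Long] =
    for {
      s <- submission
      c <- completion
    } yield c.getTime - s.getTime
}
```

Here the `for`/`yield` is used for its result, which is the case where the comprehension reads naturally.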


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216431046
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
 setupAndStartListenerBus()
 postEnvironmentUpdate()
+_env.metricsSystem.registerSource(appStatusSource)
--- End diff --

Yeah, I was wondering where it would be best to put this. Putting it at the end makes sense; on the other hand, what if I want metrics as early as the app starts? Anyway, for now it's OK.


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216430860
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -280,6 +284,12 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
 liveExecutors.get(execId).foreach { exec =>
   exec.isBlacklisted = blacklisted
+  if (blacklisted) {
+appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)}
+  }
+  else {
--- End diff --

ok


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216430819
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class AppStatusSource extends Source{
--- End diff --

:+1: 


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216430755
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
* Create an in-memory store for a live application.
*/
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+  conf: SparkConf,
+  appStatusSource: Option[AppStatusSource] = None):
--- End diff --

`appStatusSource` needs to be passed here because it is created in the SparkContext. Do you mean something else?


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216430264
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -280,6 +284,12 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
 liveExecutors.get(execId).foreach { exec =>
   exec.isBlacklisted = blacklisted
+  if (blacklisted) {
+appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)}
--- End diff --

Yeah, I thought IntelliJ complained; will fix.


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216429755
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
--- End diff --

Yeah, I wanted to do this properly (Java 8-wise; I didn't think much of it). On one hand I have the timestamp, on the other hand I have the `Date` object, so it was straightforward to pick the date. I didn't extract a timestamp out of the `Date` object.


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216392303
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@ private[spark] class AppStatusListener(
   }
 
   job.status = event.jobResult match {
-case JobSucceeded => JobExecutionStatus.SUCCEEDED
-case JobFailed(_) => JobExecutionStatus.FAILED
+case JobSucceeded =>
+  appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+  JobExecutionStatus.SUCCEEDED
+case JobFailed(_) =>
+  appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+  JobExecutionStatus.FAILED
   }
 
   job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+  for {
+submissionTime <- job.submissionTime
+completionTime <- job.completionTime
+  } yield {
--- End diff --

Nit: don't you technically need `foreach` here? I think it might be clearer to just wrap this in `if (job.submissionTime.isDefined && ...)` rather than the `for`/`yield` that doesn't really loop.
Below: isn't this just trying to take the difference in time in milliseconds between two dates? If so, can't you just subtract their timestamps, or is it more subtle here?
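The two shapes under discussion, side by side. A `for` comprehension without `yield` desugars to nested `foreach` calls, so it runs the side effect only when both options are defined and builds no discarded result; the explicit `isDefined` check is the alternative suggested above. `record` is a hypothetical callback standing in for the metric update.

```scala
import java.util.Date

object OptionStylesSketch {
  // Variant 1: for-comprehension without `yield` (desugars to nested foreach).
  def viaFor(submission: Option[Date], completion: Option[Date], record: Long => Unit): Unit =
    for {
      s <- submission
      c <- completion
    } record(c.getTime - s.getTime)

  // Variant 2: explicit check, then unwrap with .get.
  def viaIf(submission: Option[Date], completion: Option[Date], record: Long => Unit): Unit =
    if (submission.isDefined && completion.isDefined) {
      record(completion.get.getTime - submission.get.getTime)
    }
}
```

Both behave the same; the choice is mostly about which reads more plainly to reviewers.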


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216391459
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -280,6 +284,12 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
 liveExecutors.get(execId).foreach { exec =>
   exec.isBlacklisted = blacklisted
+  if (blacklisted) {
+appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)}
--- End diff --

Nit: you don't need a block here, just parens. Same for several calls below.
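For a single-expression lambda, parentheses are enough; braces are only needed for multi-statement blocks. A sketch using hypothetical stand-ins for Spark's `JobResult` types and for `AppStatusSource` (plain `AtomicLong`s instead of Dropwizard counters):

```scala
import java.util.concurrent.atomic.AtomicLong

object ParensSketch {
  // Hypothetical stand-ins for Spark's JobResult types.
  sealed trait JobResult
  case object JobSucceeded extends JobResult
  final case class JobFailed(exception: Exception) extends JobResult

  // Hypothetical stand-in for AppStatusSource's job counters.
  final class Counters {
    val SUCCEEDED_JOBS = new AtomicLong(0L)
    val FAILED_JOBS = new AtomicLong(0L)
  }

  def onJobEnd(appStatusSource: Option[Counters], result: JobResult): Unit =
    result match {
      // A single-expression lambda only needs parentheses, not braces.
      case JobSucceeded => appStatusSource.foreach(_.SUCCEEDED_JOBS.incrementAndGet())
      case JobFailed(_) => appStatusSource.foreach(_.FAILED_JOBS.incrementAndGet())
    }
}
```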


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377882
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class AppStatusSource extends Source{
--- End diff --

nit: `Source {` (add a space before the brace)


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377621
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -280,6 +284,12 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
 liveExecutors.get(execId).foreach { exec =>
   exec.isBlacklisted = blacklisted
+  if (blacklisted) {
+appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)}
+  }
+  else {
--- End diff --

nit: `} else {`
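With the suggested placement, the closing brace and `else` share a line. The `else` branch body is cut off in the quoted diff, so the separate `UNBLACKLISTED_EXECUTORS` counter below is only an assumption for illustration, as is the `BlacklistSketch` scaffolding around it.

```scala
import java.util.concurrent.atomic.AtomicLong

object BlacklistSketch {
  // Hypothetical counters; UNBLACKLISTED_EXECUTORS is an assumed name for the
  // else-branch metric, which the quoted diff does not show.
  final class Counters {
    val BLACKLISTED_EXECUTORS = new AtomicLong(0L)
    val UNBLACKLISTED_EXECUTORS = new AtomicLong(0L)
  }

  def updateBlackListStatus(appStatusSource: Option[Counters], blacklisted: Boolean): Unit = {
    if (blacklisted) {
      appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.incrementAndGet())
    } else {
      appStatusSource.foreach(_.UNBLACKLISTED_EXECUTORS.incrementAndGet())
    }
  }
}
```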


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216378185
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
* Create an in-memory store for a live application.
*/
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+  conf: SparkConf,
+  appStatusSource: Option[AppStatusSource] = None):
--- End diff --

nit: `: AppStatusStore`?


---




[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...

2018-09-10 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22381#discussion_r216377526
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
 setupAndStartListenerBus()
 postEnvironmentUpdate()
+_env.metricsSystem.registerSource(appStatusSource)
--- End diff --

Better to put this line at +574.


---
