[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81886512
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -100,28 +110,138 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter {
 )
   }
 
-  testQuietly("source and sink statuses") {
+  testQuietly("query statuses") {
--- End diff --

What do you mean? The various tests across the test suites?
I put it this way so that it corresponds to the class structure:
- The test for the internal logic of StreamMetrics is in StreamMetricsSuite.
- The test for the statuses returned through StreamingQueryListener is in StreamingQueryListenerSuite.
- The test for StreamingQuery.status is in StreamingQuerySuite.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81886305
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala ---
@@ -26,9 +26,13 @@ import org.apache.spark.sql.execution.streaming.Source
  *
  * @param description Description of the source corresponding to this 
status
  * @param offsetDesc Description of the current [[Source]] offset if known
+ * @param inputRate Current ingestion rate as rows / second
  * @since 2.0.0
  */
 @Experimental
-class SourceStatus private[sql] (
+case class SourceStatus private(
 val description: String,
-val offsetDesc: Option[String])
+val offsetDesc: Option[String],
+val inputRate: Double,
+val processingRate: Double,
+val triggerStatus: Map[String, String])
--- End diff --

added





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81886307
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala 
---
@@ -30,8 +30,15 @@ import org.apache.spark.annotation.Experimental
  * @param sinkStatus The current status of the [[StreamingQuery]]'s sink.
  */
 @Experimental
-class StreamingQueryInfo private[sql](
+case class StreamingQueryInfo private(
   val name: String,
   val id: Long,
+  val timestamp: Long,
+  val inputRate: Double,
+  val processingRate: Double,
+  val outputRate: Double,
+  val latencyMs: Option[Double],
--- End diff --

added





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81884307
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala 
---
@@ -30,8 +30,15 @@ import org.apache.spark.annotation.Experimental
  * @param sinkStatus The current status of the [[StreamingQuery]]'s sink.
  */
 @Experimental
-class StreamingQueryInfo private[sql](
+case class StreamingQueryInfo private(
--- End diff --

I put it back because the toString of a case class is incredibly helpful. How about I make the companion object private[sql], so the generated `apply`/`unapply` are not exposed publicly?
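
For concreteness, a rough sketch of that suggestion (names and fields abbreviated, not the PR code as written):

```scala
package org.apache.spark.sql.streaming

// Sketch only: keep the case-class toString, but restrict the constructor and the
// companion object (where the synthesized apply/unapply live) to private[sql].
case class StreamingQueryInfo private[sql] (
    name: String,
    id: Long,
    timestamp: Long)

private[sql] object StreamingQueryInfo
```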





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81873789
  
--- Diff: project/MimaExcludes.scala ---
@@ -53,7 +53,14 @@ object MimaExcludes {
   
      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.getFunction"),
      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists")
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.columnExists"),
--- End diff --

?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81873207
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
--- End diff --

Scaladoc
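
Something along these lines would do (wording is only a suggestion):

```scala
/**
 * Collects metrics for a streaming query: per-trigger status for the query and for each
 * [[Source]], input/processing/output rates, and latency. Also registers Codahale gauges
 * under `codahaleSourceName` so they show up in the regular Spark metrics system.
 */
class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceName: String)
```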





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81882888
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala 
---
@@ -30,8 +30,15 @@ import org.apache.spark.annotation.Experimental
  * @param sinkStatus The current status of the [[StreamingQuery]]'s sink.
  */
 @Experimental
-class StreamingQueryInfo private[sql](
+case class StreamingQueryInfo private(
   val name: String,
   val id: Long,
+  val timestamp: Long,
+  val inputRate: Double,
+  val processingRate: Double,
+  val outputRate: Double,
+  val latencyMs: Option[Double],
--- End diff --

missing from the docs.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81872358
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -100,28 +110,138 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter {
 )
   }
 
-  testQuietly("source and sink statuses") {
+  testQuietly("query statuses") {
--- End diff --

Would it make sense to combine the various metric tests?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81872395
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
@@ -100,28 +110,138 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter {
 )
   }
 
-  testQuietly("source and sink statuses") {
+  testQuietly("query statuses") {
 val inputData = MemoryStream[Int]
-val mapped = inputData.toDS().map(6 / _)
+
+// This is to make sure the execution plan ends with a node (filter) that supports
+// the numOutputRows metric.
+spark.conf.set("spark.sql.codegen.wholeStage", false)
--- End diff --

Does none of this work with codegen?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81873307
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -511,12 +572,71 @@ class StreamExecution(
  """.stripMargin
   }
 
+  /**
+   * Report row metrics of the executed trigger
+   * @param triggerExecutionPlan Execution plan of the trigger
+   * @param triggerLogicalPlan Logical plan of the trigger, generated from the query logical plan
+   * @param sourceToDataframe Source to DataFrame returned by the source.getBatch in this trigger
+   */
+  private def reportMetrics(
+      triggerExecutionPlan: SparkPlan,
+      triggerLogicalPlan: LogicalPlan,
+      sourceToDataframe: Map[Source, DataFrame]): Unit = {
+    val sourceToNumInputRows = StreamExecution.getNumInputRowsFromTrigger(
+      triggerExecutionPlan, triggerLogicalPlan, sourceToDataframe)
+    val numOutputRows = triggerExecutionPlan.metrics.get("numOutputRows").map(_.value)
+    val stateNodes = triggerExecutionPlan.collect {
+      case p if p.isInstanceOf[StateStoreSaveExec] => p
+    }
+    statusLock.synchronized {
+      streamMetrics.reportNumRows(sourceToNumInputRows, numOutputRows)
+      stateNodes.zipWithIndex.foreach { case (s, i) =>
+        streamMetrics.reportTriggerStatus(
+          NUM_TOTAL_STATE_ROWS(i + 1),
+          s.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L))
+        streamMetrics.reportTriggerStatus(
+          NUM_UPDATED_STATE_ROWS(i + 1),
+          s.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
+      }
+    }
+  }
+
+  private def reportTimeTaken[T](triggerStatusKey: String)(body: => T): T = {
+    val startTime = triggerClock.getTimeMillis()
+    val result = body
+    val endTime = triggerClock.getTimeMillis()
+    statusLock.synchronized {
+      streamMetrics.reportTriggerStatus(triggerStatusKey, math.max(endTime - startTime, 0))
+    }
+    result
+  }
+
+  private def reportTimeTaken[T](source: Source, triggerStatusKey: String)(body: => T): T = {
+    val startTime = triggerClock.getTimeMillis()
+    val result = body
+    val endTime = triggerClock.getTimeMillis()
+    statusLock.synchronized {
+      streamMetrics.reportSourceTriggerStatus(
+        source, triggerStatusKey, math.max(endTime - startTime, 0))
+    }
+    result
+  }
+
+  private def reportTimestamp(string: String): Unit = statusLock.synchronized {
+    streamMetrics.reportTriggerStatus(string, triggerClock.getTimeMillis)
+  }
+
   private def toInfo: StreamingQueryInfo = {
-new StreamingQueryInfo(
+StreamingQueryInfo(
   this.name,
   this.id,
+  triggerClock.getTimeMillis(),
+  streamMetrics.currentInputRate,
+  streamMetrics.currentProcessingRate,
+  streamMetrics.currentOutputRate,
+  streamMetrics.currentLatency,
--- End diff --

I would use named arguments when there are this many of the same type in a row.
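
i.e. a call shaped roughly like this (just a sketch of the `toInfo` call with named arguments; the parameter names are the ones from the quoted case class):

```scala
StreamingQueryInfo(
  name = this.name,
  id = this.id,
  timestamp = triggerClock.getTimeMillis(),
  inputRate = streamMetrics.currentInputRate,
  processingRate = streamMetrics.currentProcessingRate,
  outputRate = streamMetrics.currentOutputRate,
  latencyMs = streamMetrics.currentLatency
  // remaining fields passed the same way
)
```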





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81874159
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -136,16 +145,30 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
+  override def status: StreamingQueryInfo = statusLock.synchronized {
+this.toInfo
--- End diff --

Inline `toInfo` to avoid unneeded indirection?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81872770
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala 
---
@@ -30,8 +30,15 @@ import org.apache.spark.annotation.Experimental
  * @param sinkStatus The current status of the [[StreamingQuery]]'s sink.
  */
 @Experimental
-class StreamingQueryInfo private[sql](
+case class StreamingQueryInfo private(
--- End diff --

I think we were leaving `case` out to avoid binary incompatibility of the 
`unapply` method.
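
Roughly, the non-case-class shape we would keep (a sketch with an abbreviated field list; a hand-written toString recovers the debuggability mentioned above):

```scala
// Sketch only: a plain class has no compiler-generated apply/unapply/copy whose
// signatures change, and break binary compatibility, whenever a field is added.
class StreamingQueryInfo private[sql] (
    val name: String,
    val id: Long,
    val timestamp: Long) {

  override def toString: String =
    s"StreamingQueryInfo(name=$name, id=$id, timestamp=$timestamp)"
}
```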





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81875491
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -525,8 +645,62 @@ class StreamExecution(
   case object TERMINATED extends State
 }
 
-object StreamExecution {
+object StreamExecution extends Logging {
   private val _nextId = new AtomicLong(0)
 
+  /**
+   * Get the number of input rows from the executed plan of the trigger
+   * @param triggerExecutionPlan Execution plan of the trigger
+   * @param triggerLogicalPlan Logical plan of the trigger, generated from the query logical plan
+   * @param sourceToDataframe Source to DataFrame returned by the source.getBatch in this trigger
+   */
+  def getNumInputRowsFromTrigger(
+      triggerExecutionPlan: SparkPlan,
+      triggerLogicalPlan: LogicalPlan,
+      sourceToDataframe: Map[Source, DataFrame]): Map[Source, Long] = {
+
+    // We want to associate execution plan leaves to the sources that generate them, so that we
+    // match their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
+    // Consider the translation from the streaming logical plan to the final executed plan.
+    //
+    //   streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
+    //
+    // 1. We keep track of the streaming sources associated with each leaf in the trigger's
+    //    logical plan.
+    //    - Each logical plan leaf will be associated with a single streaming source.
+    //    - There can be multiple logical plan leaves associated with a streaming source.
+    //    - There can be leaves not associated with any streaming source, because they were
+    //      generated from a batch source (e.g. stream-batch joins)
+    //
+    // 2. Assuming that the executed plan has the same number of leaves in the same order as the
+    //    trigger logical plan, we associate executed plan leaves with the corresponding
+    //    streaming sources.
+    //
+    // 3. For each source, we sum the metrics of the associated execution plan leaves.
+    //
+    val logicalPlanLeafToSource = sourceToDataframe.flatMap { case (source, df) =>
+      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    }
+    val allLogicalPlanLeaves = triggerLogicalPlan.collectLeaves() // includes non-streaming sources
+    val allExecPlanLeaves = triggerExecutionPlan.collectLeaves()
+    if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
+      val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
+        case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
+      }
+      val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
+        val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+        source -> numRows
+      }
+      sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
+    } else {
+      def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
+      logWarning(
+        "Could not report metrics as the number of leaves in the trigger logical plan did not match that" +
--- End diff --

Seems like this is going to flood the logs if it's ever triggered.
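
Maybe only log it once per query, e.g. something like this (a sketch; the flag and helper are hypothetical, not in the PR):

```scala
// Hypothetical guard: log the leaf-count mismatch once per StreamExecution
// instead of on every trigger.
private var metricWarningLogged = false

private def logMetricMismatchOnce(msg: => String): Unit = {
  if (!metricWarningLogged) {
    logWarning(msg)
    metricWarningLogged = true
  }
}
```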





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81873537
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -525,8 +645,62 @@ class StreamExecution(
   case object TERMINATED extends State
 }
 
-object StreamExecution {
+object StreamExecution extends Logging {
   private val _nextId = new AtomicLong(0)
 
+  /**
+   * Get the number of input rows from the executed plan of the trigger
+   * @param triggerExecutionPlan Execution plan of the trigger
+   * @param triggerLogicalPlan Logical plan of the trigger, generated from the query logical plan
+   * @param sourceToDataframe Source to DataFrame returned by the source.getBatch in this trigger
+   */
+  def getNumInputRowsFromTrigger(
--- End diff --

Putting static methods in the object is not really a pattern we follow in SQL (unless you need to use them elsewhere). Prefer `private` in the class to avoid spreading the code out and for uniformity. If it's only for testing, can we not check the output from the listener?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81872893
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala ---
@@ -26,9 +26,13 @@ import org.apache.spark.sql.execution.streaming.Source
  *
  * @param description Description of the source corresponding to this 
status
  * @param offsetDesc Description of the current [[Source]] offset if known
+ * @param inputRate Current ingestion rate as rows / second
  * @since 2.0.0
  */
 @Experimental
-class SourceStatus private[sql] (
+case class SourceStatus private(
 val description: String,
-val offsetDesc: Option[String])
+val offsetDesc: Option[String],
+val inputRate: Double,
+val processingRate: Double,
+val triggerStatus: Map[String, String])
--- End diff --

Some of these are missing from the documentation.
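
e.g. something along these lines for the new parameters (wording is just a suggestion):

```scala
/**
 * @param description Description of the source corresponding to this status
 * @param offsetDesc Description of the current [[Source]] offset if known
 * @param inputRate Current rate (rows/second) at which data is arriving from this source
 * @param processingRate Current rate (rows/second) at which data from this source is being processed
 * @param triggerStatus Low-level detailed status of the last/ongoing trigger for this source
 */
```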





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81882933
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerStatus = new mutable.HashMap[String, String]
+  private val sourceTriggerStatus = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", currentInputRate)
+  registerGauge("processingRate-total", () => currentProcessingRate)
+  registerGauge("outputRate", () => currentOutputRate)
+  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+inputRates.put(s, new RateCalculator)
+processingRates.put(s, new RateCalculator)
+sourceTriggerStatus.put(s, new mutable.HashMap[String, String])
+
+registerGauge(s"inputRate-${s.toString}", () => 
currentSourceInputRate(s))
+registerGauge(s"processingRate-${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // === Setter methods ===
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+numInputRows.clear()
+numOutputRows = None
+triggerStatus.clear()
+sourceTriggerStatus.values.foreach(_.clear())
+
+reportTriggerStatus(TRIGGER_ID, triggerId)
+sources.foreach(s => reportSourceTriggerStatus(s, TRIGGER_ID, 
triggerId))
+reportTriggerStatus(ACTIVE, true)
+currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+reportTriggerStatus(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTriggerStatus[T](key: String, value: T): Unit = synchronized {
+triggerStatus.put(key, value.toString)
+  }
+
+  def reportSourceTriggerStatus[T](source: Source, key: String, value: T): 
Unit = synchronized {
+sourceTriggerStatus(source).put(key, value.toString)
+  }
+
+  def reportNumRows(inputRows: Map[Source, Long], outputRows: 
Option[Long]): Unit = synchronized {
+numInputRows ++= inputRows
+numOutputRows = outputRows
+  }
+
+  def reportTriggerFinished(): Unit = synchronized {
+require(currentTriggerStartTimestamp >= 0)
+val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
+reportTri

[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81874295
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -105,11 +111,14 @@ class StreamExecution(
   var lastExecution: QueryExecution = null
 
   @volatile
-  var streamDeathCause: StreamingQueryException = null
+  private var streamDeathCause: StreamingQueryException = null
 
   /* Get the call site in the caller thread; will pass this into the micro 
batch thread */
   private val callSite = Utils.getCallSite()
 
+  private val streamMetrics = new StreamMetrics(uniqueSources.toSet, 
triggerClock,
--- End diff --

nit: break at the highest semantic level to avoid breaking in the middle of 
things that are the same.  i.e.

```scala
private val streamMetrics =
  new StreamMetrics(uniqueSources.toSet, triggerClock, s"StructuredStreaming.$name")
```





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81873108
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerStatus = new mutable.HashMap[String, String]
+  private val sourceTriggerStatus = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", currentInputRate)
+  registerGauge("processingRate-total", () => currentProcessingRate)
+  registerGauge("outputRate", () => currentOutputRate)
+  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+inputRates.put(s, new RateCalculator)
+processingRates.put(s, new RateCalculator)
+sourceTriggerStatus.put(s, new mutable.HashMap[String, String])
+
+registerGauge(s"inputRate-${s.toString}", () => 
currentSourceInputRate(s))
+registerGauge(s"processingRate-${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // === Setter methods ===
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+numInputRows.clear()
+numOutputRows = None
+triggerStatus.clear()
+sourceTriggerStatus.values.foreach(_.clear())
+
+reportTriggerStatus(TRIGGER_ID, triggerId)
+sources.foreach(s => reportSourceTriggerStatus(s, TRIGGER_ID, 
triggerId))
+reportTriggerStatus(ACTIVE, true)
+currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+reportTriggerStatus(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTriggerStatus[T](key: String, value: T): Unit = synchronized {
+triggerStatus.put(key, value.toString)
+  }
+
+  def reportSourceTriggerStatus[T](source: Source, key: String, value: T): 
Unit = synchronized {
+sourceTriggerStatus(source).put(key, value.toString)
+  }
+
+  def reportNumRows(inputRows: Map[Source, Long], outputRows: 
Option[Long]): Unit = synchronized {
+numInputRows ++= inputRows
+numOutputRows = outputRows
+  }
+
+  def reportTriggerFinished(): Unit = synchronized {
+require(currentTriggerStartTimestamp >= 0)
+val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
+reportTri

[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81880584
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -317,15 +373,18 @@ class StreamExecution(
 // TODO: Move this to IncrementalExecution.
 
 // Request unprocessed data from all sources.
-val newData = availableOffsets.flatMap {
-  case (source, available)
+val newData = reportTimeTaken(GET_BATCH_LATENCY) {
+  availableOffsets.flatMap {
+case (source, available)
   if committedOffsets.get(source).map(_ != 
available).getOrElse(true) =>
-val current = committedOffsets.get(source)
-val batch = source.getBatch(current, available)
-logDebug(s"Retrieving data from $source: $current -> $available")
-Some(source -> batch)
-  case _ => None
-}.toMap
+  val current = committedOffsets.get(source)
+  val batch = source.getBatch(current, available)
--- End diff --

+1





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81879990
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -377,8 +437,9 @@ class StreamExecution(
 val batchTime = (System.nanoTime() - startTime).toDouble / 100
 logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
 // Update committed offsets.
-committedOffsets ++= availableOffsets
-postEvent(new QueryProgress(this.toInfo))
+statusLock.synchronized {
--- End diff --

Why? `committedOffsets` is being updated here, and can otherwise get updated in the middle of query status generation.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81878979
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -273,8 +317,16 @@ class StreamExecution(
 val hasNewData = {
   awaitBatchLock.lock()
   try {
-val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> 
o))
-availableOffsets ++= newData
+reportTimeTaken(GET_OFFSET_LATENCY) {
+  val latestOffsets: Map[Source, Option[Offset]] = 
uniqueSources.map { s =>
+reportTimeTaken(s, SOURCE_GET_OFFSET_LATENCY) {
--- End diff --

Offline discussion: this is triggerStatus bookkeeping and does not affect the correctness of the rate calculation, so this is fine.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81876373
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -377,8 +437,9 @@ class StreamExecution(
 val batchTime = (System.nanoTime() - startTime).toDouble / 100
 logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
 // Update committed offsets.
-committedOffsets ++= availableOffsets
-postEvent(new QueryProgress(this.toInfo))
+statusLock.synchronized {
--- End diff --

nit: The lock here is not necessary.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81875571
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -317,15 +373,18 @@ class StreamExecution(
 // TODO: Move this to IncrementalExecution.
 
 // Request unprocessed data from all sources.
-val newData = availableOffsets.flatMap {
-  case (source, available)
+val newData = reportTimeTaken(GET_BATCH_LATENCY) {
+  availableOffsets.flatMap {
+case (source, available)
   if committedOffsets.get(source).map(_ != 
available).getOrElse(true) =>
-val current = committedOffsets.get(source)
-val batch = source.getBatch(current, available)
-logDebug(s"Retrieving data from $source: $current -> $available")
-Some(source -> batch)
-  case _ => None
-}.toMap
+  val current = committedOffsets.get(source)
+  val batch = source.getBatch(current, available)
--- End diff --

I think it's better to also add `SOURCE_GET_BATCH_LATENCY`. Creating the DataFrame may be pretty slow.
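
i.e. wrap the per-source call the same way the per-source getOffset is wrapped above; `SOURCE_GET_BATCH_LATENCY` would be a new key (sketch):

```scala
val batch = reportTimeTaken(source, SOURCE_GET_BATCH_LATENCY) {
  source.getBatch(current, available)
}
```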





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81875296
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -273,8 +317,16 @@ class StreamExecution(
 val hasNewData = {
   awaitBatchLock.lock()
   try {
-val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> 
o))
-availableOffsets ++= newData
+reportTimeTaken(GET_OFFSET_LATENCY) {
+  val latestOffsets: Map[Source, Option[Offset]] = 
uniqueSources.map { s =>
+reportTimeTaken(s, SOURCE_GET_OFFSET_LATENCY) {
--- End diff --

This is still reported separately since `body` in 
`reportTimeTaken(GET_OFFSET_LATENCY)` is not in `statusLock.synchronized`.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81823396
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -136,16 +139,30 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
+  override def queryStatus: StreamingQueryInfo = {
+this.toInfo
+  }
+
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
 val localAvailableOffsets = availableOffsets
 sources.map(s =>
-  new SourceStatus(s.toString, 
localAvailableOffsets.get(s).map(_.toString))).toArray
+  new SourceStatus(
--- End diff --

That's not sufficient. We also have to lock changes to `committedOffsets` and `availableOffsets` while generating the status.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81819648
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerInfo = new mutable.HashMap[String, String]
+  private val sourceTriggerInfo = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", currentInputRate)
+  registerGauge("processingRate-total", () => currentProcessingRate)
+  registerGauge("outputRate", () => currentOutputRate)
+  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+inputRates.put(s, new RateCalculator)
+processingRates.put(s, new RateCalculator)
+sourceTriggerInfo.put(s, new mutable.HashMap[String, String])
+
+registerGauge(s"inputRate-${s.toString}", () => 
currentSourceInputRate(s))
+registerGauge(s"processingRate-${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // === Setter methods ===
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+numInputRows.clear()
+numOutputRows = None
+triggerInfo.clear()
+sourceTriggerInfo.values.foreach(_.clear())
+
+reportTriggerInfo(TRIGGER_ID, triggerId)
+sources.foreach(s => reportSourceTriggerInfo(s, TRIGGER_ID, triggerId))
+reportTriggerInfo(ACTIVE, true)
+currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+reportTriggerInfo(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTimestamp(key: String): Unit = synchronized {
+triggerInfo.put(key, triggerClock.getTimeMillis().toString)
+  }
+
+  def reportLatency(key: String, latencyMs: Long): Unit = synchronized {
+triggerInfo.put(key, latencyMs.toString)
+  }
+
+  def reportLatency(source: Source, key: String, latencyMs: Long): Unit = 
synchronized {
+sourceTriggerInfo(source).put(key, latencyMs.toString)
+  }
+
+  def reportTriggerInfo[T](key: String, value: T): Unit = synchronized {
+triggerInfo.put(key, value.toString)
+  }
+
+  def reportSourceTriggerInfo[T](source: Source, key: String, value: T): 
Unit = synchronized {
+sourceTriggerInfo(source).put(key, value.toString)
+  }
+
+  def reportNumRows(inputRows: Map[Source, Long], outputRows: 
Option[Long]): Unit = synchronized {
+numInput

[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81684775
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -136,16 +139,30 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
+  override def queryStatus: StreamingQueryInfo = {
+this.toInfo
+  }
+
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
 val localAvailableOffsets = availableOffsets
 sources.map(s =>
-  new SourceStatus(s.toString, 
localAvailableOffsets.get(s).map(_.toString))).toArray
+  new SourceStatus(
--- End diff --

Actually, you can probably drop most of the synchronization if you keep two 
`StreamMetrics` objects and preallocate the slots for counters. At least the 
way things are now, each counter in `StreamMetrics` is written once per batch. 
If you tweak `sourceStatuses()` to return the metrics from the most recent 
completed batch (i.e. the `StreamMetrics` object that's not currently being 
written to), there should be no overlap between readers and writers. Eventually 
you'll want to have more than one `StreamMetrics` object anyway, since the 
scheduler will need to pipeline multiple batches to reach latencies below the 
50-100ms level.
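
A rough sketch of that double-buffering idea (all names here are hypothetical, not from the PR):

```scala
// Hypothetical sketch: the scheduler thread writes into `activeMetrics` during a batch and
// publishes it when the batch completes, so readers only ever see a fully written snapshot.
@volatile private var completedMetrics: StreamMetrics = _
private var activeMetrics: StreamMetrics = _

private def onBatchCompleted(): Unit = {
  completedMetrics = activeMetrics       // publish the finished batch's metrics
  activeMetrics = createStreamMetrics()  // hypothetical factory for a fresh, preallocated object
}

override def sourceStatuses: Array[SourceStatus] = {
  val snapshot = completedMetrics        // read without locking: snapshot is no longer written to
  sources.map(s => sourceStatusFrom(s, snapshot)).toArray  // sourceStatusFrom is hypothetical
}
```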





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81678871
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -136,16 +139,30 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
+  override def queryStatus: StreamingQueryInfo = {
+this.toInfo
+  }
+
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
 val localAvailableOffsets = availableOffsets
 sources.map(s =>
-  new SourceStatus(s.toString, 
localAvailableOffsets.get(s).map(_.toString))).toArray
+  new SourceStatus(
--- End diff --

Yeah, you are probably right. We'd probably have to add synchronized to a lot of methods. :(





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81678796
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -317,15 +358,18 @@ class StreamExecution(
 // TODO: Move this to IncrementalExecution.
 
 // Request unprocessed data from all sources.
-val newData = availableOffsets.flatMap {
-  case (source, available)
+val newData = timeIt(GET_BATCH_LATENCY) {
--- End diff --

Yeah. The intention of GET_BATCH_LATENCY is to measure the time taken in the non-lazy part.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81672432
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -317,15 +358,18 @@ class StreamExecution(
 // TODO: Move this to IncrementalExecution.
 
 // Request unprocessed data from all sources.
-val newData = availableOffsets.flatMap {
-  case (source, available)
+val newData = timeIt(GET_BATCH_LATENCY) {
--- End diff --

Note that the time interval being measured here will have different 
semantics for different sources, depending on how much computation occurs 
inside the source's `getBatch` method vs. lazily when the data is read from the 
resulting Dataframe.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81672040
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -136,16 +139,30 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
+  override def queryStatus: StreamingQueryInfo = {
+this.toInfo
+  }
+
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
 val localAvailableOffsets = availableOffsets
 sources.map(s =>
-  new SourceStatus(s.toString, 
localAvailableOffsets.get(s).map(_.toString))).toArray
+  new SourceStatus(
--- End diff --

If this method is intended to be called from threads other than the 
scheduler thread, then the entire map really ought to be synchronized on 
`streamMetrics`'s lock. Otherwise this method could return a mixture of 
statistics from different points of time, even within a single source.
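
i.e. something like this (a sketch of the suggestion, not the PR code; `currentSourceTriggerStatus` is an assumed per-source getter, only the rate getters appear in the quoted diff):

```scala
override def sourceStatuses: Array[SourceStatus] = streamMetrics.synchronized {
  // Build every SourceStatus under one lock so all entries reflect the same point in time.
  val localAvailableOffsets = availableOffsets
  sources.map { s =>
    new SourceStatus(
      s.toString,
      localAvailableOffsets.get(s).map(_.toString),
      streamMetrics.currentSourceInputRate(s),
      streamMetrics.currentSourceProcessingRate(s),
      streamMetrics.currentSourceTriggerStatus(s))
  }.toArray
}
```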





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread frreiss
Github user frreiss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81668841
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 ---
@@ -56,7 +57,12 @@ case class StateStoreRestoreExec(
 child: SparkPlan)
   extends execution.UnaryExecNode with StatefulOperator {
 
+  override lazy val metrics = Map(
+"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
--- End diff --

The metric names should probably be in a separate, centralized list of 
constants. Users will want a single place in the API docs to find a list of all 
available metrics, and the list is likely to change quite frequently as 
Structured Streaming evolves.
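
For example, a central object along these lines (the names below are illustrative, not the final list) would give users one documented place to look and would evolve in lockstep with the code:

```scala
// Sketch of a single, documented list of metric names. The constants are
// examples only, not the actual names exposed by Structured Streaming.
object StreamingMetricNames {
  val NumOutputRows = "numOutputRows"
  val NumTotalStateRows = "numTotalStateRows"     // hypothetical
  val NumUpdatedStateRows = "numUpdatedStateRows" // hypothetical

  /** All known metric names, e.g. for generating API docs or validation. */
  val all: Seq[String] = Seq(NumOutputRows, NumTotalStateRows, NumUpdatedStateRows)
}
```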





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81642196
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerInfo = new mutable.HashMap[String, String]
+  private val sourceTriggerInfo = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", currentInputRate)
+  registerGauge("processingRate-total", () => currentProcessingRate)
+  registerGauge("outputRate", () => currentOutputRate)
+  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+inputRates.put(s, new RateCalculator)
+processingRates.put(s, new RateCalculator)
+sourceTriggerInfo.put(s, new mutable.HashMap[String, String])
+
+registerGauge(s"inputRate-${s.toString}", () => 
currentSourceInputRate(s))
+registerGauge(s"processingRate-${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // === Setter methods ===
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+numInputRows.clear()
+numOutputRows = None
+triggerInfo.clear()
+sourceTriggerInfo.values.foreach(_.clear())
+
+reportTriggerInfo(TRIGGER_ID, triggerId)
+sources.foreach(s => reportSourceTriggerInfo(s, TRIGGER_ID, triggerId))
+reportTriggerInfo(ACTIVE, true)
+currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+reportTriggerInfo(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTimestamp(key: String): Unit = synchronized {
+triggerInfo.put(key, triggerClock.getTimeMillis().toString)
+  }
+
+  def reportLatency(key: String, latencyMs: Long): Unit = synchronized {
+triggerInfo.put(key, latencyMs.toString)
+  }
+
+  def reportLatency(source: Source, key: String, latencyMs: Long): Unit = 
synchronized {
+sourceTriggerInfo(source).put(key, latencyMs.toString)
+  }
+
+  def reportTriggerInfo[T](key: String, value: T): Unit = synchronized {
+triggerInfo.put(key, value.toString)
+  }
+
+  def reportSourceTriggerInfo[T](source: Source, key: String, value: T): 
Unit = synchronized {
+sourceTriggerInfo(source).put(key, value.toString)
+  }
+
+  def reportNumRows(inputRows: Map[Source, Long], outputRows: 
Option[Long]): Unit = synchronized {
+numInp

[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81641129
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -273,8 +304,14 @@ class StreamExecution(
 val hasNewData = {
   awaitBatchLock.lock()
   try {
-val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> 
o))
-availableOffsets ++= newData
+timeIt(GET_OFFSET_LATENCY) {
+  val latestOffsets: Map[Source, Option[Offset]] = 
uniqueSources.map { s =>
+timeIt(s, SOURCE_GET_OFFSET_LATENCY) {
+  (s, s.getOffset)
+}
+  }.toMap
+  availableOffsets ++= latestOffsets.filter { case (s, o) => 
o.nonEmpty }.mapValues(_.get)
--- End diff --

Just wanted to make sure I wasn't missing something. There's later code 
that uses flatMap instead of a separate filter; it may be better to be 
consistent one way or the other, but it's not a big deal.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81629084
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -273,8 +304,14 @@ class StreamExecution(
 val hasNewData = {
   awaitBatchLock.lock()
   try {
-val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> 
o))
-availableOffsets ++= newData
+timeIt(GET_OFFSET_LATENCY) {
+  val latestOffsets: Map[Source, Option[Offset]] = 
uniqueSources.map { s =>
+timeIt(s, SOURCE_GET_OFFSET_LATENCY) {
+  (s, s.getOffset)
+}
+  }.toMap
+  availableOffsets ++= latestOffsets.filter { case (s, o) => 
o.nonEmpty }.mapValues(_.get)
--- End diff --

I just converted `flatMap over Options` to `filter out Nones and then get`. 
It seemed marginally easier to understand than the flatMap, but I am willing 
to change it back if this is more confusing.
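
For reference, the two formulations are equivalent on a `Map[K, Option[V]]`; a small sketch with toy data (not the PR's code) comparing them:

```scala
object OptionMapStyles extends App {
  val latestOffsets: Map[String, Option[Long]] =
    Map("sourceA" -> Some(42L), "sourceB" -> None)

  // Style 1: flatMap over the Options, dropping the Nones implicitly.
  val viaFlatMap: Map[String, Long] =
    latestOffsets.flatMap { case (s, o) => o.map(s -> _) }

  // Style 2: filter out the Nones explicitly, then unwrap with .get.
  val viaFilter: Map[String, Long] =
    latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get).toMap

  assert(viaFlatMap == viaFilter)
  println(viaFlatMap) // Map(sourceA -> 42)
}
```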





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81628507
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerInfo = new mutable.HashMap[String, String]
+  private val sourceTriggerInfo = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
--- End diff --

Metrics are generally agnostic to sinks, but different sinks probably have 
different ways of representing data visually. I tested with Ganglia (assuming 
that it's the most common one in production) and found that it groups metrics 
by the prefix before the last period, so multiple metric names A.B.X, A.B.Y, 
A.B.Z will be put in a single cluster A.B.

Regarding validity, we are using Codahale's `Metrics.name(partialName1, 
partialName2, ...)` underneath, which is supposed to check formats and all 
that. So I am assuming that whatever passes that check will be valid for all 
sinks. And the hyphen seems to be fine with the Ganglia sink.
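
A small sketch of the naming behaviour being relied on here (the registry and gauge names below are illustrative; `MetricRegistry.name` is the real Codahale helper): the parts are joined with periods, so keeping the per-source portion period-free means one query's gauges share a single Ganglia group.

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

object MetricNamingExample extends App {
  val registry = new MetricRegistry

  // MetricRegistry.name joins its arguments with periods:
  //   myQuery.StructuredStreamingMetrics.inputRate-total
  // Ganglia groups on everything before the last period, so a hyphen inside
  // the last component keeps all of a query's gauges in one group.
  val fullName =
    MetricRegistry.name("myQuery", "StructuredStreamingMetrics", "inputRate-total")
  println(fullName)

  registry.register(fullName, new Gauge[Double] {
    override def getValue: Double = 123.4 // placeholder for the current input rate
  })
}
```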





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81627457
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerInfo = new mutable.HashMap[String, String]
+  private val sourceTriggerInfo = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", currentInputRate)
+  registerGauge("processingRate-total", () => currentProcessingRate)
+  registerGauge("outputRate", () => currentOutputRate)
+  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+inputRates.put(s, new RateCalculator)
+processingRates.put(s, new RateCalculator)
+sourceTriggerInfo.put(s, new mutable.HashMap[String, String])
+
+registerGauge(s"inputRate-${s.toString}", () => 
currentSourceInputRate(s))
+registerGauge(s"processingRate-${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // === Setter methods ===
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+numInputRows.clear()
+numOutputRows = None
+triggerInfo.clear()
+sourceTriggerInfo.values.foreach(_.clear())
+
+reportTriggerInfo(TRIGGER_ID, triggerId)
+sources.foreach(s => reportSourceTriggerInfo(s, TRIGGER_ID, triggerId))
+reportTriggerInfo(ACTIVE, true)
+currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+reportTriggerInfo(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTimestamp(key: String): Unit = synchronized {
+triggerInfo.put(key, triggerClock.getTimeMillis().toString)
+  }
+
+  def reportLatency(key: String, latencyMs: Long): Unit = synchronized {
+triggerInfo.put(key, latencyMs.toString)
+  }
+
+  def reportLatency(source: Source, key: String, latencyMs: Long): Unit = 
synchronized {
+sourceTriggerInfo(source).put(key, latencyMs.toString)
+  }
+
+  def reportTriggerInfo[T](key: String, value: T): Unit = synchronized {
+triggerInfo.put(key, value.toString)
+  }
+
+  def reportSourceTriggerInfo[T](source: Source, key: String, value: T): 
Unit = synchronized {
+sourceTriggerInfo(source).put(key, value.toString)
+  }
+
+  def reportNumRows(inputRows: Map[Source, Long], outputRows: 
Option[Long]): Unit = synchronized {
+numInputRow

[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81615342
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -185,25 +203,36 @@ class StreamExecution(
   SparkSession.setActiveSession(sparkSession)
 
   triggerExecutor.execute(() => {
-if (isActive) {
-  if (currentBatchId < 0) {
-// We'll do this initialization only once
-populateStartOffsets()
-logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
-  } else {
-constructNextBatch()
-  }
-  if (dataAvailable) {
-runBatch()
-// We'll increase currentBatchId after we complete processing 
current batch's data
-currentBatchId += 1
+streamMetrics.reportTriggerStarted(currentBatchId)
+streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "Finding new data 
from sources")
+val isTerminated = timeIt(TRIGGER_LATENCY) {
+  if (isActive) {
+if (currentBatchId < 0) {
+  // We'll do this initialization only once
+  populateStartOffsets()
+  logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+} else {
+  constructNextBatch()
+}
+if (dataAvailable) {
+  streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "Processing 
new data")
+  streamMetrics.reportTriggerInfo(DATA_AVAILABLE, true)
+  runBatch()
+  // We'll increase currentBatchId after we complete 
processing current batch's data
+  currentBatchId += 1
+} else {
+  streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "No new 
data")
+  streamMetrics.reportTriggerInfo(DATA_AVAILABLE, false)
+  Thread.sleep(100)
--- End diff --

Unless I'm misreading, it was using pollingDelayMs before?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81613977
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -185,25 +203,36 @@ class StreamExecution(
   SparkSession.setActiveSession(sparkSession)
 
   triggerExecutor.execute(() => {
-if (isActive) {
-  if (currentBatchId < 0) {
-// We'll do this initialization only once
-populateStartOffsets()
-logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
-  } else {
-constructNextBatch()
-  }
-  if (dataAvailable) {
-runBatch()
-// We'll increase currentBatchId after we complete processing 
current batch's data
-currentBatchId += 1
+streamMetrics.reportTriggerStarted(currentBatchId)
+streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "Finding new data 
from sources")
+val isTerminated = timeIt(TRIGGER_LATENCY) {
+  if (isActive) {
+if (currentBatchId < 0) {
+  // We'll do this initialization only once
+  populateStartOffsets()
+  logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+} else {
+  constructNextBatch()
+}
+if (dataAvailable) {
+  streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "Processing 
new data")
+  streamMetrics.reportTriggerInfo(DATA_AVAILABLE, true)
+  runBatch()
+  // We'll increase currentBatchId after we complete 
processing current batch's data
+  currentBatchId += 1
+} else {
+  streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "No new 
data")
+  streamMetrics.reportTriggerInfo(DATA_AVAILABLE, false)
+  Thread.sleep(100)
--- End diff --

Well it was hardcoded before. I can make it a SQLConf.
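
A sketch of what making it configurable could look like; the config key and default below are hypothetical placeholders, not necessarily what ends up in SQLConf:

```scala
// Hypothetical polling-delay config, replacing the hardcoded 100 ms sleep.
object PollingDelayExample {
  val PollingDelayKey = "spark.sql.streaming.pollingDelay" // assumed key name
  val DefaultPollingDelayMs = 10L                          // assumed default

  def pollingDelayMs(conf: Map[String, String]): Long =
    conf.get(PollingDelayKey).map(_.toLong).getOrElse(DefaultPollingDelayMs)

  def main(args: Array[String]): Unit = {
    val conf = Map(PollingDelayKey -> "250")
    val delay = pollingDelayMs(conf)
    // Inside the trigger loop, when no data is available:
    Thread.sleep(delay)
    println(s"slept for $delay ms")
  }
}
```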





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r8163
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerInfo = new mutable.HashMap[String, String]
+  private val sourceTriggerInfo = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", currentInputRate)
+  registerGauge("processingRate-total", () => currentProcessingRate)
+  registerGauge("outputRate", () => currentOutputRate)
+  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+inputRates.put(s, new RateCalculator)
+processingRates.put(s, new RateCalculator)
+sourceTriggerInfo.put(s, new mutable.HashMap[String, String])
+
+registerGauge(s"inputRate-${s.toString}", () => 
currentSourceInputRate(s))
+registerGauge(s"processingRate-${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // === Setter methods ===
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+numInputRows.clear()
+numOutputRows = None
+triggerInfo.clear()
+sourceTriggerInfo.values.foreach(_.clear())
+
+reportTriggerInfo(TRIGGER_ID, triggerId)
+sources.foreach(s => reportSourceTriggerInfo(s, TRIGGER_ID, triggerId))
+reportTriggerInfo(ACTIVE, true)
+currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+reportTriggerInfo(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTimestamp(key: String): Unit = synchronized {
+triggerInfo.put(key, triggerClock.getTimeMillis().toString)
+  }
+
+  def reportLatency(key: String, latencyMs: Long): Unit = synchronized {
+triggerInfo.put(key, latencyMs.toString)
+  }
+
+  def reportLatency(source: Source, key: String, latencyMs: Long): Unit = 
synchronized {
+sourceTriggerInfo(source).put(key, latencyMs.toString)
+  }
+
+  def reportTriggerInfo[T](key: String, value: T): Unit = synchronized {
+triggerInfo.put(key, value.toString)
+  }
+
+  def reportSourceTriggerInfo[T](source: Source, key: String, value: T): 
Unit = synchronized {
+sourceTriggerInfo(source).put(key, value.toString)
+  }
+
+  def reportNumRows(inputRows: Map[Source, Long], outputRows: 
Option[Long]): Unit = synchronized {
+numInp

[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81590359
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerInfo = new mutable.HashMap[String, String]
+  private val sourceTriggerInfo = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  // Metric names should not have . in them, so that all the metrics of a 
query are identified
+  // together in Ganglia as a single metric group
--- End diff --

I'm confused by this comment. Shouldn't metrics be agnostic as to the type 
of sink, not just Ganglia? Are hyphens valid in identifier names for all 
currently used sinks?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81584577
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -273,8 +304,14 @@ class StreamExecution(
 val hasNewData = {
   awaitBatchLock.lock()
   try {
-val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> 
o))
-availableOffsets ++= newData
+timeIt(GET_OFFSET_LATENCY) {
+  val latestOffsets: Map[Source, Option[Offset]] = 
uniqueSources.map { s =>
+timeIt(s, SOURCE_GET_OFFSET_LATENCY) {
+  (s, s.getOffset)
+}
+  }.toMap
+  availableOffsets ++= latestOffsets.filter { case (s, o) => 
o.nonEmpty }.mapValues(_.get)
--- End diff --

I'm confused as to why this was changed to a separate filter step.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-10-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81582674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -185,25 +203,36 @@ class StreamExecution(
   SparkSession.setActiveSession(sparkSession)
 
   triggerExecutor.execute(() => {
-if (isActive) {
-  if (currentBatchId < 0) {
-// We'll do this initialization only once
-populateStartOffsets()
-logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
-  } else {
-constructNextBatch()
-  }
-  if (dataAvailable) {
-runBatch()
-// We'll increase currentBatchId after we complete processing 
current batch's data
-currentBatchId += 1
+streamMetrics.reportTriggerStarted(currentBatchId)
+streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "Finding new data 
from sources")
+val isTerminated = timeIt(TRIGGER_LATENCY) {
+  if (isActive) {
+if (currentBatchId < 0) {
+  // We'll do this initialization only once
+  populateStartOffsets()
+  logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+} else {
+  constructNextBatch()
+}
+if (dataAvailable) {
+  streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "Processing 
new data")
+  streamMetrics.reportTriggerInfo(DATA_AVAILABLE, true)
+  runBatch()
+  // We'll increase currentBatchId after we complete 
processing current batch's data
+  currentBatchId += 1
+} else {
+  streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "No new 
data")
+  streamMetrics.reportTriggerInfo(DATA_AVAILABLE, false)
+  Thread.sleep(100)
--- End diff --

Why is this sleep amount hardcoded?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81437765
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerInfo = new mutable.HashMap[String, String]
+  private val sourceTriggerInfo = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  registerGauge("inputRate.total", currentInputRate)
+  registerGauge("processingRate.total", () => currentProcessingRate)
+  registerGauge("outputRate.total", () => currentOutputRate)
+  registerGauge("latencyMs", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+inputRates.put(s, new RateCalculator)
+processingRates.put(s, new RateCalculator)
+sourceTriggerInfo.put(s, new mutable.HashMap[String, String])
+
+registerGauge(s"inputRate.${s.toString}", () => 
currentSourceInputRate(s))
+registerGauge(s"processingRate.${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // === Setter methods ===
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+numInputRows.clear()
+numOutputRows = None
+triggerInfo.clear()
+sourceTriggerInfo.values.foreach(_.clear())
+
+reportTriggerInfo(TRIGGER_ID, triggerId)
+sources.foreach(s => reportSourceTriggerInfo(s, TRIGGER_ID, triggerId))
+reportTriggerInfo(ACTIVE, true)
+currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+reportTriggerInfo(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTimestamp(key: String): Unit = synchronized {
+triggerInfo.put(key, triggerClock.getTimeMillis().toString)
+  }
+
+  def reportLatency(key: String, latencyMs: Long): Unit = synchronized {
+triggerInfo.put(key, latencyMs.toString)
+  }
+
+  def reportLatency(source: Source, key: String, latencyMs: Long): Unit = 
synchronized {
+sourceTriggerInfo(source).put(key, latencyMs.toString)
+  }
+
+  def reportTriggerInfo[T](key: String, value: T): Unit = synchronized {
+triggerInfo.put(key, value.toString)
+  }
+
+  def reportSourceTriggerInfo[T](source: Source, key: String, value: T): 
Unit = synchronized {
+sourceTriggerInfo(source).put(key, value.toString)
+  }
+
+  def reportNumRows(inputRows: Map[Source, Long], outputRows: 
Option[Long]): Unit = synchronized {
+numInputRows ++= inputRows
+numOutputRows = outputRows
--- End diff --

No, it's the number of output rows in the current trigger.



[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81437722
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -511,12 +555,59 @@ class StreamExecution(
  """.stripMargin
   }
 
+  private def reportMetrics(executedPlan: SparkPlan): Unit = {
+val execPlanLeaves = executedPlan.collect { case p if 
p.children.isEmpty => p }
+val sourceToNumInputRows = if (execPlanLeaves.size == sources.size) {
--- End diff --

That is true, it's possible. Good point; this logic needs to be more complex, 
then.
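
To spell out the failure mode being discussed (simplified stand-in types, not the actual plan nodes): matching leaves to sources positionally only works when there is exactly one leaf per streaming source, and a joined batch DataFrame adds extra leaves that would otherwise be misattributed.

```scala
// Simplified stand-ins for plan leaves; not the actual SparkPlan API.
case class Leaf(name: String, numOutputRows: Long)

object LeafToSourceExample extends App {
  def attributeRows(leaves: Seq[Leaf], sources: Seq[String]): Map[String, Long] =
    if (leaves.size == sources.size) {
      // Only safe when there is one leaf per streaming source and the order
      // matches; extra leaves (e.g. the batch side of a join) break this.
      sources.zip(leaves.map(_.numOutputRows)).toMap
    } else {
      Map.empty // give up rather than report wrong per-source numbers
    }

  val streamingOnly = Seq(Leaf("kafkaScan", 1000L))
  val withBatchJoin = Seq(Leaf("kafkaScan", 1000L), Leaf("parquetScan", 50000L))

  println(attributeRows(streamingOnly, Seq("kafka")))  // Map(kafka -> 1000)
  println(attributeRows(withBatchJoin, Seq("kafka")))  // Map() -- sizes differ
}
```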





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-09-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81437680
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -105,11 +105,14 @@ class StreamExecution(
   var lastExecution: QueryExecution = null
 
   @volatile
-  var streamDeathCause: StreamingQueryException = null
+  private var streamDeathCause: StreamingQueryException = null
 
   /* Get the call site in the caller thread; will pass this into the micro 
batch thread */
   private val callSite = Utils.getCallSite()
 
+  private val streamMetrics = new StreamMetrics(uniqueSources.toSet, 
triggerClock,
+
"%s.StructuredStreamingMetrics.%s".format(sparkSession.sparkContext.appName, 
name))
--- End diff --

Changing the format. Tested on Ganglia and decided against putting the app 
name inside since we already have the query name.





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-09-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81430689
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
+  extends CodahaleSource with Logging {
+
+  import StreamMetrics._
+
+  // Trigger infos
+  private val triggerInfo = new mutable.HashMap[String, String]
+  private val sourceTriggerInfo = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
+
+  // Rate estimators for sources and sinks
+  private val inputRates = new mutable.HashMap[Source, RateCalculator]
+  private val processingRates = new mutable.HashMap[Source, RateCalculator]
+  private val outputRate = new RateCalculator
+
+  // Number of input rows in the current trigger
+  private val numInputRows = new mutable.HashMap[Source, Long]
+  private var numOutputRows: Option[Long] = None
+  private var currentTriggerStartTimestamp: Long = -1
+  private var previousTriggerStartTimestamp: Long = -1
+  private var latency: Option[Double] = None
+
+  override val sourceName: String = codahaleSourceName
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // === Initialization ===
+
+  registerGauge("inputRate.total", currentInputRate)
+  registerGauge("processingRate.total", () => currentProcessingRate)
+  registerGauge("outputRate.total", () => currentOutputRate)
+  registerGauge("latencyMs", () => currentLatency().getOrElse(-1.0))
+
+  sources.foreach { s =>
+inputRates.put(s, new RateCalculator)
+processingRates.put(s, new RateCalculator)
+sourceTriggerInfo.put(s, new mutable.HashMap[String, String])
+
+registerGauge(s"inputRate.${s.toString}", () => 
currentSourceInputRate(s))
+registerGauge(s"processingRate.${s.toString}", () => 
currentSourceProcessingRate(s))
+  }
+
+  // === Setter methods ===
+
+  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+numInputRows.clear()
+numOutputRows = None
+triggerInfo.clear()
+sourceTriggerInfo.values.foreach(_.clear())
+
+reportTriggerInfo(TRIGGER_ID, triggerId)
+sources.foreach(s => reportSourceTriggerInfo(s, TRIGGER_ID, triggerId))
+reportTriggerInfo(ACTIVE, true)
+currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+reportTriggerInfo(START_TIMESTAMP, currentTriggerStartTimestamp)
+  }
+
+  def reportTimestamp(key: String): Unit = synchronized {
+triggerInfo.put(key, triggerClock.getTimeMillis().toString)
+  }
+
+  def reportLatency(key: String, latencyMs: Long): Unit = synchronized {
+triggerInfo.put(key, latencyMs.toString)
+  }
+
+  def reportLatency(source: Source, key: String, latencyMs: Long): Unit = 
synchronized {
+sourceTriggerInfo(source).put(key, latencyMs.toString)
+  }
+
+  def reportTriggerInfo[T](key: String, value: T): Unit = synchronized {
+triggerInfo.put(key, value.toString)
+  }
+
+  def reportSourceTriggerInfo[T](source: Source, key: String, value: T): 
Unit = synchronized {
+sourceTriggerInfo(source).put(key, value.toString)
+  }
+
+  def reportNumRows(inputRows: Map[Source, Long], outputRows: 
Option[Long]): Unit = synchronized {
+numInputRows ++= inputRows
+numOutputRows = outputRows
--- End diff --

`numOutputRows += outputRows`



[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-09-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81431635
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -511,12 +555,59 @@ class StreamExecution(
  """.stripMargin
   }
 
+  private def reportMetrics(executedPlan: SparkPlan): Unit = {
+val execPlanLeaves = executedPlan.collect { case p if 
p.children.isEmpty => p }
+val sourceToNumInputRows = if (execPlanLeaves.size == sources.size) {
--- End diff --

Is it possible that `execPlanLeaves` contains sources of a batch DF?





[GitHub] spark pull request #15307: [WIP][SPARK-17731][SQL][STREAMING] Metrics for st...

2016-09-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15307#discussion_r81426227
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -105,11 +105,14 @@ class StreamExecution(
   var lastExecution: QueryExecution = null
 
   @volatile
-  var streamDeathCause: StreamingQueryException = null
+  private var streamDeathCause: StreamingQueryException = null
 
   /* Get the call site in the caller thread; will pass this into the micro 
batch thread */
   private val callSite = Utils.getCallSite()
 
+  private val streamMetrics = new StreamMetrics(uniqueSources.toSet, 
triggerClock,
+
"%s.StructuredStreamingMetrics.%s".format(sparkSession.sparkContext.appName, 
name))
--- End diff --

nit: 
s"${sparkSession.sparkContext.appName}.StructuredStreamingMetrics.$name".

