HeartSaVioR commented on a change in pull request #33065:
URL: https://github.com/apache/spark/pull/33065#discussion_r658604237



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
##########
@@ -76,6 +76,41 @@ trait StateStoreMetricsTest extends StreamTest {
       }
       true
     }
+  }
+
+  /** AssertOnQuery to verify the given state operator's custom metric has 
expected value */
+  def assertStateOperatorCustomMetric(
+    metric: String, expected: Long, operatorIndex: Int = 0): AssertOnQuery = {

Review comment:
       nit: indent 2 more spaces

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
##########
@@ -332,4 +332,53 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
     }
   }
 
+  test("SPARK-35880: custom metric numDroppedDuplicateRows in state operator 
progress") {
+    val dedupeInputData = MemoryStream[(String, Int)]
+    val dedupe = dedupeInputData.toDS().dropDuplicates().dropDuplicates("_1")

Review comment:
       I guess dropDuplicates() is unnecessary.

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
##########
@@ -76,6 +76,41 @@ trait StateStoreMetricsTest extends StreamTest {
       }
       true
     }
+  }
+
+  /** AssertOnQuery to verify the given state operator's custom metric has 
expected value */
+  def assertStateOperatorCustomMetric(
+    metric: String, expected: Long, operatorIndex: Int = 0): AssertOnQuery = {

Review comment:
       Why not deduplicate with above method? Like following:
   
   ```
     def assertNumStateRows(
         total: Seq[Long],
         updated: Seq[Long],
         droppedByWatermark: Seq[Long]): AssertOnQuery = {
       AssertOnQuery(s"Check total state rows = $total, updated state rows = 
$updated" +
         s", rows dropped by watermark = $droppedByWatermark") { q =>
         // This assumes that the streaming query will not make any progress 
while the eventually
         // is being executed.
         eventually(timeout(streamingTimeout)) {
           val progressesSinceLastCheck = retrieveProgressesSinceLastCheck(q)
           val numStateOperators = 
progressesSinceLastCheck.last.stateOperators.length
   
           val allNumUpdatedRowsSinceLastCheck =
             
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
   
           val allNumRowsDroppedByWatermarkSinceLastCheck =
             
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsDroppedByWatermark))
   
           lazy val debugString = "recent progresses:\n" +
             progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
   
           val numTotalRows = 
progressesSinceLastCheck.last.stateOperators.map(_.numRowsTotal)
           assert(numTotalRows === total, s"incorrect total rows, $debugString")
   
           val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, 
numStateOperators)
           assert(numUpdatedRows === updated, s"incorrect updates rows, 
$debugString")
   
           val numRowsDroppedByWatermark = 
arraySum(allNumRowsDroppedByWatermarkSinceLastCheck,
             numStateOperators)
           assert(numRowsDroppedByWatermark === droppedByWatermark,
             s"incorrect dropped rows by watermark, $debugString")
         }
         true
       }
     }
   
     /** AssertOnQuery to verify the given state operator's custom metric has 
expected value */
     def assertStateOperatorCustomMetric(
         metric: String, expected: Long, operatorIndex: Int = 0): AssertOnQuery 
= {
       AssertOnQuery(s"Check metrics $metric has value $expected") { q =>
         eventually(timeout(streamingTimeout)) {
           val progressesSinceLastCheck = retrieveProgressesSinceLastCheck(q)
           val numStateOperators = 
progressesSinceLastCheck.last.stateOperators.length
           assert(operatorIndex < numStateOperators, s"Invalid operator Index: 
$operatorIndex")
   
           val allCustomMetricValuesSinceLastCheck = progressesSinceLastCheck
             .map(_.stateOperators(operatorIndex).customMetrics.get(metric))
             .map(Long2long)
   
           lazy val debugString = "recent progresses:\n" +
             progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
   
           assert(allCustomMetricValuesSinceLastCheck.sum === expected,
             s"incorrect custom metric ($metric), $debugString")
         }
         true
       }
     }
   
     private def retrieveProgressesSinceLastCheck(
         execution: StreamExecution): Array[StreamingQueryProgress] = {
       val recentProgress = execution.recentProgress
       require(recentProgress.nonEmpty, "No progress made, cannot check state 
metrics")
       require(recentProgress.length < 
spark.sessionState.conf.streamingProgressRetention,
         "This test assumes that all progresses are present in q.recentProgress 
but " +
           "some may have been dropped due to retention limits")
   
       if (execution.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
       lastQuery = execution
   
       val numStateOperators = recentProgress.last.stateOperators.length
       val ret = recentProgress
         .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
         .filter(_.stateOperators.length == numStateOperators)
   
       lastCheckedRecentProgressIndex = recentProgress.length - 1
   
       ret
     }
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -92,7 +111,7 @@ trait StateStoreWriter extends StatefulOperator { self: 
SparkPlan =>
    * the driver after this SparkPlan has been executed and metrics have been 
updated.
    */
   def getProgress(): StateOperatorProgress = {
-    val customMetrics = stateStoreCustomMetrics
+    val customMetrics = 
stateStoreCustomMetrics.++(statefulOperatorCustomMetrics)

Review comment:
       We don't seem to use non-alphabet characters operator with dot notation.
   
   Probably, `(stateStoreCustomMetrics ++ statefulOperatorCustomMetrics)`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to