HeartSaVioR commented on a change in pull request #28040: [SPARK-31278][SS] Fix StreamingQuery output rows metric URL: https://github.com/apache/spark/pull/28040#discussion_r399724222
########## File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ########## @@ -203,46 +203,53 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { } def stateOperatorProgresses: Seq[StateOperatorProgress] = { - val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]() - var progress = query.recentProgress.last - - operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) } - if (progress.numInputRows == 0) { - // empty batch, merge metrics from previous batch as well - progress = query.recentProgress.takeRight(2).head - operatorProgress.zipWithIndex.foreach { case (sop, index) => - // "numRowsUpdated" should be merged, as it could be updated in both batches. - // (for now it is only updated from previous batch, but things can be changed.) - // other metrics represent current status of state so picking up the latest values. - val newOperatorProgress = sop.copy( - sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated) - operatorProgress(index) = newOperatorProgress - } - } - - operatorProgress + query.recentProgress.last.stateOperators } } + val clock = new StreamManualClock() + testStream(aggWithWatermark)( AddData(inputData, 15), - CheckAnswer(), // watermark = 5 + StartStream(Trigger.ProcessingTime("interval 1 second"), clock), + AdvanceManualClock(1000L), // triggers first batch + CheckAnswer(), // watermark = 0 AssertOnQuery { _.stateNodes.size === 1 }, AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 }, AddData(inputData, 10, 12, 14), + AdvanceManualClock(1000L), // watermark = 5, runs no-data microbatch Review comment: This surprises me, although it's not directly related to this PR so treat it as OFF-TOPIC. 
Based on the test, it sounds to me as if we need to wait for the next trigger interval to run the no-data microbatch, and we need to run the no-data microbatch even when input is available. The input is handled in the next trigger. My expectation was that the no-data microbatch would be consolidated with the data microbatch if there's input available. And ideally, the no-data microbatch should not need to wait for the next trigger interval. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org