HeartSaVioR commented on a change in pull request #28040: [SPARK-31278][SS] Fix StreamingQuery output rows metric URL: https://github.com/apache/spark/pull/28040#discussion_r399724883
########## File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ########## @@ -203,46 +203,53 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { } def stateOperatorProgresses: Seq[StateOperatorProgress] = { - val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]() - var progress = query.recentProgress.last - - operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) } - if (progress.numInputRows == 0) { - // empty batch, merge metrics from previous batch as well - progress = query.recentProgress.takeRight(2).head - operatorProgress.zipWithIndex.foreach { case (sop, index) => - // "numRowsUpdated" should be merged, as it could be updated in both batches. - // (for now it is only updated from previous batch, but things can be changed.) - // other metrics represent current status of state so picking up the latest values. - val newOperatorProgress = sop.copy( - sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated) - operatorProgress(index) = newOperatorProgress - } - } - - operatorProgress + query.recentProgress.last.stateOperators } } + val clock = new StreamManualClock() + testStream(aggWithWatermark)( AddData(inputData, 15), - CheckAnswer(), // watermark = 5 + StartStream(Trigger.ProcessingTime("interval 1 second"), clock), + AdvanceManualClock(1000L), // triggers first batch + CheckAnswer(), // watermark = 0 AssertOnQuery { _.stateNodes.size === 1 }, AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 }, AddData(inputData, 10, 12, 14), + AdvanceManualClock(1000L), // watermark = 5, runs no-data microbatch + AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, + AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 }, + AssertOnQuery { 
_.stateOperatorProgresses.head.numRowsTotal === 1 }, + AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 }, + AdvanceManualClock(1000L), // runs with new data from above CheckAnswer(), // watermark = 5 AssertOnQuery { _.stateNodes.size === 1 }, AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }, + AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 }, AddData(inputData, 25), - CheckAnswer((10, 3)), // watermark = 15 + AdvanceManualClock(1000L), // actually runs batch with data + CheckAnswer(), // watermark = 5, will update to 15 next batch AssertOnQuery { _.stateNodes.size === 1 }, - AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 }, + AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, + AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, + AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 }, + AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 }, + AdvanceManualClock(1000L), // runs batch with no new data and watermark progresses Review comment: This is also hard to understand: running a no-data microbatch requires two trigger intervals instead of one (ideally it would require zero). But yes, this is OFF-TOPIC. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org