cloud-fan commented on code in PR #55711:
URL: https://github.com/apache/spark/pull/55711#discussion_r3273215161
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala:
##########
@@ -340,6 +341,46 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase {
checkUpdateMetrics(numUpdatedRows = 2, numCopiedRows = 1)
}
+ test("metric values are stable across stage retries") {
+ // Force a shuffle in the UPDATE plan via an IN-subquery (with broadcast disabled), then
+ // have the DAGScheduler corrupt the first attempt of every upstream shuffle map stage so
+ // the writer stage has to retry. With a plain SQLMetric the row counters would double up
Review Comment:
Same point as on the MERGE test: per the TODO in the conversation thread,
the writer stage doesn't actually retry under the current fetch-failure
injection, so `numUpdatedRows` / `numCopiedRows` wouldn't double up with plain
`SQLMetric` either. The test passes regardless of the SLAM swap. Worth
tightening the comment to match what's actually exercised, or referencing
#55738.
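A minimal model of why the test can't tell the two metric kinds apart. It assumes (as a simplification) that a plain `SQLMetric` adds every task attempt's update, while the SLAM swapped in by this PR keeps only the latest stage attempt's value per partition. `TaskUpdate`, `MetricModel` and `RetryScenarios` are hypothetical names for illustration, not Spark APIs. If only upstream shuffle map stages retry and the writer stage runs once, both models report the same writer-side count.

```scala
// Hypothetical model: NOT Spark's SQLMetric or SLAM implementation.
// One task attempt's contribution to a writer-side row counter.
final case class TaskUpdate(stageId: Int, stageAttempt: Int, partition: Int, rows: Long)

object MetricModel {
  // Assumed plain-accumulator semantics: every attempt's update is added.
  def plainTotal(updates: Seq[TaskUpdate]): Long =
    updates.map(_.rows).sum

  // Assumed last-attempt semantics: for each (stage, partition) keep only
  // the update from the highest stage attempt, then sum those.
  def lastAttemptTotal(updates: Seq[TaskUpdate]): Long =
    updates
      .groupBy(u => (u.stageId, u.partition))
      .values
      .map(_.maxBy(_.stageAttempt).rows)
      .sum
}

object RetryScenarios {
  // Writer stage (id 2) is retried: attempt 0 and attempt 1 both report.
  val writerRetried: Seq[TaskUpdate] = Seq(
    TaskUpdate(2, 0, 0, 1L), TaskUpdate(2, 0, 1, 1L),
    TaskUpdate(2, 1, 0, 1L), TaskUpdate(2, 1, 1, 1L))

  // Only an upstream stage retries; the writer stage runs exactly once,
  // which is what the current fetch-failure injection produces.
  val writerRanOnce: Seq[TaskUpdate] = Seq(
    TaskUpdate(2, 0, 0, 1L), TaskUpdate(2, 0, 1, 1L))
}
```

For `writerRetried` the plain total is 4 and the last-attempt total is 2; for `writerRanOnce` both are 2. Only the first scenario would make the test fail without the SLAM swap.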
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala:
##########
@@ -2663,6 +2664,57 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}
+ test("metric values are stable across stage retries") {
+ // The join in the MERGE plan introduces a shuffle (with broadcast disabled). The
+ // DAGScheduler corrupts the first attempt of every upstream shuffle map stage, forcing
+ // the MergeRowsExec stage to retry. With plain SQLMetrics the row counters would double
Review Comment:
Per your own TODO in the conversation thread, the current fetch-failure
injection doesn't actually retry the `MergeRowsExec`/writer stage — only
upstream shuffle map stages retry. With plain `SQLMetric`, the `numTargetRows*`
counters in `MergeRowsExec` wouldn't double up under this injection, so this
test passes equally well without the SLAM swap. It exercises the SLAM-aware
read path mechanically but doesn't actually validate the MERGE bug fix. Suggest
updating this comment (and adding a TODO pointing at #55738) so a future reader
doesn't conclude this test guards `MergeRowsExec` retries.
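One way to check which counters an injection actually stresses: map each counter to the stage that updates it and intersect with the set of stages that get retried. The stage labels and the ownership map below are illustrative assumptions taken from this comment, not derived from the real Spark plan; `InjectionCoverage` is a hypothetical helper.

```scala
// Hypothetical bookkeeping, not a Spark API: which stage updates which counter.
object InjectionCoverage {
  sealed trait Stage
  case object UpstreamShuffleMap extends Stage // retried by the current injection
  case object MergeRowsWriter extends Stage    // MergeRowsExec / writer stage

  // Assumed ownership for the MERGE test, per the review comment.
  val owners: Map[String, Stage] = Map(
    "numTargetRows*" -> MergeRowsWriter
  )

  // Counters that could double up with a plain SQLMetric: only those whose
  // owning stage is actually re-executed.
  def exposed(owners: Map[String, Stage], retried: Set[Stage]): Set[String] =
    owners.collect { case (metric, stage) if retried(stage) => metric }.toSet

  // Current injection: only upstream shuffle map stages are retried.
  val currentInjection: Set[Stage] = Set(UpstreamShuffleMap)
}
```

Under `currentInjection` the exposed set is empty, so the test guards nothing in `MergeRowsExec`; an injection that also retries `MergeRowsWriter` (the kind of change #55738 would need) exposes `numTargetRows*`.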
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala:
##########
@@ -425,6 +427,46 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
}
}
+ test("metric values are stable across stage retries") {
+ // Force a shuffle in the DELETE plan via an IN-subquery (with broadcast disabled), then
+ // have the DAGScheduler corrupt the first attempt of every upstream shuffle map stage so
+ // the writer stage has to retry. With plain SQLMetrics the writer-side numCopiedRows /
Review Comment:
Minor — the DELETE test *does* exercise the fix, but via the scan-side path
only. Per the TODO in the conversation thread, the writer stage doesn't retry
under the current injection, so the writer-side `numCopiedRows` doesn't
actually double up — only the scan-side `numOutputRows` does, and then the
driver-side derivation `numDeletedRows = numScannedRows - numCopiedRows`
propagates the doubling. Suggest narrowing the comment to call out the
scan-side path (and the driver-derivation amplification) rather than implying
writer-side counters double up.
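The amplification is plain arithmetic. A sketch with made-up row counts (3 rows scanned, 1 copied), assuming the scan-side counter doubles under the retry while the writer-side counter does not. `DeleteDerivation` is a hypothetical name; the numbers are not from the actual DELETE test.

```scala
// Illustrative numbers only; not taken from the actual DELETE test.
object DeleteDerivation {
  // Driver-side derivation described in the review comment.
  def numDeletedRows(numScannedRows: Long, numCopiedRows: Long): Long =
    numScannedRows - numCopiedRows

  val scanned = 3L // true scan-side numOutputRows
  val copied = 1L  // writer-side numCopiedRows (writer stage runs once)

  val correct: Long = numDeletedRows(scanned, copied)             // 3 - 1 = 2
  val withDoubledScan: Long = numDeletedRows(2 * scanned, copied) // 6 - 1 = 5
}
```

The whole error lands on `numDeletedRows`: it is inflated by the full scanned count (2 becomes 5), even though neither writer-side counter changed.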
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.