gengliangwang commented on code in PR #55586:
URL: https://github.com/apache/spark/pull/55586#discussion_r3222081332
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala:
##########
@@ -210,6 +219,15 @@ trait InsertIntoSQLOnlyTests
/** Check that the results in `tableName` match the `expected` DataFrame. */
protected def verifyTable(tableName: String, expected: DataFrame): Unit
+ protected def checkInsertMetrics(tableName: String, numInsertedRows: Long): Unit = {
+ val inMemoryTable = spark.table(tableName).queryExecution.analyzed.collectFirst {
+ case ExtractV2Table(t) => t.asInstanceOf[InMemoryBaseTable]
+ }.get
+ val summary = inMemoryTable.commits.last.writeSummary.get.asInstanceOf[InsertSummary]
Review Comment:
`.get.asInstanceOf[InsertSummary]` chains two cryptic failure modes:
- `writeSummary.get` throws `NoSuchElementException` if the last commit
didn't carry a summary (e.g. if the test ordering ever changes and an Overwrite
ends up being `commits.last`).
- The cast throws `ClassCastException` if the last commit's summary isn't an
`InsertSummary` (e.g. a MERGE earlier in the same `withTable` block).
Both surface as opaque stack traces from a test helper, with no hint as to which
table or which prior operation caused the mismatch. A clearer failure message is
cheap here:
```suggestion
val summary = inMemoryTable.commits.last.writeSummary match {
case Some(s: InsertSummary) => s
case Some(other) => fail(s"Expected InsertSummary on $tableName, got ${other.getClass.getSimpleName}")
case None => fail(s"Last commit on $tableName had no write summary")
}
```
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -477,17 +508,18 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec with TransactionalExec {
trait RowLevelWriteExec extends V2ExistingTableWriteExec {
def rowLevelCommand: RowLevelOperation.Command
- override protected lazy val sparkMetrics: Map[String, SQLMetric] = rowLevelCommand match {
- case UPDATE =>
- Map(
- "numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of updated rows"),
- "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows"))
- case DELETE =>
- Map(
- "numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of deleted rows"),
- "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows"))
- case _ => Map.empty
- }
+ override protected lazy val sparkMetrics: Map[String, SQLMetric] = super.sparkMetrics ++ (
Review Comment:
Side effect of the new `sparkMetrics` override on `V2TableWriteExec` (line
589): `numOutputRows` is now in the metric map of every V2 batch write, and
this `super.sparkMetrics ++ (...)` carries it through to row-level writes too.
So every UPDATE/DELETE/MERGE delta write will start showing `numOutputRows` in
the SQL UI.
For delta writes, the value is not a user-visible row count:
`numOutputRowsMetric.add(result.numRows)` (line 626) counts every row consumed
by the writer iterator, which includes `COPY_OPERATION` rows (see
`DataAndMetadataWritingSparkTask` / `DataWithProjectionWritingSparkTask`) and
the tombstone writes in the delta task. An UPDATE that touches 5 rows in a
100-row partition would show `numOutputRows = 100` next to the already-correct
`numUpdatedRows = 5`, which I think will be confusing.
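To make the arithmetic concrete, here is a minimal sketch in plain Scala with no Spark dependency. `Op`, `TaggedRow`, and `countMetrics` are hypothetical stand-ins rather than Spark APIs; they only model a writer that bumps one counter for every row it consumes and another for rows that were actually updated.

```scala
object MetricsDemo {
  // Hypothetical model, not Spark code: each row handed to the writer is
  // tagged with the operation that produced it.
  sealed trait Op
  case object Copy extends Op   // row rewritten unchanged (stand-in for COPY_OPERATION)
  case object Update extends Op // row actually modified by the UPDATE

  final case class TaggedRow(id: Int, op: Op)

  // Returns (numOutputRows, numUpdatedRows) for one simulated write task.
  def countMetrics(rows: Iterator[TaggedRow]): (Long, Long) = {
    var numOutputRows = 0L
    var numUpdatedRows = 0L
    rows.foreach { row =>
      numOutputRows += 1 // counts every row the writer consumes
      if (row.op == Update) numUpdatedRows += 1
    }
    (numOutputRows, numUpdatedRows)
  }

  // A 100-row partition in which the UPDATE touches the first 5 rows.
  def samplePartition: Seq[TaggedRow] =
    (1 to 100).map(i => TaggedRow(i, if (i <= 5) Update else Copy))

  def main(args: Array[String]): Unit = {
    val (out, updated) = countMetrics(samplePartition.iterator)
    println(s"numOutputRows = $out, numUpdatedRows = $updated")
  }
}
```

Under this model the per-consumed-row counter reports the whole partition, while the per-operation counter reports only the rows the statement changed.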
It seems cleaner to scope `numOutputRows` to the pure-INSERT paths, e.g. by moving
the `numOutputRows -> numOutputRowsMetric` registration off
`V2TableWriteExec.sparkMetrics` and onto `AppendDataExec` /
`InsertOnlyMergeExec` directly. (This also answers your earlier question about why
this metric was never populated in the UI before: it _was_ populated, just into
a local `LongAccumulator` that didn't reach the UI.)
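One possible shape for that scoping, sketched with toy types only: `WriteExec`, `ReportsOutputRows`, `AppendExec`, and `UpdateExec` below are hypothetical names, not the real Spark traits, and metrics are modelled as plain strings. The idea is that the base trait registers nothing and only insert-style nodes opt in.

```scala
object ScopedMetricsDemo {
  // Hypothetical stand-in for the shared base of all write exec nodes.
  trait WriteExec {
    // The base registers no metrics by default.
    protected def metricNames: Set[String] = Set.empty
    final def uiMetrics: Set[String] = metricNames
  }

  // Opt-in mix-in, intended for pure-INSERT paths only.
  trait ReportsOutputRows extends WriteExec {
    override protected def metricNames: Set[String] =
      super.metricNames + "numOutputRows"
  }

  // Insert-style node: opts in to numOutputRows.
  class AppendExec extends WriteExec with ReportsOutputRows

  // Row-level node: registers only its own operation-specific metrics.
  class UpdateExec extends WriteExec {
    override protected def metricNames: Set[String] =
      super.metricNames ++ Set("numUpdatedRows", "numCopiedRows")
  }

  def main(args: Array[String]): Unit = {
    println(new AppendExec().uiMetrics)
    println(new UpdateExec().uiMetrics)
  }
}
```

With this layout, `super.metricNames ++ (...)` in a row-level node no longer drags `numOutputRows` along, because the base contributes an empty set.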
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]