gengliangwang commented on code in PR #55586:
URL: https://github.com/apache/spark/pull/55586#discussion_r3222081332


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala:
##########
@@ -210,6 +219,15 @@ trait InsertIntoSQLOnlyTests
   /** Check that the results in `tableName` match the `expected` DataFrame. */
   protected def verifyTable(tableName: String, expected: DataFrame): Unit
 
+  protected def checkInsertMetrics(tableName: String, numInsertedRows: Long): Unit = {
+    val inMemoryTable = spark.table(tableName).queryExecution.analyzed.collectFirst {
+      case ExtractV2Table(t) => t.asInstanceOf[InMemoryBaseTable]
+    }.get
+    val summary = inMemoryTable.commits.last.writeSummary.get.asInstanceOf[InsertSummary]

Review Comment:
   `.get.asInstanceOf[InsertSummary]` chains two cryptic failure modes:
   - `writeSummary.get` throws `NoSuchElementException` if the last commit didn't carry a summary (e.g. if the test ordering ever changes and an Overwrite ends up being `commits.last`).
   - The cast throws `ClassCastException` if the last commit's summary isn't an `InsertSummary` (e.g. after a MERGE earlier in the same `withTable` block).
   
   Both surface as opaque stack traces from a test helper, with no hint as to which table or which prior operation caused the mismatch. A clearer failure message is cheap here:
   
   ```suggestion
       val summary = inMemoryTable.commits.last.writeSummary match {
         case Some(s: InsertSummary) => s
         case Some(other) =>
           fail(s"Expected InsertSummary on $tableName, got ${other.getClass.getSimpleName}")
         case None => fail(s"Last commit on $tableName had no write summary")
       }
   ```
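
   For anyone who wants to try the pattern outside the test harness, here is a minimal self-contained sketch. The `WriteSummary`, `InsertSummary` and `MergeSummary` types below are simplified stand-ins, not Spark's real classes, and the helper returns an `Either` instead of calling ScalaTest's `fail` so that it runs without a test framework:

   ```scala
   // Stand-in types for illustration only; they are NOT Spark's real classes.
   sealed trait WriteSummary
   final case class InsertSummary(numInsertedRows: Long) extends WriteSummary
   final case class MergeSummary(numTargetRowsUpdated: Long) extends WriteSummary

   object SummaryExtraction {
     // Returns Right(summary) on success, or Left(message) naming the table and the cause.
     def insertSummaryOf(
         tableName: String,
         lastSummary: Option[WriteSummary]): Either[String, InsertSummary] =
       lastSummary match {
         case Some(s: InsertSummary) => Right(s)
         case Some(other) =>
           Left(s"Expected InsertSummary on $tableName, got ${other.getClass.getSimpleName}")
         case None =>
           Left(s"Last commit on $tableName had no write summary")
       }
   }
   ```

   Each of the three cases maps to one distinct outcome, so a mismatch reports the table name and the reason instead of a bare `NoSuchElementException` or `ClassCastException`.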



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -477,17 +508,18 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec with TransactionalExec {
 trait RowLevelWriteExec extends V2ExistingTableWriteExec {
   def rowLevelCommand: RowLevelOperation.Command
 
-  override protected lazy val sparkMetrics: Map[String, SQLMetric] = rowLevelCommand match {
-    case UPDATE =>
-      Map(
-        "numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of updated rows"),
-        "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows"))
-    case DELETE =>
-      Map(
-        "numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of deleted rows"),
-        "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows"))
-    case _ => Map.empty
-  }
+  override protected lazy val sparkMetrics: Map[String, SQLMetric] = super.sparkMetrics ++ (

Review Comment:
   Side effect of the new `sparkMetrics` override on `V2TableWriteExec` (line 589): `numOutputRows` is now in the metric map of every V2 batch write, and this `super.sparkMetrics ++ (...)` carries it through to row-level writes too. So every UPDATE/DELETE/MERGE delta write will start showing `numOutputRows` in the SQL UI.
   
   For delta writes, the value isn't a user-visible row count: `numOutputRowsMetric.add(result.numRows)` (line 626) counts every row consumed by the writer iterator, which includes `COPY_OPERATION` rows (see `DataAndMetadataWritingSparkTask` / `DataWithProjectionWritingSparkTask`) and the tombstone writes in the delta task. An UPDATE that touches 5 rows in a 100-row partition would show `numOutputRows = 100` next to the already-correct `numUpdatedRows = 5`, which I think will be confusing.
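
   To make the arithmetic concrete, here is a toy sketch of the two counting strategies. The `Op` tags and the `DeltaCount` object are hypothetical names for illustration, and the sketch assumes, as described above, that copied rows pass through the same writer iterator as updated ones:

   ```scala
   // Hypothetical row tags; these are not Spark's operation constants.
   sealed trait Op
   case object Copied extends Op
   case object Updated extends Op

   object DeltaCount {
     // A 100-row partition in which an UPDATE touches 5 rows; the other 95 are copied.
     val rows: Seq[Op] = Seq.fill(5)(Updated) ++ Seq.fill(95)(Copied)

     // Counts every row the writer consumes, as a blanket output-row counter would.
     val numOutputRows: Long = rows.size.toLong

     // Counts only the rows the command actually changed.
     val numUpdatedRows: Long = rows.count(_ == Updated).toLong
   }
   ```

   The two counters disagree by exactly the number of copied rows, which is the gap a user would see in the SQL UI.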
   
   It seems cleaner to scope `numOutputRows` to the pure-INSERT paths, e.g. by moving the `numOutputRows -> numOutputRowsMetric` registration off `V2TableWriteExec.sparkMetrics` and onto `AppendDataExec` / `InsertOnlyMergeExec` directly. (This also answers your earlier question about why this metric was never populated in the UI before: it _was_ populated, just into a local `LongAccumulator` that didn't reach the UI.)
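
   Roughly the shape I have in mind, as a standalone sketch. `Metric`, `WriteExec`, `InsertOnlyExec`, `RowLevelExec` and the concrete classes are simplified stand-ins I made up for illustration; they are not the real Spark traits or their signatures:

   ```scala
   // Simplified stand-ins for illustration; none of these are Spark's real classes.
   final case class Metric(description: String)

   sealed trait Command
   case object Update extends Command
   case object Delete extends Command
   case object Merge extends Command

   trait WriteExec {
     // The base trait registers nothing, so each exec opts in to what it can report accurately.
     protected def sparkMetrics: Map[String, Metric] = Map.empty
     final def metrics: Map[String, Metric] = sparkMetrics
   }

   // Pure-INSERT paths register numOutputRows themselves.
   trait InsertOnlyExec extends WriteExec {
     override protected lazy val sparkMetrics: Map[String, Metric] =
       super.sparkMetrics + ("numOutputRows" -> Metric("number of output rows"))
   }

   // Row-level paths add only their command-specific metrics.
   trait RowLevelExec extends WriteExec {
     def command: Command
     override protected lazy val sparkMetrics: Map[String, Metric] =
       super.sparkMetrics ++ (command match {
         case Update =>
           Map(
             "numUpdatedRows" -> Metric("number of updated rows"),
             "numCopiedRows" -> Metric("number of copied rows"))
         case Delete =>
           Map(
             "numDeletedRows" -> Metric("number of deleted rows"),
             "numCopiedRows" -> Metric("number of copied rows"))
         case Merge => Map.empty[String, Metric]
       })
   }

   final class AppendExec extends InsertOnlyExec
   final class RowLevelUpdateExec extends RowLevelExec { val command: Command = Update }
   ```

   With this layout `new AppendExec().metrics` contains `numOutputRows`, while `new RowLevelUpdateExec().metrics` contains only `numUpdatedRows` and `numCopiedRows`, so the row-level UI is unchanged.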



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

