gengliangwang commented on code in PR #55612:
URL: https://github.com/apache/spark/pull/55612#discussion_r3164611197


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala:
##########
@@ -47,6 +47,21 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
 
   protected def groupFilterEnabled: Boolean = conf.runtimeRowLevelOperationGroupFilterEnabled
 
+  // converts a MERGE condition into an EXISTS subquery for runtime filtering
+  protected def toMergeGroupFilterCondition(

Review Comment:
   Both call sites of `toMergeGroupFilterCondition` are inside 
`RewriteMergeIntoTable.scala` (the MERGE `buildReplaceDataPlan` and 
`buildWriteDeltaPlan`). Lifting it into `RewriteRowLevelCommand` widens the 
trait's surface for `RewriteUpdateTable` and `RewriteDeleteFromTable`, which 
don't use it. Could it stay as a `private def` inside `RewriteMergeIntoTable` 
instead?
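
   A minimal sketch of the scoping I have in mind, using simplified stand-ins rather than the real Catalyst types. The `*Sketch` names, the `String`-based conditions, and the placeholder EXISTS text are all assumptions made for illustration; only `toMergeGroupFilterCondition`, `buildReplaceDataPlan`, `buildWriteDeltaPlan` and `groupFilterEnabled` come from the PR.

```scala
// Simplified stand-in for the shared trait; not the real Spark class.
trait RewriteRowLevelCommandSketch {
  protected def groupFilterEnabled: Boolean = true
}

// Hypothetical stand-in for RewriteMergeIntoTable.
object RewriteMergeSketch extends RewriteRowLevelCommandSketch {
  // Private to the MERGE rewrite, so UPDATE and DELETE rewrites never see it.
  // The returned string is a placeholder, not the real subquery expression.
  private def toMergeGroupFilterCondition(cond: String): String =
    s"EXISTS (SELECT 1 FROM source WHERE $cond)"

  // Both call sites live in the same object and can share the private helper.
  def buildReplaceDataPlan(cond: String): String =
    if (groupFilterEnabled) toMergeGroupFilterCondition(cond) else cond

  def buildWriteDeltaPlan(cond: String): String =
    if (groupFilterEnabled) toMergeGroupFilterCondition(cond) else cond
}

// Hypothetical stand-in for RewriteUpdateTable: the trait surface it inherits
// is unchanged because the MERGE-only helper was never added to the trait.
object RewriteUpdateSketch extends RewriteRowLevelCommandSketch
```

   With this shape, the trait keeps only members that every row-level rewrite uses.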



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala:
##########
@@ -484,3 +484,56 @@ object GroupBasedRowLevelOperation {
     }
   }
 }
+
+/**
+ * An extractor for row-level commands such as DELETE, UPDATE, MERGE that were rewritten using plans
+ * that operate on individual rows (row deltas).
+ *
+ * This class extracts the following entities:
+ *  - the delta-based rewrite plan;
+ *  - the condition that defines matching rows;
+ *  - the group filter condition;
+ *  - the read relation that can be either [[DataSourceV2Relation]] or [[DataSourceV2ScanRelation]]
+ *  depending on whether the planning has already happened;
+ */
+object DeltaBasedRowLevelOperation {
+  type ReturnType = (WriteDelta, Expression, Option[Expression], LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    case wd @ WriteDelta(ExtractV2Table(table), cond, query, _, _, groupFilterCond, _) =>
+      val readRelation = findReadRelation(table, query)
+      readRelation.map((wd, cond, groupFilterCond, _))
+
+    case _ =>
+      None
+  }
+
+  private def findReadRelation(

Review Comment:
   The body of `DeltaBasedRowLevelOperation.findReadRelation` is essentially a 
copy of `GroupBasedRowLevelOperation.findReadRelation`, differing only in the 
absence of the `allowMultipleReads` cases. Would it be worth pulling the shared 
logic (collect the read relations, check that exactly one matches, raise an 
error otherwise) into a small helper (e.g., into `RewriteRowLevelCommand` or a 
private object here) so the two extractors stay aligned over time?
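
   A rough sketch of such a helper, assuming a toy plan tree in place of `LogicalPlan`. `Plan`, `Scan`, `Node`, `ReadRelationFinder` and `findSingle` are hypothetical names, and the error type and message are placeholders rather than what the existing extractor throws.

```scala
// Toy plan tree standing in for LogicalPlan; not the real Catalyst classes.
sealed trait Plan { def children: Seq[Plan] }
case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }
case class Node(children: Seq[Plan]) extends Plan

// Hypothetical shared helper that both extractors could delegate to.
object ReadRelationFinder {
  // Pre-order collection of every node the partial function accepts.
  private def collect(plan: Plan)(pf: PartialFunction[Plan, Plan]): Seq[Plan] =
    pf.lift(plan).toSeq ++ plan.children.flatMap(c => collect(c)(pf))

  def findSingle(
      table: String,
      query: Plan,
      allowMultipleReads: Boolean = false): Option[Plan] = {
    val reads = collect(query) { case s @ Scan(t) if t == table => s }
    reads match {
      case Seq() => None
      case Seq(single) => Some(single)
      // Only the group-based extractor would pass allowMultipleReads = true.
      case Seq(first, _) if allowMultipleReads => Some(first)
      case other =>
        throw new IllegalStateException(s"Expected only one read relation: $other")
    }
  }
}
```

   The delta-based extractor would call `findSingle(table, query)` and the group-based one would pass its own `allowMultipleReads` flag, so the size check and the error path live in one place.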



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -93,7 +103,7 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
         // this rule assigns runtime filters to both scan relations (will be shared at runtime)
         // and must transform the runtime filter condition to use correct expr IDs for each relation
         // see RewriteUpdateTable for more details

Review Comment:
   Now that this branch fires for both `ReplaceData` and `WriteDelta`, the 
comment block above describes only the group-based path. The UNION rewrite with 
two identical scan relations applies only to `ReplaceData` UPDATE (via 
`RewriteUpdateTable.buildReplaceDataWithUnionPlan`). The new `WriteDelta` 
UPDATE path has a single scan, so the `attrMap`/`transform` step is effectively 
a no-op there. A small clarification that delta-based UPDATE goes through the 
same code with a single scan would help future readers.
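
   To illustrate why the remapping is a no-op with one scan, here is a toy model. `Attr`, `RuntimeFilterRebind` and `rebind` are hypothetical names, and a condition is modelled as a plain list of attributes, which is a simplification and not how the rule represents it.

```scala
// Toy attribute: a name plus an expression ID.
case class Attr(name: String, exprId: Long)

// Hypothetical model of the attrMap/transform step.
object RuntimeFilterRebind {
  // Maps each attribute of the original scan to the attribute at the same
  // position in the target scan, then rewrites the condition's attributes.
  def rebind(cond: Seq[Attr], original: Seq[Attr], target: Seq[Attr]): Seq[Attr] = {
    val attrMap = original.zip(target).toMap
    cond.map(a => attrMap.getOrElse(a, a))
  }
}

object RebindDemo {
  val scan1 = Seq(Attr("id", 1L), Attr("dep", 2L))
  // Second copy of the same relation with fresh expr IDs (UNION rewrite case).
  val scan2 = Seq(Attr("id", 11L), Attr("dep", 12L))
  val cond = Seq(Attr("dep", 2L))

  // Two scans: the condition is rewritten to the second scan's IDs.
  val forSecondScan = RuntimeFilterRebind.rebind(cond, scan1, scan2)
  // Single scan: the map is the identity, so the condition is unchanged.
  val forSingleScan = RuntimeFilterRebind.rebind(cond, scan1, scan1)
}
```

   In the single-scan case every attribute maps to itself, which is the situation the delta-based UPDATE path is in.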



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

