gengliangwang commented on code in PR #55612:
URL: https://github.com/apache/spark/pull/55612#discussion_r3164611197
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala:
##########
@@ -47,6 +47,21 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
protected def groupFilterEnabled: Boolean =
conf.runtimeRowLevelOperationGroupFilterEnabled
+ // converts a MERGE condition into an EXISTS subquery for runtime filtering
+ protected def toMergeGroupFilterCondition(
Review Comment:
Both call sites of `toMergeGroupFilterCondition` are inside
`RewriteMergeIntoTable.scala` (the MERGE `buildReplaceDataPlan` and
`buildWriteDeltaPlan`). Lifting it into `RewriteRowLevelCommand` widens the
trait's surface for `RewriteUpdateTable` and `RewriteDeleteFromTable`, which
don't use it. Could it stay as a `private def` inside `RewriteMergeIntoTable`
instead?
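A minimal, dependency-free sketch of the suggested layout. All `*Sketch` names are hypothetical stand-ins, conditions are plain strings rather than Catalyst `Expression`s, and the `EXISTS` text is only illustrative. Shared configuration stays on the trait. The MERGE-only helper becomes a `private def` on the MERGE rule, so `UpdateTableSketch` still inherits `groupFilterEnabled` but cannot see `toMergeGroupFilterCondition`.

```scala
// Hypothetical stand-ins for the Spark rules; conditions are plain strings here.
trait RowLevelCommandSketch {
  // Used by every row-level rewrite (DELETE, UPDATE, MERGE), so it belongs on the trait.
  protected def groupFilterEnabled: Boolean = true
}

object MergeIntoTableSketch extends RowLevelCommandSketch {
  // Both MERGE-specific call sites live in this object.
  def buildReplaceDataPlan(cond: String): Option[String] =
    if (groupFilterEnabled) Some(toMergeGroupFilterCondition(cond)) else None

  def buildWriteDeltaPlan(cond: String): Option[String] =
    if (groupFilterEnabled) Some(toMergeGroupFilterCondition(cond)) else None

  // private: invisible to the UPDATE/DELETE rules that extend the same trait.
  private def toMergeGroupFilterCondition(cond: String): String =
    s"EXISTS (SELECT 1 FROM source WHERE $cond)"
}

// Inherits the shared trait members, but not the MERGE-only helper.
object UpdateTableSketch extends RowLevelCommandSketch
```

With this layout, referencing `toMergeGroupFilterCondition` from `UpdateTableSketch` is a compile error, which keeps the trait's surface limited to what all three rules need.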
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala:
##########
@@ -484,3 +484,56 @@ object GroupBasedRowLevelOperation {
}
}
}
+
+/**
+ * An extractor for row-level commands such as DELETE, UPDATE, MERGE that were rewritten using plans
+ * that operate on individual rows (row deltas).
+ *
+ * This class extracts the following entities:
+ * - the delta-based rewrite plan;
+ * - the condition that defines matching rows;
+ * - the group filter condition;
+ * - the read relation that can be either [[DataSourceV2Relation]] or [[DataSourceV2ScanRelation]]
+ * depending on whether the planning has already happened;
+ */
+object DeltaBasedRowLevelOperation {
+ type ReturnType = (WriteDelta, Expression, Option[Expression], LogicalPlan)
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+ case wd @ WriteDelta(ExtractV2Table(table), cond, query, _, _, groupFilterCond, _) =>
+ val readRelation = findReadRelation(table, query)
+ readRelation.map((wd, cond, groupFilterCond, _))
+
+ case _ =>
+ None
+ }
+
+ private def findReadRelation(
Review Comment:
The body of `DeltaBasedRowLevelOperation.findReadRelation` is essentially a
copy of `GroupBasedRowLevelOperation.findReadRelation`, differing only in the
absence of the `allowMultipleReads` cases. Would it be worth pulling the shared
logic (collecting the candidate relations, matching a single relation, and
raising the error otherwise) into a small helper (e.g., in
`RewriteRowLevelCommand` or a private object here) so the two extractors stay
aligned over time?
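A rough sketch of what such a shared helper could look like, under stated assumptions: `ReadRelationResolverSketch` and `resolveSingleRead` are hypothetical names, the relation type is a generic `R` instead of `DataSourceV2Relation`/`DataSourceV2ScanRelation`, and an `IllegalStateException` stands in for whatever Spark error the real extractors raise. The group-based extractor would pass `allowMultipleReads = true`; the delta-based one would pass `false`.

```scala
// Hypothetical shared helper, generic over the relation type R so it has no Spark
// dependency. The caller supplies the collected read relations and a "same read" check.
object ReadRelationResolverSketch {
  def resolveSingleRead[R](reads: Seq[R], allowMultipleReads: Boolean)(
      sameRead: (R, R) => Boolean): Option[R] = reads match {
    // Assumed: finding no read relation is not an error (e.g. the scan was optimized away).
    case Seq() => None
    case Seq(only) => Some(only)
    // Group-based only: two references to the same read, as in a UNION-based UPDATE rewrite.
    case Seq(first, second) if allowMultipleReads && sameRead(first, second) => Some(first)
    case _ =>
      throw new IllegalStateException(
        s"Expected a single read relation, found ${reads.size}")
  }
}
```

Passing reference equality (`_ eq _`) as `sameRead` is one plausible choice, assumed here rather than taken from the PR. Keeping the check as a parameter lets each extractor decide what counts as the same read.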
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -93,7 +103,7 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
// this rule assigns runtime filters to both scan relations (will be shared at runtime)
// and must transform the runtime filter condition to use correct expr IDs for each relation
// see RewriteUpdateTable for more details
Review Comment:
Now that this branch fires for both `ReplaceData` and `WriteDelta`, the
comment block above only describes the group-based path: the UNION rewrite with
two identical scan relations only applies to `ReplaceData` UPDATE (via
`RewriteUpdateTable.buildReplaceDataWithUnionPlan`); the new `WriteDelta`
UPDATE path has a single scan and the `attrMap`/`transform` step is effectively
a no-op. A small clarification that delta-based UPDATE goes through the same
code with a single scan would help future readers.
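A simplified model of why the remapping step degenerates to a no-op with a single scan. `Attr` and `RuntimeFilterRemapSketch.remap` are hypothetical stand-ins for Catalyst attributes and the rule's `attrMap`/`transform` step; this is not the rule's actual code. Each filter attribute is mapped positionally onto the target scan's output. With two scans (group-based UNION), the expression IDs change. With one scan (delta-based), the map sends every attribute to itself.

```scala
// Hypothetical stand-in for a Catalyst attribute: a name plus an expression ID.
final case class Attr(name: String, exprId: Long)

object RuntimeFilterRemapSketch {
  // Rewrites the attributes referenced by `cond` from the relation the runtime filter
  // was built against (`filterOutput`) to the positionally matching `scanOutput`.
  def remap(cond: Seq[Attr], filterOutput: Seq[Attr], scanOutput: Seq[Attr]): Seq[Attr] = {
    val attrMap: Map[Long, Attr] = filterOutput.map(_.exprId).zip(scanOutput).toMap
    // Attributes not produced by the filter relation are left unchanged.
    cond.map(a => attrMap.getOrElse(a.exprId, a))
  }
}
```

In this model, `remap(cond, out, out)` returns `cond` unchanged, which is the single-scan case the suggested comment would describe.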
--
This is an automated message from the Apache Git Service.