cloud-fan commented on code in PR #56057:
URL: https://github.com/apache/spark/pull/56057#discussion_r3287599169


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -2002,6 +2008,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
   final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
   override protected def withNewChildInternal(newChild: LogicalPlan): Distinct =
     copy(child = newChild)
+  override def isStateful: Boolean = child.isStreaming

Review Comment:
   This override is non-obvious at the `Distinct` layer — `Distinct` doesn't 
directly become a `StateStoreWriter`. The existing comment in 
`UnsupportedOperationChecker.isStatefulOperation` explains it: *"Since the 
Distinct node will be replaced to Aggregate in the optimizer rule 
`ReplaceDistinctWithAggregate`, here we also need to check all Distinct node by 
assuming it as Aggregate."* Worth preserving that rationale here, or at least a 
`// see ReplaceDistinctWithAggregate` one-liner.
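For illustration, the override might read like this with the rationale kept next to it. This is a minimal sketch on a toy hierarchy: `Plan` and `Relation` are hypothetical stand-ins, and this `Distinct` only mirrors the shape of the catalyst node. It is not Spark's actual code.

```scala
// Hypothetical stand-in for LogicalPlan; NOT Spark's catalyst class.
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean = children.exists(_.isStreaming)
  def isStateful: Boolean = false
}

// Hypothetical leaf node whose streaming-ness is given explicitly.
final case class Relation(streaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}

final case class Distinct(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  // Distinct never executes as a StateStoreWriter itself: the optimizer rule
  // ReplaceDistinctWithAggregate rewrites it into an Aggregate, which does.
  // So treat it as stateful whenever its input is streaming.
  override def isStateful: Boolean = child.isStreaming
}
```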



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -80,6 +80,14 @@ abstract class LogicalPlan
   def isStreaming: Boolean = _isStreaming
   private[this] lazy val _isStreaming = children.exists(_.isStreaming)
 
+  /** Marks if a streaming node is a stateful operator. */
+  def isStateful: Boolean = false
+
+  /** Marks if a subplan contains a stateful operator. */

Review Comment:
   Two suggestions for the Scaladoc:
   
   1. "Marks if" is awkward — these return a boolean rather than marking 
anything. "Whether …" or "Returns true if …" is more conventional. For 
`containsStatefulOperator`, please also say it includes `this` (the body reads 
`isStateful || children.exists(...)`).
   
   2. More substantively, please nail down what "stateful" means here. The new 
definition is the streaming-runtime view (any operator that becomes a 
`StateStoreWriter` at execution) and matches 
`MicroBatchExecution.containsStatefulOperator` exactly. It diverges from 
`UnsupportedOperationChecker.isStatefulOperation` on two operators: 
`Deduplicate` is stateful here regardless of whether keys carry an event-time 
column, and streaming `GlobalLimit` is included here but not there. Calling 
that out — and noting that `isStatefulOperation` is intentionally narrower 
(scoped to the chained-watermark correctness check) and isn't a drop-in 
replacement target — will keep future PRs from silently swapping callers and 
changing analyzer semantics. Worth naming which existing checks *are* intended 
replacement targets, too.
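One possible shape for the revised Scaladoc, sketched on a hypothetical toy hierarchy. `Plan`, `Relation`, `Project`, and the simplified `Deduplicate`/`GlobalLimit` are illustrative stand-ins, not catalyst classes. The recursive body of `containsStatefulOperator` is an assumption because the diff above is truncated. The sketch does not name the intended replacement targets, which the comment leaves open.

```scala
// Hypothetical stand-in for LogicalPlan; NOT Spark's catalyst class hierarchy.
sealed trait Plan {
  def children: Seq[Plan]

  def isStreaming: Boolean = children.exists(_.isStreaming)

  /**
   * Returns true if this node is a streaming operator that becomes a
   * stateful physical operator (a `StateStoreWriter`) at execution time.
   * This is the streaming-runtime definition and matches
   * `MicroBatchExecution.containsStatefulOperator`. It is intentionally
   * broader than `UnsupportedOperationChecker.isStatefulOperation`, which is
   * scoped to the chained-watermark correctness check and is not a drop-in
   * replacement target for this method.
   */
  def isStateful: Boolean = false

  /** Returns true if this node or any of its descendants is stateful. */
  final def containsStatefulOperator: Boolean =
    isStateful || children.exists(_.containsStatefulOperator)
}

// Hypothetical leaf node whose streaming-ness is given explicitly.
final case class Relation(streaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}

// Hypothetical stateless operator.
final case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

// Stateful on streaming input whether or not `keys` carry an event-time column.
final case class Deduplicate(keys: Seq[String], child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}

// A streaming GlobalLimit counts as stateful under this definition.
final case class GlobalLimit(n: Int, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  override def isStateful: Boolean = child.isStreaming
}
```

Under this model, `Project(Deduplicate(Seq("id"), Relation(streaming = true)))` is not itself stateful, but it contains a stateful operator.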



-- 
This is an automated message from the Apache Git Service.