HeartSaVioR commented on code in PR #38717:
URL: https://github.com/apache/spark/pull/38717#discussion_r1027251716


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -341,7 +355,13 @@ trait ProgressReporter extends Logging {
       val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
         logicalPlan.collectLeaves().map { leaf => leaf -> source }
       }
-      val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming

Review Comment:
   I admit I'm not 100% sure of the historical reason for picking the logical 
plan rather than the optimized plan. My guess is that it makes it possible to 
compare the "logical node in newData" with the "leaf node in the logical plan", 
since the optimizer may change leaf nodes.
   
   That said, this approach is best effort and will never be perfect. For 
example, if the optimizer rewrites a self-union into an aggregation, the 
optimized plan will have one fewer leaf node, which breaks the mechanism. If the 
optimizer swaps subtrees and thereby swaps leaf nodes, it could lead to 
incorrect metrics.
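
   To make the two failure modes concrete, here is a toy sketch. It assumes 
leaves of the two plans are paired by position, and every type and rule in it 
(`Plan`, `Leaf`, `Union`, `Aggregate`, `collapseSelfUnion`, `swapChildren`, 
`pairByPosition`) is hypothetical and only stands in for the real plan classes 
and optimizer rules.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's LogicalPlan API.
sealed trait Plan {
  // Collect leaves left to right, similar in spirit to collectLeaves().
  def collectLeaves(): Seq[Leaf] = this match {
    case l: Leaf          => Seq(l)
    case Union(children)  => children.flatMap(_.collectLeaves())
    case Aggregate(child) => child.collectLeaves()
  }
}
final case class Leaf(name: String) extends Plan
final case class Union(children: Seq[Plan]) extends Plan
final case class Aggregate(child: Plan) extends Plan

object PositionalMatching {
  // Hypothetical rule: a union of identical children collapses into an
  // aggregate over a single child, so one leaf disappears.
  def collapseSelfUnion(plan: Plan): Plan = plan match {
    case Union(children) if children.nonEmpty && children.distinct.size == 1 =>
      Aggregate(children.head)
    case other => other
  }

  // Hypothetical rule: reverse the order of the union's children.
  def swapChildren(plan: Plan): Plan = plan match {
    case Union(children) => Union(children.reverse)
    case other           => other
  }

  // Pair leaves by position; give up when the leaf counts differ.
  def pairByPosition(before: Plan, after: Plan): Option[Seq[(Leaf, Leaf)]] = {
    val a = before.collectLeaves()
    val b = after.collectLeaves()
    if (a.size == b.size) Some(a.zip(b)) else None
  }
}
```

   In this toy, collapsing `Union(Seq(Leaf("s"), Leaf("s")))` leaves one leaf 
on one side and two on the other, so no pairing is produced at all. Swapping 
`Union(Seq(Leaf("a"), Leaf("b")))` keeps the counts equal, so the pairing 
succeeds but associates `a` with `b` and `b` with `a`.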
   
   Ideally we should either 1) move all streaming sources to DSv2 or 2) have a 
dedicated logical and physical node for streaming DSv1 sources, but neither 
seems easy to achieve.
   
   Another possible idea is to assign a UUID in the node tag for association 
and retain the tag even when the optimizer applies rules. If the node tag could 
be propagated to the physical plan, even better. (If that is feasible, we could 
simply collect the nodes that have the node tag from the executed plan and 
match them with the sources.)
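
   A rough sketch of what matching by tag could look like, again as a toy 
model rather than a proposal for the actual change. It does not use Spark's 
node-tag facility; `Node`, `SourceLeaf`, `Join`, `tagSources`, `rewrite`, and 
`matchByTag` are all hypothetical names, and the sketch simply assumes that 
rewrites preserve the tag, which is exactly the part that would need to be 
verified.

```scala
import java.util.UUID

// Toy model only: hypothetical types, NOT Spark's TreeNode or node-tag API.
sealed trait Node
final case class SourceLeaf(name: String, tag: Option[UUID] = None) extends Node
final case class Join(left: Node, right: Node) extends Node

object TagMatching {
  // Collect leaves left to right.
  def leaves(node: Node): Seq[SourceLeaf] = node match {
    case l: SourceLeaf => Seq(l)
    case Join(l, r)    => leaves(l) ++ leaves(r)
  }

  // Give each streaming leaf a fresh UUID and record which source it belongs to.
  // Non-streaming leaves stay untagged.
  def tagSources(node: Node, streaming: Set[String]): (Node, Map[UUID, String]) =
    node match {
      case l: SourceLeaf if streaming.contains(l.name) =>
        val id = UUID.randomUUID()
        (l.copy(tag = Some(id)), Map(id -> l.name))
      case l: SourceLeaf =>
        (l, Map.empty[UUID, String])
      case Join(l, r) =>
        val (newLeft, leftIds)   = tagSources(l, streaming)
        val (newRight, rightIds) = tagSources(r, streaming)
        (Join(newLeft, newRight), leftIds ++ rightIds)
    }

  // Hypothetical rewrite: swaps subtrees and replaces leaves with renamed
  // copies, but keeps the tag (the assumption the whole idea depends on).
  def rewrite(node: Node): Node = node match {
    case Join(l, r)    => Join(rewrite(r), rewrite(l))
    case l: SourceLeaf => l.copy(name = l.name + "_optimized")
  }

  // Match tagged leaves back to their sources, ignoring position and count.
  def matchByTag(node: Node, registry: Map[UUID, String]): Map[String, SourceLeaf] =
    leaves(node).flatMap(l => l.tag.flatMap(registry.get).map(_ -> l)).toMap
}
```

   For example, tagging `Join(SourceLeaf("batch"), Join(SourceLeaf("s1"), 
SourceLeaf("s2")))` with streaming sources `s1` and `s2`, then applying 
`rewrite`, still lets `matchByTag` find both sources even though the leaves 
were reordered and replaced.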
   
   I could probably explore the idea, but it would take time, and I'm not sure 
whether it is feasible. Do you think the idea makes sense, or does it go 
against how Spark optimization/physical rules work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

