Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139830482
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
    @@ -114,6 +115,16 @@ class IncrementalExecution(
               stateInfo = Some(nextStatefulOperationStateInfo),
               batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
               eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
    +
    +      case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, 
_, left, right) =>
    +        j.copy(
    +          stateInfo = Some(nextStatefulOperationStateInfo),
    --- End diff --
    
    Whatever optimization takes place, the same optimization will occur in EVERY batch. So if an aggregation is pushed below a join, then all the batches will have that plan.
    
    What we have to guard against is cost-based optimization that can reorder things differently in different batches. That is why I have disabled cost-based join optimization. When adding such optimizations in the future, we have to be cautious about the streaming case and disable them there.
    
    Also, this is a general concern with other stateful ops as well, not 
something that this PR would address.



---
