HeartSaVioR commented on PR #46569:
URL: https://github.com/apache/spark/pull/46569#issuecomment-2109270674

   Looks like the behavior of PruneFilters is somewhat interesting in combination with filter pushdown... If the filter can be pushed down more aggressively, the scope of the subtree that PruneFilters ends up pruning gets "smaller".
   
   E.g. 
   
   ```
         val df = input1.toDF()
           .observe("myEvent", count(lit(1)).as("rows"))
           .filter(expr("false"))
   ```
   
   to
   
   ```
   == Analyzed Logical Plan ==
WriteToMicroBatchDataSource MemorySink, 5afa5bf2-9757-4e90-a604-bb2c4438ebec, Update, 0
   +- Filter false
      +- CollectMetrics myEvent, [count(1) AS rows#3L], 0
         +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
   
   == Optimized Logical Plan ==
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@64b874f0]
   +- LocalRelation <empty>, [value#1]
   ```
   
   vs
   
   ```
         val df = input1.toDF()
           .withColumn("eventTime", timestamp_seconds($"value"))
           .withWatermark("eventTime", "0 second")
           .filter(expr("false"))
   ```
   
   to
   
   ```
   == Analyzed Logical Plan ==
WriteToMicroBatchDataSource MemorySink, 371ac05f-8c25-43b1-b82a-ed11e9a16c29, Update, 0
   +- Filter false
      +- EventTimeWatermark eventTime#3: timestamp, 0 seconds
         +- Project [value#1, timestamp_seconds(value#1) AS eventTime#3]
            +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
   
   == Optimized Logical Plan ==
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@4accccf7]
   +- EventTimeWatermark eventTime#3: timestamp, 0 seconds
      +- LocalRelation <empty>, [value#1, eventTime#3]
   ```
   
   For the former, the filter can't be pushed down below CollectMetrics, so PruneFilters takes effect and replaces both CollectMetrics and the relation with an empty relation.
   
   For the latter, EventTimeWatermark supports pushing down predicates that don't reference the event time column. In this case the filter is pushed down below EventTimeWatermark, and PruneFilters takes effect and replaces the relation, but EventTimeWatermark is left as it is.
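   
   FWIW, here is a rough sketch of one way to reproduce plans like the above (not necessarily how I generated them): build the query on a MemoryStream, run one micro-batch, and call `explain(extended = true)` on the streaming query. Assumes a spark-shell session where `spark` is available; the sink query name and input values are arbitrary.
   
   ```
   import org.apache.spark.sql.execution.streaming.MemoryStream
   import org.apache.spark.sql.functions._
   import spark.implicits._
   
   // MemoryStream needs an implicit SQLContext
   implicit val sqlCtx = spark.sqlContext
   val input1 = MemoryStream[Int]
   
   val df = input1.toDF()
     .observe("myEvent", count(lit(1)).as("rows"))
     .filter(expr("false"))
   
   // memory sink + update mode, matching the plans above
   val query = df.writeStream
     .format("memory")
     .queryName("tmp_sink")   // arbitrary name
     .outputMode("update")
     .start()
   
   input1.addData(1, 2, 3)
   query.processAllAvailable()
   
   // prints analyzed/optimized/physical plans of the last micro-batch,
   // which is where the PruneFilters difference shows up
   query.explain(extended = true)
   query.stop()
   ```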

