Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20647#discussion_r169556960
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -107,17 +106,24 @@ case class DataSourceV2Relation(
     }
     
     /**
    - * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
    - * to the non-streaming relation.
    + * A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
    + *
    + * Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan,
    + * to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]]
    + * after we figure out how to apply operator push-down for streaming data sources.
    --- End diff --
    
    Currently the streaming execution creates the reader once. The reader is mutable and holds two kinds of state (sketched below):
    1. operator push-down state, e.g. the filters that have been pushed down.
    2. streaming-related state, like offsets, the Kafka connection, etc.
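
    To make this concrete, here is a minimal, self-contained sketch. The names are made-up stand-ins for the real v2 interfaces (`MicroBatchReader`, `SupportsPushDownFilters`), not actual API; only the state layout matters:

    ```scala
    // Stand-ins for the real types; the point is the two kinds of mutable state.
    trait Filter
    case class Offset(value: Long)

    class ExampleStreamingReader {
      // 1. operator push-down state, mutated by the planner
      protected var pushedFilters: Seq[Filter] = Nil

      // 2. streaming-related state, mutated by the streaming execution
      protected var offsetRange: Option[(Offset, Offset)] = None

      def pushFilters(filters: Seq[Filter]): Seq[Filter] = {
        pushedFilters = filters
        Nil // pretend the source can handle all pushed filters
      }

      def setOffsetRange(start: Offset, end: Offset): Unit = {
        offsetRange = Some((start, end))
      }
    }
    ```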
    
    For continuous mode, this is fine. We create the reader, set the offsets, construct the logical plan, get the physical plan, and process the data. We mutate the reader state once at the beginning and never again.
    
    For micro-batch mode, we have a problem. We create the reader once at the beginning, but we set the reader offsets, construct the logical plan, and get the physical plan for every batch. This means operator push-down gets applied to the same reader many times, and data source v2 doesn't define what the behavior should be in that case. Thus we can't apply operator push-down for streaming data sources, as the sketch below illustrates.
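
    Schematically, reusing the stand-in reader from the sketch above (the loop itself is a placeholder for Spark's per-batch planning, not real API):

    ```scala
    // One reader instance survives across all batches, so per-batch planning
    // re-runs push-down against state left over from the previous batch.
    def runMicroBatches(
        reader: ExampleStreamingReader,
        batches: Seq[(Offset, Offset)],
        filters: Seq[Filter]): Unit = {
      for ((start, end) <- batches) {
        reader.setOffsetRange(start, end) // streaming state: redefined each batch, fine
        reader.pushFilters(filters)       // push-down state: the 2nd and later calls
                                          // have no defined semantics in data source v2
      }
    }
    ```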
    
    @marmbrus @tdas @zsxwing @jose-torres I have 2 proposals to support operator push-down for the streaming relation:
    1. Introduce a `reset` API on `DataSourceReader` to clear out the operator push-down state. Then we can call `reset` before every micro-batch and safely apply operator push-down again (see the sketch after this list).
    2. Do plan analysis/optimization/planning only once for micro-batch mode. Theoretically this is not ideal: different micro-batches may have different statistics, so the optimal physical plan may differ, and we should rerun the planner for each batch. The benefit is that analysis/optimization/planning can be costly, and doing it once amortizes that cost. Also, adaptive execution can help, so reusing the same physical plan is not that bad.
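
    A rough sketch of what proposal 1 could look like, again against the stand-in reader above (`SupportsReset` and the class names are hypothetical, not existing API):

    ```scala
    // Hypothetical mix-in: clear only the push-down state, while streaming
    // state (offsets, connections) stays intact across batches.
    trait SupportsReset {
      def reset(): Unit
    }

    class ResettableStreamingReader extends ExampleStreamingReader with SupportsReset {
      override def reset(): Unit = {
        pushedFilters = Nil // offsetRange is deliberately left untouched
      }
    }
    ```

    With something like this, the micro-batch loop would call `reset()` before each round of planning, making repeated push-down well-defined.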


---
