Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20647#discussion_r169556960

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---

@@ -107,17 +106,24 @@ case class DataSourceV2Relation(
 }

 /**
- * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
- * to the non-streaming relation.
+ * A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
+ *
+ * Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan,
+ * to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]]
+ * after we figure out how to apply operator push-down for streaming data sources.

--- End diff --

Currently the streaming execution creates the reader once. The reader is mutable and contains two kinds of state:

1. operator push-down state, e.g. the filters being pushed down.
2. streaming-related state, like offsets, the Kafka connection, etc.

For continuous mode this is fine: we create the reader, set the offsets, construct the plan, get the physical plan, and process. We mutate the reader state at the beginning and never mutate it again.

For micro-batch mode we have a problem: we create the reader once at the beginning, but we set the reader offset, construct the plan, and get the physical plan for every batch. This means we would apply operator push-down to the same reader many times, and Data Source V2 doesn't define what the behavior should be in that case. Thus we can't apply operator push-down for streaming data sources.

@marmbrus @tdas @zsxwing @jose-torres I have 2 proposals to support operator push-down for the streaming relation:

1. Introduce a `reset` API on `DataSourceReader` that clears out the operator push-down state. Then we can call `reset` for every micro-batch and safely apply operator push-down.
2. Do plan analysis/optimization/planning only once for micro-batch mode.
Theoretically proposal 2 is not ideal: different micro-batches may have different statistics, and therefore different optimal physical plans, so we should rerun the planner for each batch. The benefit is that plan analysis/optimization/planning may be costly, so doing it once mitigates that cost. Adaptive execution can also help, so reusing the same physical plan is not that bad.
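To make proposal 1 concrete, here is a minimal, self-contained sketch of what a `reset` API could look like. The `DataSourceReader` trait and `KafkaLikeReader` class below are simplified stand-ins I made up for illustration, not Spark's real `DataSourceReader` / `SupportsPushDownFilters` interfaces; the point is only that `reset` clears the push-down state while leaving the streaming state (offsets, connections) untouched:

```scala
// Simplified stand-in for a pushed-down filter predicate.
case class Filter(column: String, value: Any)

// Simplified stand-in for DataSourceReader, extended with the proposed `reset`.
trait DataSourceReader {
  /** Clears mutable operator push-down state; called before each micro-batch. */
  def reset(): Unit
}

// A reader with both kinds of mutable state: push-down state and streaming state.
class KafkaLikeReader extends DataSourceReader {
  private var pushedFilters: Seq[Filter] = Nil // push-down state
  private var offset: Long = 0L                // streaming state, survives reset

  def pushFilters(filters: Seq[Filter]): Unit = { pushedFilters ++= filters }
  def currentFilters: Seq[Filter] = pushedFilters

  def setOffset(o: Long): Unit = { offset = o }
  def currentOffset: Long = offset

  // Only the push-down state is cleared; offsets and connections are kept,
  // so re-planning each micro-batch starts from a clean, well-defined state.
  override def reset(): Unit = { pushedFilters = Nil }
}
```

With this shape, the micro-batch loop would call `reset()` before setting the offset and re-planning each batch, so applying push-down many times to the same reader has a defined meaning.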