Hi Jingsong,

Regarding SupportsParallelismReport, I think streaming connectors can also benefit from it. I have seen requests on the user mailing list from users who want to control the parallelism of a source or sink instead of having it set to the global parallelism. We did this in our company as well.

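To make the idea concrete, here is a minimal sketch of how a connector might implement the proposed interface. It is only an illustration under assumptions: SupportsParallelismReport is redeclared locally so the snippet compiles on its own, and the FileSource class, its constructor parameters (targetBytesPerTask, maxParallelism), and the rule of "one task per target-sized chunk, at least one per file" are hypothetical, not the existing Hive or Filesystem logic.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: every name except SupportsParallelismReport is hypothetical. */
class ParallelismReportSketch {

    /** The interface proposed in this thread, redeclared so the sketch compiles on its own. */
    interface SupportsParallelismReport {
        int reportParallelism();
    }

    /** Hypothetical file-based source that infers parallelism from file count and file sizes. */
    static final class FileSource implements SupportsParallelismReport {
        private final List<Long> fileSizesInBytes;
        private final long targetBytesPerTask;
        private final int maxParallelism;

        FileSource(List<Long> fileSizesInBytes, long targetBytesPerTask, int maxParallelism) {
            if (targetBytesPerTask <= 0 || maxParallelism < 1) {
                throw new IllegalArgumentException(
                        "targetBytesPerTask and maxParallelism must be positive");
            }
            this.fileSizesInBytes = fileSizesInBytes;
            this.targetBytesPerTask = targetBytesPerTask;
            this.maxParallelism = maxParallelism;
        }

        @Override
        public int reportParallelism() {
            // Assumption: with no files known, fall back to the system default (-1).
            if (fileSizesInBytes.isEmpty()) {
                return -1;
            }
            long splits = 0;
            for (long size : fileSizesInBytes) {
                // Ceiling division: number of target-sized chunks in this file.
                long chunks = (size + targetBytesPerTask - 1) / targetBytesPerTask;
                // Every file needs at least one task, even if it is empty.
                splits += Math.max(1, chunks);
            }
            // Cap the result so that huge tables do not request unbounded parallelism.
            return (int) Math.min(splits, maxParallelism);
        }
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        FileSource small = new FileSource(Arrays.asList(10 * mb, 20 * mb), 128 * mb, 1000);
        FileSource large = new FileSource(Arrays.asList(1024 * mb, 300 * mb), 128 * mb, 1000);
        System.out.println(small.reportParallelism()); // 2  (one task per small file)
        System.out.println(large.reportParallelism()); // 11 (8 chunks + 3 chunks)
    }
}
```

A streaming connector could return a user-configured value from reportParallelism() in the same way, which would cover the requests mentioned above.
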
Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Jul 30, 2020 at 11:16 AM:

> Hi all,
>
> ## SupportsParallelismReport
>
> Now that FLIP-95 [1] is ready, only Hive and Filesystem are still using the
> old interfaces.
>
> We are considering migrating to the new interface.
>
> However, one problem is that in the old interface implementation,
> connectors infer parallelism by itself instead of a global parallelism
> configuration. Hive & filesystem determines the parallelism size according
> to the number of files and the size of the file. In this way, large tables
> may use thousands of parallelisms, while small tables only have 10
> parallelisms, which can minimize the consumption of task scheduling.
>
> This situation is very common in batch computing. For example, in the star
> model, a large table needs to be joined with multiple small tables.
>
> So we should give this ability to new table source interfaces. The
> interface can be:
>
> /**
>  * Enables to give source the ability to report parallelism.
>  *
>  * <p>After filtering push down and partition push down, the source can have more information,
>  * which can help it infer more effective parallelism.
>  */
> @Internal
> public interface SupportsParallelismReport {
>
>     /**
>      * Report parallelism from source or sink. The parallelism of an operator must be at least 1,
>      * or -1 (use system default).
>      */
>     int reportParallelism();
> }
>
>
> Rejected Alternatives:
> - SupportsSplitReport: What is the relationship between this split and the
> split of FLIP-27? Do we have to match them one by one? I think they are two
> independent things. In fact, the design of FLIP-27, split and parallelism
> are not bound one by one.
> - SupportsPartitionReport: What is partition? Actually, in table/SQL,
> partition is a special concept of table. It should not be mixed with
> parallelism.
>
> ## SupportsStatisticsReport
>
> As with parallelism, statistics information from source will be more
> appropriate and accurate.
> After filtering push down and partition push
> down, the source can have more information, which can help it infer more
> effective statistics. However, if we only infer from the planner itself, it
> may lead to a big gap between the statistics information and the real
> situation.
>
> The interface:
>
> /**
>  * Enables to give {@link ScanTableSource} the ability to report table statistics.
>  *
>  * <p>Statistics can be inferred from real data in real time, it is more accurate than the
>  * statistics in the catalog.
>  *
>  * <p>After filtering push down and partition push down, the source can have more information,
>  * which can help it infer more effective table statistics.
>  */
> @Internal
> public interface SupportsStatisticsReport {
>
>     /**
>      * Reports {@link TableStats} from old table stats.
>      */
>     TableStats reportTableStatistics(TableStats oldStats);
> }
>
>
> When to invoke reported statistics to the planner?
> - First of all, this call can be expensive (to view the metadata of the
> files), so it can't be called repeatedly.
> - We need to call after FilterPushdown, because that's the most accurate
> information. We also need to call before CBO (Like JoinReorder and choose
> BroadcastJoin or ShuffleJoin), because that's where statistics are used.
>
> Rejected Alternatives:
> - Using CatalogTableStatistics: CatalogTableStatistics or TableStats? I
> lean to TableStats, because TableStats is the class used by planner,
> but CatalogTableStatistics may contains some catalog information which is
> not related to planner optimizer.
>
> ## Internal or Public
>
> I personally lean to internal, these interfaces are only used Hive and
> Filesystem, another way is: SupportsParallelismReport(Internal, I haven't
> seen this requirement from outside.) and SupportsStatisticsReport(Public,
> maybe Apache Iceberg Flink connector can use it).
>
> What do you think?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>
> Best,
> Jingsong Lee

--

Best,
Benchao Li