Hi Jingsong,

Regarding SupportsParallelismReport,
I think streaming connectors can also benefit from it.
I have seen requests on the user mailing list from users who want to
control the parallelism of sources and sinks individually instead of
setting them to the global parallelism.
We also did this in our company.

Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Jul 30, 2020 at 11:16 AM:

> Hi all,
>
> ## SupportsParallelismReport
>
> Now that FLIP-95 [1] is ready, only Hive and Filesystem are still using the
> old interfaces.
>
> We are considering migrating to the new interface.
>
> However, one problem is that in the old interface implementation,
> connectors infer the parallelism themselves instead of using the global
> parallelism configuration. Hive and Filesystem determine the parallelism
> according to the number and size of the files. In this way, a large table
> may use a parallelism in the thousands, while a small table only uses a
> parallelism of 10, which minimizes the task scheduling overhead.
>
> This situation is very common in batch computing. For example, in the star
> model, a large table needs to be joined with multiple small tables.
>
> So we should give this ability to new table source interfaces. The
> interface can be:
>
> /**
>  * Enables a source to report its parallelism.
>  *
>  * <p>After filter push down and partition push down, the source can
>  * have more information, which can help it infer a more effective
>  * parallelism.
>  */
> @Internal
> public interface SupportsParallelismReport {
>
>    /**
>     * Reports the parallelism of the source or sink. The parallelism
>     * of an operator must be at least 1, or -1 (use system default).
>     */
>    int reportParallelism();
> }
>
>
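
To make the inference described above concrete, here is a minimal sketch of
how a file-based source might derive its parallelism from the number and
size of its files. It is only an illustration, not the Hive or Filesystem
implementation: the interface is redeclared locally so the example compiles
on its own, and FileSourceSketch, bytesPerTask and maxParallelism are
hypothetical names. It also assumes files are not split, so the number of
files caps the parallelism.

```java
import java.util.List;

/** Hypothetical sketch; none of these classes are part of Flink. */
public class ParallelismSketch {

    /** Local stand-in mirroring the interface proposed in this thread. */
    interface SupportsParallelismReport {
        int reportParallelism();
    }

    /** Hypothetical file-based source that knows which files remain after push down. */
    static final class FileSourceSketch implements SupportsParallelismReport {
        private final List<Long> fileSizesInBytes;
        private final long bytesPerTask;   // assumed target amount of data per subtask
        private final int maxParallelism;  // assumed upper bound chosen by the user

        FileSourceSketch(List<Long> fileSizesInBytes, long bytesPerTask, int maxParallelism) {
            this.fileSizesInBytes = fileSizesInBytes;
            this.bytesPerTask = bytesPerTask;
            this.maxParallelism = maxParallelism;
        }

        @Override
        public int reportParallelism() {
            if (fileSizesInBytes.isEmpty()) {
                return -1; // nothing to infer from, so fall back to the system default
            }
            long totalBytes = 0;
            for (long size : fileSizesInBytes) {
                totalBytes += size;
            }
            // Ceiling division: number of subtasks needed for the total size.
            long bySize = (totalBytes + bytesPerTask - 1) / bytesPerTask;
            // Assumption: files are not split, so never use more subtasks than files.
            long tasks = Math.max(1, Math.min(bySize, fileSizesInBytes.size()));
            return (int) Math.min(tasks, maxParallelism);
        }
    }
}
```

With a sketch like this, a small dimension table with a handful of files
reports a small parallelism, while a large fact table is only limited by
the configured upper bound.
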
> Rejected Alternatives:
> - SupportsSplitReport: What is the relationship between this split and the
> split of FLIP-27? Do we have to match them one to one? I think they are two
> independent things. In fact, in the design of FLIP-27, splits and
> parallelism are not bound one to one.
> - SupportsPartitionReport: What is a partition? Actually, in Table/SQL,
> a partition is a special concept of a table. It should not be mixed up
> with parallelism.
>
> ## SupportsStatisticsReport
>
> As with parallelism, statistics information from source will be more
> appropriate and accurate. After filtering push down and partition push
> down, the source can have more information, which can help it infer more
> effective statistics. However, if we only infer from the planner itself, it
> may lead to a big gap between the statistics information and the real
> situation.
>
> The interface:
>
> /**
>  * Enables a {@link ScanTableSource} to report table statistics.
>  *
>  * <p>Statistics can be inferred from the real data in real time, so
>  * they are more accurate than the statistics in the catalog.
>  *
>  * <p>After filter push down and partition push down, the source can
>  * have more information, which can help it infer more effective
>  * table statistics.
>  */
> @Internal
> public interface SupportsStatisticsReport {
>
>    /**
>     * Reports {@link TableStats} based on the old table stats.
>     */
>    TableStats reportTableStatistics(TableStats oldStats);
> }
>
>
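
As an illustration of the statistics interface above, here is a minimal
sketch of a partitioned source that reports a row count after partition
pruning and falls back to the old statistics when its metadata is
incomplete. It is not taken from any connector: Stats is a simplified
stand-in for TableStats that holds only a row count, the interface is
redeclared locally against that stand-in, and PartitionedSourceSketch and
the use of -1 for "unknown" are assumptions of this example.

```java
import java.util.List;

/** Hypothetical sketch; none of these classes are part of Flink. */
public class StatisticsSketch {

    /** Simplified stand-in for the planner's TableStats: a row count only. */
    static final class Stats {
        final long rowCount;

        Stats(long rowCount) {
            this.rowCount = rowCount;
        }
    }

    /** Local stand-in mirroring the interface proposed in this thread. */
    interface SupportsStatisticsReport {
        Stats reportTableStatistics(Stats oldStats);
    }

    /** Hypothetical source that knows the row counts of the partitions left after pruning. */
    static final class PartitionedSourceSketch implements SupportsStatisticsReport {
        // One entry per remaining partition; -1 marks a partition without metadata.
        private final List<Long> remainingPartitionRowCounts;

        PartitionedSourceSketch(List<Long> remainingPartitionRowCounts) {
            this.remainingPartitionRowCounts = remainingPartitionRowCounts;
        }

        @Override
        public Stats reportTableStatistics(Stats oldStats) {
            long total = 0;
            for (long rows : remainingPartitionRowCounts) {
                if (rows < 0) {
                    // Metadata is incomplete, so the old stats cannot be improved.
                    return oldStats;
                }
                total += rows;
            }
            return new Stats(total);
        }
    }
}
```
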
> When should the planner invoke the statistics report?
> - First of all, this call can be expensive (it has to look at the metadata
> of the files), so it can't be called repeatedly.
> - We need to call it after FilterPushdown, because that is when the source
> has the most accurate information. We also need to call it before CBO
> (such as JoinReorder and choosing between BroadcastJoin and ShuffleJoin),
> because that is where the statistics are used.
>
> Rejected Alternatives:
> - Using CatalogTableStatistics: CatalogTableStatistics or TableStats? I
> lean towards TableStats, because TableStats is the class used by the
> planner, while CatalogTableStatistics may contain some catalog information
> which is not related to the planner's optimizer.
>
> ## Internal or Public
>
> I personally lean towards internal, since these interfaces are only used
> by Hive and Filesystem. Another option is: SupportsParallelismReport
> (Internal, I haven't seen this requirement from outside) and
> SupportsStatisticsReport (Public, maybe the Apache Iceberg Flink connector
> can use it).
>
> What do you think?
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>
> Best,
> Jingsong Lee
>


-- 

Best,
Benchao Li
