Hi Jingsong,

Thanks for bringing up this discussion. In general, I'm +1 to enrich the
source ability by
the parallelism and stats reporting, but I'm not sure whether introducing
such "SupportsXXXX"
interface is a good idea. I will share my thoughts separately.

1) Regarding the interface SupportsParallelismReport, first of all, my
feeling is that such a mechanism
is not like other abilities like SupportsProjectionPushDown. Parallelism of
source operator would be
decided anyway, the only difference here is whether it's decided purely by
framework or by table source
itself. So another angle to understand this issue is, we can always assume
a table source has the
ability to determine the parallelism. The table source can choose to set
the parallelism by itself, or delegate
it to the framework.

This might sound like personal taste, but there is another bad case if we
introduce the interface. You
may already know we currently have two major table
sources, LookupTableSource and ScanTableSource.
IIUC it won't make much sense if the user provides a LookupTableSource and
also implements
SupportsParallelismReport.

An alternative solution would be add the method you want directly
to ScanTableSource, and also have
a default implementation returning -1, which means letting framework to
decide the parallelism.

2) Regarding the interface SupportsStatisticsReport, it seems this
interface doesn't work for unbounded
streaming table sources. What kind of implementation do you expect in such
a case? And how does this
interface work with LookupTableSource?
Another question is what the oldStats parameter is used for?

3) Internal or Public. I don't think we should mark them as internal. They
are currently only used by internal
connectors doesn't mean this interface should be internal. I can imagine
there will be lots of Filesystem like
connectors outside the project which need such capability.

Best,
Kurt


On Thu, Jul 30, 2020 at 1:02 PM Benchao Li <libenc...@apache.org> wrote:

> Hi Jingsong,
>
> Regarding SupportsParallelismReport,
> I think the streaming connectors can also benefit from it.
> I see some requirements from user ML that they want to control
> source/sink's parallelism instead
> to set them to global parallelism.
> Also, in our compony, we did this too.
>
> Jingsong Li <jingsongl...@gmail.com> 于2020年7月30日周四 上午11:16写道:
>
> > Hi all,
> >
> > ## SupportsParallelismReport
> >
> > Now that FLIP-95 [1] is ready, only Hive and Filesystem are still using
> the
> > old interfaces.
> >
> > We are considering migrating to the new interface.
> >
> > However, one problem is that in the old interface implementation,
> > connectors infer parallelism by itself instead of a global parallelism
> > configuration. Hive & filesystem determines the parallelism size
> according
> > to the number of files and the size of the file. In this way, large
> tables
> > may use thousands of parallelisms, while small tables only have 10
> > parallelisms, which can minimize the consumption of task scheduling.
> >
> > This situation is very common in batch computing. For example, in the
> star
> > model, a large table needs to be joined with multiple small tables.
> >
> > So we should give this ability to new table source interfaces. The
> > interface can be:
> >
> > /**
> >  * Enables to give source the ability to report parallelism.
> >  *
> >  * <p>After filtering push down and partition push down, the source
> > can have more information,
> >  * which can help it infer more effective parallelism.
> >  */
> > @Internal
> > public interface SupportsParallelismReport {
> >
> >    /**
> >     * Report parallelism from source or sink. The parallelism of an
> > operator must be at least 1,
> >     * or -1 (use system default).
> >     */
> >    int reportParallelism();
> > }
> >
> >
> > Rejected Alternatives:
> > - SupportsSplitReport: What is the relationship between this split and
> the
> > split of FLIP-27? Do we have to match them one by one? I think they are
> two
> > independent things. In fact, the design of FLIP-27, split and parallelism
> > are not bound one by one.
> > - SupportsPartitionReport: What is partition? Actually, in table/SQL,
> > partition is a special concept of table. It should not be mixed with
> > parallelism.
> >
> > ## SupportsStatisticsReport
> >
> > As with parallelism, statistics information from source will be more
> > appropriate and accurate. After filtering push down and partition push
> > down, the source can have more information, which can help it infer more
> > effective statistics. However, if we only infer from the planner itself,
> it
> > may lead to a big gap between the statistics information and the real
> > situation.
> >
> > The interface:
> >
> > /**
> >  * Enables to give {@link ScanTableSource} the ability to report table
> > statistics.
> >  *
> >  * <p>Statistics can be inferred from real data in real time,  it is
> > more accurate than the
> >  * statistics in the catalog.
> >  *
> >  * <p>After filtering push down and partition push down, the source
> > can have more information,
> >  * which can help it infer more effective table statistics.
> >  */
> > @Internal
> > public interface SupportsStatisticsReport {
> >
> >    /**
> >     * Reports {@link TableStats} from old table stats.
> >     */
> >    TableStats reportTableStatistics(TableStats oldStats);
> > }
> >
> >
> > When to invoke reported statistics to the planner?
> > - First of all, this call can be expensive (to view the metadata of the
> > files), so it can't be called repeatedly.
> > - We need to call after FilterPushdown, because that's the most accurate
> > information. We also need to call before CBO (Like JoinReorder and choose
> > BroadcastJoin or ShuffleJoin), because that's where statistics are used.
> >
> > Rejected Alternatives:
> > - Using CatalogTableStatistics: CatalogTableStatistics or TableStats? I
> > lean to TableStats, because TableStats is the class used by planner,
> > but CatalogTableStatistics may contains some catalog information which is
> > not related to planner optimizer.
> >
> > ## Internal or Public
> >
> > I personally lean to internal, these interfaces are only used Hive and
> > Filesystem, another way is: SupportsParallelismReport(Internal, I haven't
> > seen this requirement from outside.) and SupportsStatisticsReport(Public,
> > maybe Apache Iceberg Flink connector can use it).
> >
> > What do you think?
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >
> > Best,
> > Jingsong Lee
> >
>
>
> --
>
> Best,
> Benchao Li
>

Reply via email to