Hi all. The formatting of my previous mail seems to be broken, so I have put
the content in a Google doc, linked at [1]. This discussion is about the
interfaces in FLIP-95: New Table Source And Table Sink, and proposes to merge
the two interfaces SupportsWatermarkPushDown and
SupportsComputedColumnPushDown.

I am looking forward to any opinions and suggestions from the community.

Regards,
Shengkai

[1]
https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#

Shengkai Fang <fskm...@gmail.com> wrote on Fri, Sep 4, 2020 at 2:58 PM:

>    Hi, all. Currently, the design uses two separate interfaces,
> SupportsComputedColumnPushDown and SupportsWatermarkPushDown. The
> interface SupportsWatermarkPushDown relies on
> SupportsComputedColumnPushDown when the watermark is defined on a computed
> column. During the implementation, we found that the method in
> SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider,
> and that SupportsComputedColumnPushDown duplicates
> SupportsProjectionPushDown. Therefore, we propose a new version of the
> SupportsWatermarkPushDown interface to solve these problems.
>
>
> *Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*
>
> Problems of SupportsWatermarkPushDown
>
> SupportsWatermarkPushDown currently uses an inner interface named
> WatermarkProvider to register a WatermarkGenerator into the
> DynamicTableSource. However, since FLIP-126 the community uses
> org.apache.flink.api.common.eventtime.WatermarkStrategy to create
> watermark generators. WatermarkStrategy is a factory of TimestampAssigner
> and WatermarkGenerator, and FlinkKafkaConsumer uses the method
> assignTimestampsAndWatermarks(WatermarkStrategy) to generate
> Kafka-partition-aware watermarks. The original WatermarkProvider, by
> contrast, is used to generate the deprecated AssignerWithPeriodicWatermarks
> and PunctuatedWatermarkAssignerProvider. Therefore, we think it is no
> longer suitable to use WatermarkProvider.
>
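
To make the "factory of TimestampAssigner and WatermarkGenerator" role concrete, here is a minimal sketch in plain Java. The Sketch* types are hypothetical stand-ins that only mirror the factory shape of WatermarkStrategy; they are not the real Flink classes, and the bounded-delay generator is just one assumed way to produce watermarks.

```java
import java.util.function.ToLongFunction;

// Hypothetical stand-ins, NOT the real Flink classes: they only mirror the
// factory shape of org.apache.flink.api.common.eventtime.WatermarkStrategy.
interface SketchWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp);

    long currentWatermark();
}

interface SketchWatermarkStrategy<T> {
    // Factory method 1: how to read the event-time timestamp from a record.
    ToLongFunction<T> createTimestampAssigner();

    // Factory method 2: how to turn observed timestamps into watermarks.
    SketchWatermarkGenerator<T> createWatermarkGenerator();

    // Builds a strategy whose watermark trails the largest timestamp seen so
    // far by maxDelayMillis (assumed to be non-negative).
    static <T> SketchWatermarkStrategy<T> boundedOutOfOrderness(
            ToLongFunction<T> assigner, long maxDelayMillis) {
        return new SketchWatermarkStrategy<T>() {
            @Override
            public ToLongFunction<T> createTimestampAssigner() {
                return assigner;
            }

            @Override
            public SketchWatermarkGenerator<T> createWatermarkGenerator() {
                return new SketchWatermarkGenerator<T>() {
                    // Offset the start value so the subtraction below cannot underflow.
                    private long maxTimestamp = Long.MIN_VALUE + maxDelayMillis;

                    @Override
                    public void onEvent(T event, long eventTimestamp) {
                        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                    }

                    @Override
                    public long currentWatermark() {
                        return maxTimestamp - maxDelayMillis;
                    }
                };
            }
        };
    }
}
```

A source that receives such a strategy can create one generator per partition, which is roughly how partition-aware watermarks become possible.
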
>
> Problems of SupportsComputedColumnPushDown
>
> There are two problems when using SupportsComputedColumnPushDown alone.
>
> First, the planner transforms both computed columns and query expressions
> such as select a+b into a LogicalProject. In the optimization phase, we
> have no means to distinguish whether a RexNode in the projection comes
> from a computed column or from the query. So in reality
> SupportsComputedColumnPushDown will push down not only the computed
> columns but also the calculations in the query.
>
> Second, when a plan matches the rule
> PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData
> to hold all the fields we require. However, the two rules
> PushComputedColumnIntoTableSourceScanRule and
> PushProjectIntoTableSourceScanRule do the same work: they prune the
> records that are read from the source. It seems that we have two duplicate
> rules in the planner. I think we should use
> PushProjectIntoTableSourceScanRule rather than
> PushComputedColumnIntoTableSourceScanRule if we don't support watermark
> push down. Compared to PushComputedColumnIntoTableSourceScanRule,
> PushProjectIntoTableSourceScanRule is much lighter, and we can read pruned
> data from the source rather than use a map function in Flink.
>
> Therefore, we think it is better to use one interface rather than two.
>
>
> *New Proposal*
>
> First of all, let us address some background when pushing watermarks into
> table source scan. There are two structures that we need to consider. We
> list two examples below for discussion.
>
> structure 1:
>
> LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
> +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
>
> structure 2:
>
> LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
> +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
>    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
>
> As we can see, structure 2 is much more complicated than structure 1. For
> structure 1, we can use the row from table scan to generate watermarks
> directly. But for structure 2, we need to calculate the rowtime expression
> in LogicalProject and use the result of calculation to generate
> watermarks. Considering that WatermarkStrategy is able to extract the
> timestamp from a row, we propose to push only the WatermarkStrategy into
> the scan.
>
>
> Push WatermarkStrategy to Scan
>
> In this interface, we will only push WatermarkStrategy to
> DynamicTableSource.
>
> public interface SupportsWatermarkPushDown {
>     void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
> }
>
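
As a rough illustration of what a connector would have to do with this interface, here is a self-contained sketch for structure 2 (rowtime d = c + 5 seconds, watermark = d - 5 seconds). All Sketch* names are hypothetical stand-ins: a row is modelled as Object[], and SketchRowWatermarkStrategy is a simplified placeholder for WatermarkStrategy<RowData>, not the Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for WatermarkStrategy<RowData>: a timestamp extractor
// plus a fixed watermark delay. A row is modelled as Object[].
final class SketchRowWatermarkStrategy {
    final ToLongFunction<Object[]> timestampExtractor;
    final long delayMillis;

    SketchRowWatermarkStrategy(ToLongFunction<Object[]> timestampExtractor, long delayMillis) {
        this.timestampExtractor = timestampExtractor;
        this.delayMillis = delayMillis;
    }
}

// Same shape as the proposed interface, but using the stand-in type.
interface SketchSupportsWatermarkPushDown {
    void applyWatermark(SketchRowWatermarkStrategy watermarkStrategy);
}

// Hypothetical connector: it only stores the strategy and needs no knowledge
// of computed columns, because the rowtime expression lives in the extractor.
final class SketchInMemorySource implements SketchSupportsWatermarkPushDown {
    private final List<Object[]> rows;
    private SketchRowWatermarkStrategy strategy;

    SketchInMemorySource(List<Object[]> rows) {
        this.rows = rows;
    }

    @Override
    public void applyWatermark(SketchRowWatermarkStrategy watermarkStrategy) {
        this.strategy = watermarkStrategy;
    }

    // Returns the watermark that would be current after each row is read.
    List<Long> watermarksAfterEachRow() {
        if (strategy == null) {
            throw new IllegalStateException("no watermark strategy was pushed down");
        }
        List<Long> watermarks = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + strategy.delayMillis;
        for (Object[] row : rows) {
            long timestamp = strategy.timestampExtractor.applyAsLong(row);
            maxTimestamp = Math.max(maxTimestamp, timestamp);
            watermarks.add(maxTimestamp - strategy.delayMillis);
        }
        return watermarks;
    }
}
```

In this sketch the planner side would pass an extractor such as row -> ((Long) row[2]) + 5000L, so the computed column d is evaluated inside the strategy and the source itself never builds a new row.
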
> The advantage of the new API is that it is very simple for connector
> developers. They only need to take WatermarkStrategy into consideration
> and don't need to deal with other information such as the
> ComputedColumnConverter in SupportsComputedColumnPushDown. It has one
> disadvantage: the rowtime expression has to be calculated again in
> LogicalProject, because we don't build a new row in the scan to store the
> calculated timestamp. However, we can replace the calculation of rowtime in
> LogicalProject with a reference to eliminate the duplicate calculation; the
> reference uses the StreamRecord's getter to read the timestamp that was
> calculated before. This optimization still has one limitation: it relies on
> computed columns not being defined on other computed columns. For nested
> computed columns, we have no place to save the intermediate result.
>
> We still have one problem: when we push a UDF into the source, we need a
> context as powerful as FunctionContext to open the UDF. The current
> WatermarkGeneratorSupplier.Context only supports the method
> getMetricGroup and lacks the methods getCachedFile, getJobParameter and
> getExternalResourceInfos, which means we can't safely convert the
> WatermarkGeneratorSupplier.Context to a FunctionContext. Considering that
> the UDF is only used to generate watermarks, we suggest throwing an
> UnsupportedOperationException when invoking the methods that exist in
> FunctionContext but not in WatermarkGeneratorSupplier.Context. We have to
> admit that there are risks in doing so, because we have no guarantee that
> the UDF will not invoke these methods.
>
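
The suggested fallback could look roughly like the adapter below. This is only a sketch under stated assumptions: SketchSupplierContext and SketchFunctionContext are hypothetical, reduced stand-ins for WatermarkGeneratorSupplier.Context and FunctionContext (a String replaces the real metric group type, and only two of the missing methods are shown), and the class name is made up for illustration.

```java
import java.io.File;

// Hypothetical stand-in for WatermarkGeneratorSupplier.Context, which per the
// text above only offers getMetricGroup. A String replaces the real type.
interface SketchSupplierContext {
    String getMetricGroup();
}

// Hypothetical stand-in for the part of FunctionContext discussed above.
interface SketchFunctionContext {
    String getMetricGroup();

    File getCachedFile(String name);

    String getJobParameter(String key, String defaultValue);
}

// Adapter that lets a UDF be opened inside a watermark generator: supported
// calls are delegated, everything else fails loudly instead of returning null.
final class SketchWatermarkFunctionContext implements SketchFunctionContext {
    private final SketchSupplierContext delegate;

    SketchWatermarkFunctionContext(SketchSupplierContext delegate) {
        this.delegate = delegate;
    }

    @Override
    public String getMetricGroup() {
        return delegate.getMetricGroup();
    }

    @Override
    public File getCachedFile(String name) {
        throw unsupported("getCachedFile");
    }

    @Override
    public String getJobParameter(String key, String defaultValue) {
        throw unsupported("getJobParameter");
    }

    private static UnsupportedOperationException unsupported(String method) {
        return new UnsupportedOperationException(
                method + " is not supported for a UDF used in a watermark expression");
    }
}
```

Failing at the call site at least makes the risk visible: a UDF that depends on one of the missing methods breaks with a clear message when it is opened or evaluated, rather than silently misbehaving.
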
>
> *Summary*
>
> We have described the problems and the solution in detail. In
> conclusion, I think the new interface avoids the problems mentioned
> above. It has a clear definition, as its name tells, and has nothing in
> common with SupportsProjectionPushDown. As for its disadvantage, I think
> it is acceptable to calculate the rowtime column twice, and we plan to
> improve its efficiency as a follow-up if it causes performance
> problems.
>
>
>
