Hi, all. It seems the formatting of my previous mail was broken, so I have added a Google doc at link [1]. This discussion is about the interfaces in FLIP-95: New Table Source and Table Sink, and proposes to merge the two interfaces SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
I am looking forward to any opinions and suggestions from the community.

Regards,
Shengkai

[1] https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#

On Fri, Sep 4, 2020 at 2:58 PM, Shengkai Fang <fskm...@gmail.com> wrote:

Hi, all. Currently the design uses two separate interfaces, SupportsComputedColumnPushDown and SupportsWatermarkPushDown. The interface SupportsWatermarkPushDown relies on SupportsComputedColumnPushDown when the watermark is defined on a computed column. During the implementation, we found two problems: the method in SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider, and SupportsComputedColumnPushDown duplicates the work of SupportsProjectionPushDown. Therefore, we propose a new version of the SupportsWatermarkPushDown interface to solve these problems.

*Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*

Problems of SupportsWatermarkPushDown

SupportsWatermarkPushDown currently uses an inner interface named WatermarkProvider to register a WatermarkGenerator in the DynamicTableSource. However, since FLIP-126 the community uses org.apache.flink.api.common.eventtime.WatermarkStrategy to create watermark generators. WatermarkStrategy is a factory for TimestampAssigner and WatermarkGenerator, and FlinkKafkaConsumer uses the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate Kafka-partition-aware watermarks. The original WatermarkProvider, by contrast, only produces the deprecated AssignerWithPeriodicWatermarks and PunctuatedWatermarkAssignerProvider. Therefore, we think it is no longer suitable to use WatermarkProvider.

Problems of SupportsComputedColumnPushDown

There are two problems when using SupportsComputedColumnPushDown alone.

First, the planner transforms both computed columns and query expressions such as SELECT a + b into a LogicalProject.
When it comes to the optimization phase, we have no means to distinguish whether a RexNode in the projection comes from a computed column or from the query. So in practice SupportsComputedColumnPushDown pushes down not only the computed columns but also the calculations in the query.

Second, when a plan matches the rule PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData to hold all the fields we require. However, the two rules PushComputedColumnIntoTableSourceScanRule and PushProjectIntoTableSourceScanRule do the same work of pruning the records read from the source, so we effectively have two duplicate rules in the planner. If we do not need watermark push-down, we should use PushProjectIntoTableSourceScanRule rather than PushComputedColumnIntoTableSourceScanRule: it is much lighter, and it lets us read pruned data directly from the source rather than apply a map function in Flink.

Therefore, we think it is not a good idea to use two interfaces rather than one.

*New Proposal*

First of all, let us give some background on pushing watermarks into the table source scan. There are two plan structures that we need to consider. We list two examples below for discussion.

Structure 1:

LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

Structure 2:

LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

As we can see, structure 2 is much more complicated than structure 1. For structure 1, we can use the rows from the table scan to generate watermarks directly.
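To make structure 1 concrete: the generator pushed into the scan only needs the value of the physical rowtime column c. Below is a minimal sketch of such a bounded-out-of-orderness generator, mirroring "watermark = c - 5s". The class here is a simplified stand-in for illustration, not the actual Flink WatermarkGenerator from FLIP-126.

```java
public class BoundedWatermarkSketch {

    /** Simplified stand-in mimicking a periodic generator for "watermark = c - 5s". */
    public static final class BoundedOutOfOrdernessGenerator {
        private final long outOfOrdernessMillis;
        private long maxTimestamp = Long.MIN_VALUE;

        public BoundedOutOfOrdernessGenerator(long outOfOrdernessMillis) {
            this.outOfOrdernessMillis = outOfOrdernessMillis;
        }

        /** Called for every scanned row with the value of the rowtime column c. */
        public void onEvent(long rowtimeMillis) {
            maxTimestamp = Math.max(maxTimestamp, rowtimeMillis);
        }

        /** Called periodically; emits the max timestamp seen minus the bound. */
        public long currentWatermark() {
            return maxTimestamp - outOfOrdernessMillis;
        }
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessGenerator gen = new BoundedOutOfOrdernessGenerator(5000);
        for (long t : new long[] {1000, 7000, 4000}) {
            gen.onEvent(t);
        }
        // Max timestamp seen is 7000, so the watermark is 7000 - 5000 = 2000.
        System.out.println(gen.currentWatermark()); // prints 2000
    }
}
```

The real WatermarkStrategy bundles a TimestampAssigner together with such a generator, which is exactly what the proposal below pushes into the scan.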
But for structure 2, we need to evaluate the rowtime expression in the LogicalProject and use the result of that calculation to generate watermarks. Considering that WatermarkStrategy already has the ability to extract a timestamp from a row, we propose to push only the WatermarkStrategy into the scan.

Push WatermarkStrategy to Scan

With this interface, we only push a WatermarkStrategy into the DynamicTableSource:

public interface SupportsWatermarkPushDown {
    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}

The advantage of the new API is that it is very simple for connector developers: they only need to take the WatermarkStrategy into consideration and do not need to deal with other information, such as the ComputedColumnConverter in SupportsComputedColumnPushDown. The disadvantage is that the rowtime expression is calculated again in the LogicalProject, because we do not build a new row in the scan to store the calculated timestamp. However, we can replace the rowtime calculation in the LogicalProject with a reference that reads the previously calculated timestamp through the StreamRecord's getter, eliminating the duplicate calculation. This optimization has one limitation: it relies on computed columns not being defined on top of other computed columns. For nested computed columns, we have no place to save the intermediate result.

We still have one problem: when we push a UDF into the source, we need a context as powerful as FunctionContext to open the UDF. But the current WatermarkGeneratorSupplier.Context only supports the method getMetricGroup and misses the methods getCachedFile, getJobParameter, and getExternalResourceInfos, which means we cannot convert a WatermarkGeneratorSupplier.Context to a FunctionContext safely.
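One conceivable way to bridge the two contexts is an adapter that delegates what the supplier context can provide and fails fast on everything else. The sketch below uses simplified stand-ins for the real Flink interfaces (the method names follow the mail, but the types here are hypothetical, not the actual classes):

```java
import java.io.File;

public class ContextAdapterSketch {

    /** Stand-in for what WatermarkGeneratorSupplier.Context can actually provide. */
    public interface SupplierContext {
        Object getMetricGroup();
    }

    /** Stand-in for the subset of FunctionContext a UDF may call inside the scan. */
    public static class UdfContext {
        private final SupplierContext supplierContext;

        public UdfContext(SupplierContext supplierContext) {
            this.supplierContext = supplierContext;
        }

        public Object getMetricGroup() {
            // Supported: delegate to the supplier context.
            return supplierContext.getMetricGroup();
        }

        public File getCachedFile(String name) {
            // Not available when the UDF runs inside the source scan.
            throw new UnsupportedOperationException(
                    "getCachedFile is not supported in watermark push-down");
        }

        public String getJobParameter(String key, String defaultValue) {
            // Same: fail fast rather than return a bogus value.
            throw new UnsupportedOperationException(
                    "getJobParameter is not supported in watermark push-down");
        }
    }
}
```

With such an adapter, a UDF that only touches the metric group works unchanged, while anything the supplier context cannot provide throws instead of silently returning a wrong value.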
Considering that the UDF is only used to generate watermarks, we suggest throwing UnsupportedOperationException when invoking the methods that exist in FunctionContext but not in WatermarkGeneratorSupplier.Context. We have to admit there is a risk in doing so, because we have no guarantee that the UDF will not invoke these methods.

*Summary*

We have described the whole problem and the solution in detail. In conclusion, I think the new interface avoids the problems mentioned before. It has a clear definition, as its name tells, and has nothing in common with SupportsProjectionPushDown. As for its disadvantage, I think it is acceptable to calculate the rowtime column twice, and we also plan to improve its efficiency as a follow-up if it causes performance problems.
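As a postscript, here is how a connector might wire up the proposed interface. Only the interface body itself comes from the proposal above; RowData, WatermarkStrategy, and MyTableSource are minimal hypothetical stand-ins, not the real Flink classes:

```java
public class PushDownSketch {

    /** Stand-in for org.apache.flink.table.data.RowData. */
    public interface RowData {}

    /** Stand-in for org.apache.flink.api.common.eventtime.WatermarkStrategy. */
    public interface WatermarkStrategy<T> {}

    /** The proposed ability interface, as given in the proposal. */
    public interface SupportsWatermarkPushDown {
        void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
    }

    /** A hypothetical scan source that remembers the pushed-down strategy. */
    public static class MyTableSource implements SupportsWatermarkPushDown {
        public WatermarkStrategy<RowData> watermarkStrategy;

        @Override
        public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
            // The planner calls this during optimization; at runtime the source
            // would hand the strategy to its reader, much like FlinkKafkaConsumer
            // does via assignTimestampsAndWatermarks(WatermarkStrategy).
            this.watermarkStrategy = watermarkStrategy;
        }
    }
}
```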