Thanks to Shengkai for summarizing the problems with the FLIP-95 interfaces
and the proposed solutions.

I think the new proposal, i.e. pushing only the "WatermarkStrategy", is much
cleaner and easier to develop against than the previous design.
So I'm +1 to the proposal.

Best,
Jark

On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fskm...@gmail.com> wrote:

> Hi, all. The formatting of my previous mail seems to be broken, so I have
> put the content in a Google doc, see link [1]. This discussion is about the
> interfaces in FLIP-95: New Table Source And Table Sink, and proposes to
> merge the two interfaces SupportsWatermarkPushDown and
> SupportsComputedColumnPushDown.
>
> I am looking forward to any opinions and suggestions from the community.
>
> Regards,
> Shengkai
>
> [1]
>
> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
>
> On Fri, 4 Sep 2020 at 14:58, Shengkai Fang <fskm...@gmail.com> wrote:
>
> > Hi, all. Currently, the design uses two separate interfaces,
> > SupportsComputedColumnPushDown and SupportsWatermarkPushDown. The
> > interface SupportsWatermarkPushDown relies on
> > SupportsComputedColumnPushDown when the watermark is defined on a computed
> > column. During the implementation, we found that the method in
> > SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider,
> > and that SupportsComputedColumnPushDown duplicates
> > SupportsProjectionPushDown. Therefore, we propose a new version of the
> > SupportsWatermarkPushDown interface to solve these problems.
> >
> >
> > *Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*
> >
> > Problems of SupportsWatermarkPushDown
> >
> > SupportsWatermarkPushDown currently uses an inner interface named
> > WatermarkProvider to register a WatermarkGenerator into the
> > DynamicTableSource. However, since FLIP-126 the community uses
> > org.apache.flink.api.common.eventtime.WatermarkStrategy to create
> > watermark generators. WatermarkStrategy is a factory of TimestampAssigner
> > and WatermarkGenerator, and FlinkKafkaConsumer uses the method
> > assignTimestampsAndWatermarks(WatermarkStrategy) to generate
> > Kafka-partition-aware watermarks. The original WatermarkProvider, in
> > contrast, is used to generate the deprecated
> > AssignerWithPeriodicWatermarks and PunctuatedWatermarkAssignerProvider.
> > Therefore, we think it is no longer suitable to use WatermarkProvider.
> >
> >
> > Problems of SupportsComputedColumnPushDown
> >
> > There are two problems when using SupportsComputedColumnPushDown alone.
> >
> > First, the planner transforms both computed columns and query expressions
> > such as select a+b into a LogicalProject. In the optimization phase, we
> > have no means to distinguish whether a RexNode in the projection comes
> > from a computed column or from the query. So in reality
> > SupportsComputedColumnPushDown will push down not only the computed
> > columns but also the calculations in the query.
> >
> > Second, when a plan matches the rule
> > PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData
> > to hold all the fields we require. However, the two rules
> > PushComputedColumnIntoTableSourceScanRule and
> > PushProjectIntoTableSourceScanRule do the same work, namely pruning the
> > records read from the source, so we would have two duplicate rules in the
> > planner. I think we should use PushProjectIntoTableSourceScanRule rather
> > than PushComputedColumnIntoTableSourceScanRule if we don't support
> > watermark push down. Compared to
> > PushComputedColumnIntoTableSourceScanRule,
> > PushProjectIntoTableSourceScanRule is much lighter, and we can read pruned
> > data from the source rather than use a map function in Flink.
> >
> > Therefore, we think it's not a good idea to use two interfaces rather
> > than one.
> >
> >
> > *New Proposal*
> >
> > First of all, let us give some background on pushing watermarks into the
> > table source scan. There are two plan structures that we need to
> > consider. We list two examples below for discussion.
> >
> > structure 1:
> >
> > LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
> > +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> >
> > structure 2:
> >
> > LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
> > +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
> >    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
> >
> > As we can see, structure 2 is much more complicated than structure 1. For
> > structure 1, we can use the row from the table scan to generate watermarks
> > directly. But for structure 2, we need to calculate the rowtime expression
> > in LogicalProject and use the result of the calculation to generate
> > watermarks. Considering that WatermarkStrategy is able to extract the
> > timestamp from a row, we propose to push only the WatermarkStrategy into
> > the scan.
> >
> >
> > Push WatermarkStrategy to Scan
> >
> > In this interface, we will only push WatermarkStrategy to
> > DynamicTableSource.
> >
> > public interface SupportsWatermarkPushDown {
> >     void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
> > }
> >
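A minimal sketch of how a connector might consume the proposed interface, using structure 2 from above as the example (rowtime d = c + 5 s, watermark = d - 5 s). RowData and WatermarkStrategy here are simplified, hypothetical stand-ins so that the snippet compiles on its own; they are not the real Flink types, and SketchTableSource, WatermarkPushDownSketch and their methods are invented for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins so the sketch compiles on its own.
// They are NOT the real Flink RowData / WatermarkStrategy interfaces.
interface RowData {
    long getLong(int pos);
}

interface WatermarkStrategy<T> {
    // Extracts the event timestamp (epoch millis) from a record.
    long extractTimestamp(T record);

    // Returns the watermark candidate derived from the given timestamp.
    long watermarkFor(long timestamp);
}

// The interface as proposed in this thread.
interface SupportsWatermarkPushDown {
    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}

// Hypothetical connector: it stores the strategy and applies it while reading.
final class SketchTableSource implements SupportsWatermarkPushDown {
    private WatermarkStrategy<RowData> strategy;

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.strategy = watermarkStrategy;
    }

    // Returns the current watermark after each row; watermarks never go backwards.
    List<Long> readAndTrackWatermarks(List<RowData> rows) {
        List<Long> watermarks = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (RowData row : rows) {
            long timestamp = strategy.extractTimestamp(row);
            current = Math.max(current, strategy.watermarkFor(timestamp));
            watermarks.add(current);
        }
        return watermarks;
    }
}

final class WatermarkPushDownSketch {
    // Strategy for structure 2: the rowtime expression d = c + 5000 ms is
    // evaluated inside the strategy, so the scan never builds a new row.
    static WatermarkStrategy<RowData> structure2Strategy() {
        return new WatermarkStrategy<RowData>() {
            @Override
            public long extractTimestamp(RowData row) {
                return row.getLong(2) + 5000L; // column c is at index 2
            }

            @Override
            public long watermarkFor(long timestamp) {
                return timestamp - 5000L; // watermark = d - 5 s
            }
        };
    }

    // Builds a row whose fields are all of type long (illustration only).
    static RowData row(long... fields) {
        return pos -> fields[pos];
    }
}
```

The point of the sketch is that the connector only sees the strategy: the computed rowtime expression stays hidden inside it, and the source needs no knowledge of computed columns.
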
> > The advantage of the new API is that it is very simple for connector
> > developers. They only need to take the WatermarkStrategy into
> > consideration and don't need to deal with other information such as the
> > ComputedColumnConverter in SupportsComputedColumnPushDown. But it also has
> > one disadvantage: the rowtime expression has to be calculated again in
> > the LogicalProject, because we don't build a new row in the scan to store
> > the calculated timestamp. However, we can replace the calculation of
> > rowtime in the LogicalProject with a reference to eliminate the duplicate
> > calculation; the reference will use the StreamRecord's getter to read the
> > timestamp that was calculated before. This optimization still has one
> > limitation: it relies on computed columns not being defined on other
> > computed columns. For nested computed columns, we have no place to save
> > the intermediate result.
> >
> > But we still have a problem: when we push a UDF into the source, we need
> > a context as powerful as FunctionContext to open the UDF. The current
> > WatermarkGeneratorSupplier.Context only supports the method
> > getMetricGroup and lacks the methods getCachedFile, getJobParameter and
> > getExternalResourceInfos, which means we can't convert the
> > WatermarkGeneratorSupplier.Context to a FunctionContext safely.
> > Considering that the UDF is only used to generate watermarks, we suggest
> > throwing UnsupportedException when invoking the methods that exist in
> > FunctionContext but not in WatermarkGeneratorSupplier.Context. We have to
> > admit that there are risks in doing so, because we have no guarantee that
> > the UDF will not invoke these methods.
> >
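A rough sketch of the adapter idea described above: the methods that the watermark context can serve are delegated, and the rest throw. All types below are trimmed-down, hypothetical stand-ins rather than the real Flink classes, their signatures are simplified assumptions, and Java's UnsupportedOperationException is used here in place of the "UnsupportedException" mentioned in the mail.

```java
import java.io.File;
import java.util.Set;

// Hypothetical, trimmed-down stand-ins so the sketch compiles on its own.
// They are NOT the real Flink classes.
interface MetricGroup {}

// Stand-in for WatermarkGeneratorSupplier.Context: only getMetricGroup() exists.
interface WatermarkContext {
    MetricGroup getMetricGroup();
}

// Stand-in for the FunctionContext methods named in the thread.
// Parameter and return types are simplified assumptions.
interface FunctionContextLike {
    MetricGroup getMetricGroup();

    File getCachedFile(String name);

    String getJobParameter(String key, String defaultValue);

    Set<String> getExternalResourceInfos(String resourceName);
}

// Adapter: delegates what the watermark context offers, rejects everything else.
final class WatermarkFunctionContext implements FunctionContextLike {
    private final WatermarkContext delegate;

    WatermarkFunctionContext(WatermarkContext delegate) {
        this.delegate = delegate;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return delegate.getMetricGroup();
    }

    @Override
    public File getCachedFile(String name) {
        throw unsupported("getCachedFile");
    }

    @Override
    public String getJobParameter(String key, String defaultValue) {
        throw unsupported("getJobParameter");
    }

    @Override
    public Set<String> getExternalResourceInfos(String resourceName) {
        throw unsupported("getExternalResourceInfos");
    }

    private static UnsupportedOperationException unsupported(String method) {
        return new UnsupportedOperationException(
                method + " is not available for a UDF used in a watermark expression");
    }
}
```

The failure only surfaces when the UDF actually calls one of the missing methods, which is the risk the mail points out.
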
> >
> > *Summary*
> >
> > We have described the problems and the solution in detail. In
> > conclusion, I think the new interface avoids the problems we mentioned
> > before. It has a clear definition, as its name tells, and has nothing in
> > common with SupportsProjectionPushDown. As for its disadvantage, I think
> > it's acceptable to calculate the rowtime column twice, and we also plan
> > to improve its efficiency as a follow-up if it causes performance
> > problems.
> >
> >
> >
>
