Hi Timo,

Regarding "pushing other computed columns into source, e.g. encrypted
records/columns, performing checksum checks, reading metadata etc.",
I'm not sure about this.
1. the planner don't know which computed column should be pushed into source
2. it seems that we can't improve performances if we pushdown complex logic
into source, we still need to calculate them anyway.
3. the computed column is a regular expression, if the computed column
should be pushed down, then shall we push the expressions in the following
Projection too?
    If yes, then the name of "SupportsComputedColumnPushDown" might be not
correct.
4. regarding reading metadata, according to FLIP-107, we don't use the
existing SupportsComputedColumnPushDown, but a new interface.

Therefore, I don't find a strong use case that needs this interface so far.

Best,
Jark




On Tue, 8 Sep 2020 at 17:13, Timo Walther <twal...@apache.org> wrote:

> Hi Shengkai,
>
> first of I would not consider the section Problems of
> SupportsWatermarkPushDown" as a "problem". It was planned to update the
> WatermarkProvider once the interfaces are ready. See the comment in
> WatermarkProvider:
>
> // marker interface that will be filled after FLIP-126:
> // WatermarkGenerator<RowData> getWatermarkGenerator();
>
> So far we had no sources that actually implement WatermarkStrategy.
>
> Second, for generating watermarks I don't see a problem in merging the
> two mentioned interfaces SupportsWatermarkPushDown and
> SupportsComputedColumnPushDown into one. The descibed design sounds
> reasonable to me and the impact on performance should not be too large.
>
> However, by merging these two interfaces we are also merging two
> completely separate concepts. Computed columns are not always used for
> generating a rowtime or watermark. Users can and certainly will
> implement more complex logic in there. One example could be decrypting
> encrypted records/columns, performing checksum checks, reading metadata
> etc.
>
> So in any case we should still provide two interfaces:
>
> SupportsWatermarkPushDown (functionality of computed columns + watermarks)
>
>
> SupportsComputedColumnPushDown (functionality of computed columns only)
>
> I'm fine with such a design, but it is also confusing for implementers
> that SupportsWatermarkPushDown includes the functionality of the other
> interface.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 08.09.20 04:32, Jark Wu wrote:
> > Thanks to Shengkai for summarizing the problems on the FLIP-95 interfaces
> > and solutions.
> >
> > I think the new proposal, i.e. only pushing the "WatermarkStrategy" is
> much
> > cleaner and easier to develop than before.
> > So I'm +1 to the proposal.
> >
> > Best,
> > Jark
> >
> > On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fskm...@gmail.com> wrote:
> >
> >> Hi, all. It seems the format is not normal. So I add a google doc in
> >> link[1]. This discussion is about the interfaces in FLIP-95:  New Table
> >> Source And Table Sink and propose to merge two interfaces
> >> SupportsWatermarkPushDown and SupportsComputedColumnPushDown.
> >>
> >> I am looking forward to any opinions and suggestions from the community.
> >>
> >> Regards,
> >> Shengkai
> >>
> >> [1]
> >>
> >>
> https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#
> >>
> >> Shengkai Fang <fskm...@gmail.com> 于2020年9月4日周五 下午2:58写道:
> >>
> >>>     Hi, all. Currently, we use two seperated interfaces
> >>> SupportsComputedColumnPushDown and SupportsWatermarkPushDown in design.
> >> The
> >>> interface SupportsWatermarkPushDown relies on
> >>> SupportsComputedColumnPushDown when watermark is defined on a computed
> >>> column. During the implementation, we find the method in
> >>> SupportsWatermarkPushDown uses an out-of-date interface
> WatermarkProvider
> >>> and the duplication of SupportsComputedColumnPushDown and
> >>> SupportsProjectionPushDown. Therefore, we decide to propose a new
> >>> interface of SupportsWatermarkPushDown to solve the problems we
> >> mentioned.
> >>>
> >>>
> >>> *Problems of SupportsComputedColumnPushDown and
> >> SupportsWatermarkPushDown*Problems
> >>> of SupportsWatermarkPushDown
> >>>
> >>> SupportsWatermarkPushDown uses an inner interface named
> >> WatermarkProvider to
> >>> register WatermarkGenerator into DynamicTableSource now. However, the
> >>> community uses org.apache.flink.api.common.eventtime.WatermarkStrategy
> to
> >>> create watermark generators in FLIP-126. WatermarkStrategy is a factory
> >>> of TimestampAssigner and WatermarkGeneartor and FlinkKafkaConsumer uses
> >>> the method assignTimestampsAndWatermarks(WatermarkStrategy) to generate
> >>> Kafka-partition-aware watermarks. As for the origin WatermarkProvider,
> it
> >>> is used to generate deprecated AssignerWithPeriodicWatermarks and
> >>> PunctuatedWatermarkAssignerProvider. Therefore, we think it's not
> >>> suitable to use the WatermarkProvider any more.
> >>>
> >>>
> >>> Problems of SupportsComputedColumnPushDown
> >>>
> >>> There are two problems around when using SupportsComputedColumnPushDown
> >>>   alone.
> >>>
> >>> First, planner will transform the computed column and query such as
> >> select
> >>> a+b to a LogicalProject. When it comes to the optimization phase, we
> have
> >>> no means to distinguish whether the Rexnode in the projection is from
> >>> computed columns or query. So SupportsComputedColumnPushDown in reality
> >>> will push not only the computed column but also the calculation in the
> >>> query.
> >>>
> >>> Second, when a plan matches the rule
> >>> PushComputedColumnIntoTableSourceScanRule, we have to build a new
> >> RowData to
> >>> place all fields we require. However, both two rules
> >>> PushComputedColumnIntoTableSourceScanRule and
> >>> PushProjectIntoTableSourceScanRule will do the same work that prune the
> >>> records that read from source. It seems that we have two duplicate
> rules
> >> in
> >>> planner. But I think we should use the rule
> >>> PushProjectIntoTableSourceScanRule rather than
> >>> PushComputedColumnIntoTableSourceScanRule if we don't support watermark
> >>> push down. Compared to PushComputedColumnIntoTableSourceScanRule,
> >>> PushProjectIntoTableSourceScanRule is much lighter and we can read
> pruned
> >>> data from source rather than use a map function in flink.
> >>>
> >>> Therefore, we think it's not a good idea to use two interfaces rather
> >> than
> >>> one.
> >>>
> >>>
> >>> *New Proposal*
> >>>
> >>> First of all, let us address some background when pushing watermarks
> into
> >>> table source scan. There are two structures that we need to consider.
> We
> >>> list two examples below for discussion.
> >>>
> >>> structure 1:LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2,
> >> 5000:INTERVAL SECOND)])+- LogicalTableScan(table=[[default_catalog,
> >> default_database, MyTable]])structure
> >> 2:LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL
> >> SECOND)])+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2,
> 5000:INTERVAL
> >> SECOND)])   +- LogicalTableScan(table=[[default_catalog,
> default_database,
> >> MyTable]])
> >>>
> >>> As we can see, structure 2 is much more complicated than structure 1.
> For
> >>> structure 1, we can use the row from table scan to generate watermarks
> >>> directly. But for structure 2, we need to calculate the rowtime
> >> expression
> >>> in LogicalProject and use the result of calculation to generate
> >>> watermarks. Considering that WatermarkStrategy has the ability to
> extract
> >>> timestamp from row, we have a proposal to push only WatermarkStrategy
> to
> >>> scan.
> >>>
> >>>
> >>> Push WatermarkStrategy to Scan
> >>>
> >>> In this interface, we will only push WatermarkStrategy to
> >>> DynamicTableSource.
> >>>
> >>> public interface SupportsWatermarkPushDown {        void
> >> applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);    }
> >>>
> >>> The advantage of the new api is that it's very simple for the
> developers
> >>> of Connector. They only need to take WatermarkStrategy into
> consideration
> >>> and don't need to deal with other infos such as ComputedColumnConverter
> >>> in SupportsComputedColumnPushDown. But it also has one disadvantage
> that
> >>> it needs to calculate the rowtime expression again in LogicalProjection
> >>> because we don't build a new row in scan to store the calculated
> >> timestamp.
> >>> However, we can replace the calculation of rowtime in LogicalProjection
> >>> with a reference to eliminate duplicate calculation, which will use the
> >>> StreamRecord's getter to read the timestamp that is calculated before.
> >> But
> >>> this optimization still has one limitation that it relies on computed
> >>> columns are not defined on other computed columns. For nested computed
> >>> columns, we have no place to save the intermediate result.
> >>>
> >>> But we still have a problem that when we push an udf into the source,
> we
> >>> need a context as powerful as FunctionContext to open the udf. But the
> >>> current WatermarkGeneratorSupplier.Context only supports method
> >>> getMetricGroup and misses methods getCachedFile, getJobParameter and g
> >>> etExternalResourceInfos, which means we can't convert the
> >>> WatermarkGeneratorSupplier.Context to FunctionContext safely.
> Considering
> >>> that the udf is only used to generate watermark, we suggest to throw
> >>> UnsupportedException when invoking the methods exist in FunctionContext
> >>> but don't exist in WatermarkGeneratorSupplier.Context. But we have to
> >>> admit that there are risks in doing so because we have no promise that
> >> the
> >>> udf will not invoke these methods.
> >>>
> >>>
> >>> *Summary*
> >>>
> >>> We have addressed the whole problem and solution in detail. As a
> >>> conclusion, I think the new interface avoids the problems we mentioned
> >>> before. It has a clear definition as its name tells and has nothing in
> >>> common with SupportProjectionPushDown. As for its disadvantage, I think
> >>> it's acceptable to calculate the rowtime column twice and we also have
> a
> >>> plan to improve its efficiency as a follow up if it brings some
> >> performance
> >>> problems.
> >>>
> >>>
> >>>
> >>
> >
>
>

Reply via email to