Hi Jark, Hi Shengkai,

"shall we push the expressions in the following Projection too?"

This is something that we should at least consider.

I also don't find a strong use case. But what I do see is that we are merging concepts that can actually be separated, and that we are executing the same code twice, regardless of what kind of code it is (simple timestamp casting or more complex logic).

In any case, we cannot model ingestion time with the merged interfaces, because the computed timestamp column is evaluated twice: once for the watermark in the source and once more in the projection. If a function in the computed column is not deterministic, the resulting watermark_rowtime != projection_rowtime mismatch is very hard to debug.
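For illustration, consider a hypothetical table in the DDL style used later in this thread (purely illustrative; now() stands in for any non-deterministic function):

create table Events (
  a int,
  ts as now(),
  watermark for ts as ts - interval '5' second
) with (
  ...
)

The watermark generator in the source would evaluate now() once, and the projection would evaluate it again, so the two rowtime values for the same row can differ.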

Regards,
Timo



On 08.09.20 14:11, Shengkai Fang wrote:
Hi Timo and Jark. Thanks for your replies.

Maybe I didn't explain it clearly in the doc. I think the main reason behind this is that we
have no means to distinguish the calculations in a LogicalProject. Let me give you an
example to illustrate the reason. Assume we have 2 cases:


case 1:

create table MyTable (
  a int,
  b int
) with (
  ...
)

we use the SQL "select a, b, a+b as c from MyTable" to get the results.


and


case 2:

create table MyTableWithComputedColumn (
  a int,
  b int,
  c as a + b
) with (
  ...
)

we use the SQL "select a, b, c from MyTableWithComputedColumn" to get the results.


When they reach the planner, the two queries have the same plan, which means
we would also push calculations from the query into the scan if we supported
computed column push down.
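For instance (an illustrative plan only, in the style of the plans shown later in this thread), both statements may optimize to the same tree, modulo the table name:

LogicalProject(a=[$0], b=[$1], c=[+($0, $1)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

so the optimizer cannot tell whether +($0, $1) originates from the query or from the computed column c.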


As a supplement to Jark's response: currently
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#assignTimestampsAndWatermarks(WatermarkStrategy)
uses WatermarkStrategy to register the watermark generator supplier. I think
it's OK to use WatermarkStrategy directly because FLIP-126 has been
finished.
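As a simplified sketch of that usage (illustrative only; kafkaConsumer stands for an existing FlinkKafkaConsumer<RowData> instance, and the field index and TIMESTAMP(3) precision are assumptions, not part of the proposal):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.data.RowData;

// Illustrative sketch: build a WatermarkStrategy and hand it to the consumer;
// FlinkKafkaConsumerBase then generates Kafka-partition-aware watermarks.
WatermarkStrategy<RowData> strategy =
    WatermarkStrategy.<RowData>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // assumes the rowtime sits in field 2 with TIMESTAMP(3) precision
        .withTimestampAssigner((row, previousTimestamp) ->
            row.getTimestamp(2, 3).getMillisecond());

kafkaConsumer.assignTimestampsAndWatermarks(strategy);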



On Tue, 8 Sep 2020 at 19:38, Jark Wu <imj...@gmail.com> wrote:

Hi Timo,

Regarding "pushing other computed columns into source, e.g. encrypted
records/columns, performing checksum checks, reading metadata etc.",
I'm not sure about this.
1. the planner don't know which computed column should be pushed into
source
2. it seems that we can't improve performances if we pushdown complex logic
into source, we still need to calculate them anyway.
3. the computed column is a regular expression, if the computed column
should be pushed down, then shall we push the expressions in the following
Projection too?
     If yes, then the name of "SupportsComputedColumnPushDown" might be not
correct.
4. regarding reading metadata, according to FLIP-107, we don't use the
existing SupportsComputedColumnPushDown, but a new interface.

Therefore, I don't find a strong use case that needs this interface so far.

Best,
Jark




On Tue, 8 Sep 2020 at 17:13, Timo Walther <twal...@apache.org> wrote:

Hi Shengkai,

first of all, I would not consider the section "Problems of
SupportsWatermarkPushDown" an actual "problem". It was planned to update the
WatermarkProvider once the interfaces are ready. See the comment in
WatermarkProvider:

// marker interface that will be filled after FLIP-126:
// WatermarkGenerator<RowData> getWatermarkGenerator();

So far we had no sources that actually implement WatermarkStrategy.

Second, for generating watermarks I don't see a problem in merging the
two mentioned interfaces SupportsWatermarkPushDown and
SupportsComputedColumnPushDown into one. The described design sounds
reasonable to me, and the impact on performance should not be too large.

However, by merging these two interfaces we are also merging two
completely separate concepts. Computed columns are not always used for
generating a rowtime or watermark. Users can and certainly will
implement more complex logic in there. One example could be decrypting
encrypted records/columns, performing checksum checks, reading metadata
etc.

So in any case we should still provide two interfaces:

SupportsWatermarkPushDown (functionality of computed columns + watermarks)

SupportsComputedColumnPushDown (functionality of computed columns only)

I'm fine with such a design, but it is also confusing for implementers
that SupportsWatermarkPushDown includes the functionality of the other
interface.

What do you think?

Regards,
Timo


On 08.09.20 04:32, Jark Wu wrote:
Thanks to Shengkai for summarizing the problems with the FLIP-95 interfaces
and the solutions.

I think the new proposal, i.e. only pushing the WatermarkStrategy, is much
cleaner and easier to develop than before.
So I'm +1 to the proposal.

Best,
Jark

On Sat, 5 Sep 2020 at 13:44, Shengkai Fang <fskm...@gmail.com> wrote:

Hi, all. It seems the formatting was broken, so I added a Google doc at [1].
This discussion is about the interfaces in FLIP-95: New Table Source And
Table Sink, and proposes to merge the two interfaces
SupportsWatermarkPushDown and SupportsComputedColumnPushDown.

I am looking forward to any opinions and suggestions from the community.

Regards,
Shengkai

[1] https://docs.google.com/document/d/1sIT8lFZ_MeNIh_GLE3hi7Y4pgzN90Ahw_LoBFni-GT4/edit#

On Fri, 4 Sep 2020 at 14:58, Shengkai Fang <fskm...@gmail.com> wrote:

Hi, all. Currently, we use two separate interfaces,
SupportsComputedColumnPushDown and SupportsWatermarkPushDown, in the design.
The interface SupportsWatermarkPushDown relies on
SupportsComputedColumnPushDown when the watermark is defined on a computed
column. During the implementation, we found that the method in
SupportsWatermarkPushDown uses an out-of-date interface, WatermarkProvider,
and that SupportsComputedColumnPushDown duplicates
SupportsProjectionPushDown. Therefore, we decided to propose a new version
of SupportsWatermarkPushDown to solve the problems mentioned.


*Problems of SupportsComputedColumnPushDown and SupportsWatermarkPushDown*

Problems of SupportsWatermarkPushDown

SupportsWatermarkPushDown currently uses an inner interface named
WatermarkProvider to register a WatermarkGenerator in the
DynamicTableSource. However, since FLIP-126 the community uses
org.apache.flink.api.common.eventtime.WatermarkStrategy to create watermark
generators. WatermarkStrategy is a factory for TimestampAssigner and
WatermarkGenerator, and FlinkKafkaConsumer uses the method
assignTimestampsAndWatermarks(WatermarkStrategy) to generate
Kafka-partition-aware watermarks. The original WatermarkProvider, on the
other hand, is used to create the deprecated AssignerWithPeriodicWatermarks
and PunctuatedWatermarkAssignerProvider. Therefore, we think it's no longer
suitable to use WatermarkProvider.
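To make the "factory" role concrete, here is a minimal sketch of how a source could use a given WatermarkStrategy per split/partition (the strategy, context, row and output variables are assumed to be provided by the runtime):

// Sketch: WatermarkStrategy creates both pieces a source needs.
TimestampAssigner<RowData> assigner =
    strategy.createTimestampAssigner(assignerContext);
WatermarkGenerator<RowData> generator =
    strategy.createWatermarkGenerator(generatorContext);

// per record: extract the (possibly computed) rowtime, then feed the generator
long timestamp = assigner.extractTimestamp(row, TimestampAssigner.NO_TIMESTAMP);
generator.onEvent(row, timestamp, watermarkOutput);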


Problems of SupportsComputedColumnPushDown

There are two problems when using SupportsComputedColumnPushDown alone.

First, the planner transforms both computed columns and query expressions
such as select a+b into a LogicalProject. When it comes to the optimization
phase, we have no means to distinguish whether a RexNode in the projection
comes from a computed column or from the query. So
SupportsComputedColumnPushDown will in reality push down not only the
computed columns but also the calculations in the query.

Second, when a plan matches the rule
PushComputedColumnIntoTableSourceScanRule, we have to build a new RowData to
hold all the fields we require. However, the two rules
PushComputedColumnIntoTableSourceScanRule and
PushProjectIntoTableSourceScanRule do the same work of pruning the fields
read from the source, so it seems we have two duplicate rules in the
planner. I think we should use PushProjectIntoTableSourceScanRule rather
than PushComputedColumnIntoTableSourceScanRule if we don't support watermark
push down: compared to PushComputedColumnIntoTableSourceScanRule, it is much
lighter, and we can read pruned data from the source rather than apply a map
function in Flink.

Therefore, we think it's not a good idea to use two interfaces rather than
one.


*New Proposal*

First of all, let us recall some background on pushing watermarks into the
table source scan. There are two plan structures that we need to consider.
We list two examples below for discussion.

structure 1:

LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

structure 2:

LogicalWatermarkAssigner(rowtime=[d], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[+($2, 5000:INTERVAL SECOND)])
   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

As we can see, structure 2 is more complicated than structure 1. For
structure 1, we can use the rows from the table scan to generate watermarks
directly. But for structure 2, we need to evaluate the rowtime expression in
the LogicalProject and use the result of that calculation to generate
watermarks. Considering that WatermarkStrategy has the ability to extract
the timestamp from a row, we propose to push only the WatermarkStrategy into
the scan.


Push WatermarkStrategy to Scan

With this interface, we will only push the WatermarkStrategy into the
DynamicTableSource.

public interface SupportsWatermarkPushDown {
    void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy);
}
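For connector developers, an implementation could then look roughly like this (class and field names are illustrative assumptions; a real source would also implement ScanTableSource):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.data.RowData;

// Sketch: store the pushed-down strategy and pass it to the runtime source.
public class MyKafkaTableSource implements SupportsWatermarkPushDown {

    // stays null if no watermark is defined on the table
    private WatermarkStrategy<RowData> watermarkStrategy;

    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    // in getScanRuntimeProvider (not shown), something like:
    //   if (watermarkStrategy != null) {
    //       kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
    //   }
}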

The advantage of the new API is that it's very simple for connector
developers: they only need to take WatermarkStrategy into consideration and
don't need to deal with other information such as the ComputedColumnConverter
in SupportsComputedColumnPushDown. But it also has the disadvantage that the
rowtime expression needs to be calculated again in the LogicalProject,
because we don't build a new row in the scan to store the calculated
timestamp.

However, we can replace the calculation of the rowtime in the LogicalProject
with a reference to eliminate the duplicate calculation, which would use the
StreamRecord's getter to read the timestamp calculated before. This
optimization still has one limitation: it relies on computed columns not
being defined on other computed columns. For nested computed columns, we
have no place to save the intermediate result.
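As a rough DataStream-level illustration of that follow-up idea (the stream variable and the ProcessFunction only stand in for the planner-generated projection code):

// Sketch: read back the timestamp the source already assigned via the
// pushed-down WatermarkStrategy instead of re-evaluating the rowtime expression.
stream.process(new ProcessFunction<RowData, RowData>() {
    @Override
    public void processElement(RowData row, Context ctx, Collector<RowData> out) {
        Long rowtime = ctx.timestamp(); // the StreamRecord's timestamp getter
        // ... build the projected row, reusing 'rowtime' for the computed column
        out.collect(row);
    }
});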

But we still have one problem: when we push a UDF into the source, we need a
context as powerful as FunctionContext to open the UDF. The current
WatermarkGeneratorSupplier.Context only supports the method getMetricGroup
and misses the methods getCachedFile, getJobParameter and
getExternalResourceInfos, which means we can't convert the
WatermarkGeneratorSupplier.Context to a FunctionContext safely. Considering
that the UDF is only used to generate watermarks, we suggest throwing an
UnsupportedOperationException when invoking the methods that exist in
FunctionContext but not in WatermarkGeneratorSupplier.Context. But we have
to admit that there is a risk in doing so, because we have no guarantee that
the UDF will not invoke these methods.
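A sketch of such an adapter (subclassing FunctionContext with a null RuntimeContext is an assumption about what the generated code could do, not a settled design):

import java.io.File;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.functions.FunctionContext;

// Sketch: expose WatermarkGeneratorSupplier.Context as a FunctionContext so a
// UDF inside the rowtime expression can be open()-ed in the source. Methods
// without a counterpart throw, as suggested above.
public class WatermarkFunctionContext extends FunctionContext {

    private final WatermarkGeneratorSupplier.Context context;

    public WatermarkFunctionContext(WatermarkGeneratorSupplier.Context context) {
        super(null); // no RuntimeContext is available inside the source
        this.context = context;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return context.getMetricGroup(); // the one method both contexts share
    }

    @Override
    public File getCachedFile(String name) {
        throw new UnsupportedOperationException(
            "getCachedFile is not supported when the UDF runs in the source.");
    }

    @Override
    public String getJobParameter(String key, String defaultValue) {
        throw new UnsupportedOperationException(
            "getJobParameter is not supported when the UDF runs in the source.");
    }

    // getExternalResourceInfos would throw in the same way (omitted for brevity).
}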


*Summary*

We have described the whole problem and solution in detail. In conclusion, I
think the new interface avoids the problems mentioned before: it has a clear
definition, as its name tells, and has nothing in common with
SupportsProjectionPushDown. As for its disadvantage, I think it's acceptable
to calculate the rowtime column twice, and we also have a plan to improve
its efficiency as a follow-up if it causes performance problems.
