Re: [DISCUSS] Support partition pruning for streaming reading

2022-07-06 Thread cao zou
Hi, godfrey and Jark, thanks for joining the discussion.

The implications for FileSource


Sorry about that, I had missed an important feature: the FileSource
supports continuous reading. I think we can do the same thing for the
FileSource, which is using the partition pruning function to filter
unneeded partitions. The function will be held by `FileSystemTableSource`,
`FileSource`, `AbstractFileSource`, and `ContinuousFileSplitEnumerator`.
Finally, it will be used in
`ContinuousFileSplitEnumerator#processDiscoveredSplits`, where the splits
belonging to unneeded partitions will be dropped; you can refer to [1].
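
To make the idea concrete, here is a minimal self-contained sketch of that
pruning step. The split type, the path layout, and the helper names are all
illustrative, not the real FileSource classes:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class SplitPruningSketch {

    // Stand-in for a file split whose path encodes the partition,
    // e.g. "/warehouse/t/ds=2022-06-01/hr=1/part-0".
    static class FileSplit {
        final String path;

        FileSplit(String path) {
            this.path = path;
        }
    }

    // Derive the partition spec (key -> value) from the split path.
    static Map<String, String> extractPartitionSpec(FileSplit split) {
        Map<String, String> spec = new LinkedHashMap<>();
        for (String segment : split.path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                spec.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return spec;
    }

    // The step processDiscoveredSplits would perform: keep only the splits
    // whose partition passes the pushed-down pruning function.
    static List<FileSplit> pruneDiscoveredSplits(
            List<FileSplit> discovered, Predicate<Map<String, String>> partitionFilter) {
        List<FileSplit> remaining = new ArrayList<>();
        for (FileSplit split : discovered) {
            if (partitionFilter.test(extractPartitionSpec(split))) {
                remaining.add(split);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<FileSplit> discovered = new ArrayList<>();
        discovered.add(new FileSplit("/warehouse/t/ds=2022-06-01/hr=1/part-0"));
        discovered.add(new FileSplit("/warehouse/t/ds=2022-06-02/hr=1/part-0"));
        // Only keep partitions with ds >= '2022-06-02'.
        for (FileSplit s : pruneDiscoveredSplits(
                discovered, spec -> "2022-06-02".compareTo(spec.get("ds")) <= 0)) {
            System.out.println(s.path);
        }
    }
}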

I think the pruning function will only affect streaming reading; batch
reading will stay the same.


About `FilterFunction partitionFilter`.


I agree that a FilterFunction is enough, and `CatalogPartitionSpec` as the
input type is more meaningful and suitable than `RowData`. I propose that
we combine `PartitionSpecToRowData` with `FilterLogic` in the same
function.
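
A rough sketch of such a combined function follows. The class name is made
up and the conversion is simplified; the real version would build a RowData
from the spec and evaluate the code-generated predicate on it:

import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.table.catalog.CatalogPartitionSpec;

// Sketch only: one FilterFunction that hides both the spec-to-row
// conversion and the filter logic from the connector.
public class PartitionSpecPruningFunction implements FilterFunction<CatalogPartitionSpec> {

    // Stand-in for the planner-generated predicate over the converted row.
    private final FilterFunction<Map<String, String>> generatedPredicate;

    public PartitionSpecPruningFunction(
            FilterFunction<Map<String, String>> generatedPredicate) {
        this.generatedPredicate = generatedPredicate;
    }

    @Override
    public boolean filter(CatalogPartitionSpec spec) throws Exception {
        // Real version: convert the spec to a RowData (PartitionSpecToRowData)
        // and evaluate the code-generated filter on that row.
        return generatedPredicate.filter(spec.getPartitionSpec());
    }
}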

I think it is better that we pass the `FilterFunction` to the connector
side by invoking `applyPartitionPuringFunction`, and I tried to complete a
simple POC[1]. Unfortunately, I met a blocker related to class loading:
the code-generated function cannot be passed from the client to the
JobMaster; the exception is shown in the UT[2]. Am I missing something
important?


As far as I can tell, we cannot support passing an already-initialized
code-generated function from the client to the JobManager or TaskManager,
so the `GeneratedFunction` is the right choice. However, the relevant
classes for the generated code are defined in flink-table-runtime, while
`SupportsPartitionPushDown` is defined in flink-table-common, and the
`GeneratedFunction` would have to be seen as the input type in
`applyPartitionPuringFunction`.
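
For illustration, the connector side could then compile the function lazily
where it runs, instead of shipping an instantiated object from the client.
This is a sketch, assuming a `GeneratedFunction<FilterFunction<RowData>>`
is what gets pushed down:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedFunction;

// Sketch only: the connector keeps the GeneratedFunction (source code plus
// references) and compiles it on the side where it is used, with the proper
// classloader, so no instantiated object has to cross the client/JobMaster
// boundary.
public class LazyPruningFunctionHolder {

    private final GeneratedFunction<FilterFunction<RowData>> generatedFilter;
    private transient FilterFunction<RowData> filter;

    public LazyPruningFunctionHolder(
            GeneratedFunction<FilterFunction<RowData>> generatedFilter) {
        this.generatedFilter = generatedFilter;
    }

    public boolean prune(RowData partitionRow, ClassLoader classLoader) throws Exception {
        if (filter == null) {
            // Compilation happens here, not on the client.
            filter = generatedFilter.newInstance(classLoader);
        }
        return filter.filter(partitionRow);
    }
}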



About the `applyPartitionPuringFunction` method affecting batch/bounded
table sources


As the code shows, the partition pruning function will only be used in
streaming mode and won't be called in batch mode. Although we could unify
the behavior across batch and streaming mode, I think it would bring some
drawbacks affecting statistics and parallelism inference.



[1]
https://github.com/apache/flink/compare/master...zoucao:flink:dynamic-partition-pruning

[2]
https://github.com/zoucao/flink/blob/c37d984169c4d099d3ca0458414175168c6e98af/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java#L178

Best regards,

zoucao


On Mon, 4 Jul 2022 at 20:07, Jark Wu wrote:

> Hi zoucao,
>
> Regarding the pruning function, maybe a simple filter function is enough,
>  e.g. `FilterFunction partitionFilter`.
>
> Besides, it would be better to state clearly how the new
> `applyPartitionPuringFunction`
> method affects batch/bounded table sources. From my understanding,
> this method won't be called in batch mode?
>
> Best,
> Jark
>
> On Mon, 4 Jul 2022 at 19:40, Martijn Visser 
> wrote:
>
> > Hi zoucao,
> >
> > The FileSource does support streaming reading [1].
> >
> > Best regards,
> >
> > Martijn
> >
> > [1]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/file/src/FileSource.html
> >
> > On Mon, 4 Jul 2022 at 05:58, godfrey he wrote:
> >
> > > Hi zoucao,
> > >
> > > Looking forward to your FLIP.
> > >
> > > >For Batch reading, the 'remainingPartitions' will be seen as the
> > > partitions
> > > >needed to consume, for streaming reading, we use the
> > > >'partitionPruningFunction' to ignore the unneeded partitions.
> > > It should be: for a bounded source (whether batch or streaming),
> > > `applyPartitions` should be used, while only for an unbounded source
> > > can `applyPartitionPuringFunction` be used.
> > >
> > > Best,
> > > Godfrey
> > >
> > > On Mon, 4 Jul 2022 at 11:04, cao zou wrote:
> > > >
> > > > Hi Martijn, thanks for your attention. I'm glad to create a FLIP;
> > > > could you help give me the permission?
> > > > My Id is zoucao, and my mail is zoucao...@gmail.com.
> > > >
> > > > The implications for FileSource
> > > >
> > > > In the above discussion, only HiveSource has been involved, because
> > > > it holds a continuous partition fetcher, but FileSource does not. If
> > > > we do the streaming pruning only in the partition fetcher, it will
> > > > not affect the FileSource. If the FileSource supports streaming
> > > > reading in the future, the same changes can be applied to it.
> > > >
> > > > Best regards

Re: [DISCUSS] Support partition pruning for streaming reading

2022-07-03 Thread cao zou
Hi Martijn, thanks for your attention. I'm glad to create a FLIP; could
you help give me the permission?
My Id is zoucao, and my mail is zoucao...@gmail.com.

The implications for FileSource

In the above discussion, only HiveSource has been involved, because it
holds a continuous partition fetcher, but FileSource does not. If we do the
streaming pruning only in the partition fetcher, it will not affect the
FileSource. If the FileSource supports streaming reading in the future, the
same changes can be applied to it.

Best regards,
zoucao

On Fri, 1 Jul 2022 at 16:20, Martijn Visser wrote:

> Hi zoucao,
>
> I think this topic deserves a proper FLIP and a vote. This approach is
> focussed only on Hive, but I would also like to understand the implications
> for FileSource. Can you create one?
>
> Best regards,
>
> Martijn
>
> On Wed, 22 Jun 2022 at 18:50, cao zou wrote:
>
> > Hi devs, I want to start a discussion to find a way to support partition
> > pruning for streaming reading.
> >
> >
> > Now, Flink already supports partition pruning; the implementation
> > consists of the *Source Ability*, a *Logical Rule*, and the interface
> > *SupportsPartitionPushDown*, but they all only take effect in batch
> > reading. When reading a table in streaming mode, the existing mechanism
> > will cause some problems, as reported in FLINK-27898
> > <https://issues.apache.org/jira/browse/FLINK-27898>[1], and records
> > that should be filtered will be sent downstream.
> >
> > To solve this drawback, this discussion is proposed; Hive and other
> > big-data systems that store data with partitions will benefit from it.
> >
> > Now, the partitions that need to be consumed are generated in
> > *PushPartitionIntoTableSourceScanRule*. Then, the partitions are
> > pushed into the TableSource. This works well in batch mode, but if we
> > want to read records from Hive in streaming mode and consider the
> > partitions committed in the future, it is not enough.
> >
> > To support pruning the partitions committed in the future, the pruning
> > function should be pushed into the TableSource and then delivered to
> > the *ContinuousPartitionFetcher*, such that the pruning of uncommitted
> > partitions can be invoked there.
> >
> > Before proposing the changes, I think it is necessary to clarify the
> > existing pruning logic. The main logic of the pruning in
> > *PushPartitionIntoTableSourceScanRule* is as follows.
> >
> > First, a pruning function called partitionPruner is generated; the
> > function extends RichMapFunction.
> >
> >
> > if tableSource.listPartitions() is not empty:
> >     partitions = dynamicTableSource.listPartitions()
> >     for p in partitions:
> >         boolean predicate = partitionPruner.map(convertPartitionToRow(p))
> >         add p to partitionsAfterPruning where the predicate is true
> > else:  # tableSource.listPartitions() is empty
> >     if the filter can be converted to a ResolvedExpression &&
> >        the catalog can support the filter:
> >         partitionsAfterPruning = catalog.listPartitionsByFilter()
> >         # all partitions in partitionsAfterPruning are needed
> >     else:
> >         partitions = catalog.listPartitions()
> >         for p in partitions:
> >             boolean predicate = partitionPruner.map(convertPartitionToRow(p))
> >             add p to partitionsAfterPruning where the predicate is true
> >
> > I think the main logic can be split into two sides: one part lives in
> > the logical rule, and the other on the connector side. The catalog info
> > should be used on the rule side, not on the connector side; the pruning
> > function could be used on both sides or unified on the connector side.
> >
> >
> > Proposed changes
> >
> >
> >    - add a new method to SupportsPartitionPushDown
> >    - let HiveSourceTable, HiveSourceBuilder, and
> >      HiveContinuousPartitionFetcher hold the pruning function
> >    - prune after fetchPartitions is invoked
> >
> > Considering version compatibility and the optimization of listing
> > partitions with a filter in the catalog, I think we can add a new
> > method to *SupportsPartitionPushDown*:
> >
> > /**
> > * Provides a list of remaining partitions. After those partitions are
> > applied, a source must
> > * not read the data of other partitions during runtime.
> > *
> > * See the documentation of {@link SupportsPartitionPushDown} for m

[DISCUSS] Support partition pruning for streaming reading

2022-06-22 Thread cao zou
Hi devs, I want to start a discussion to find a way to support partition
pruning for streaming reading.


Now, Flink already supports partition pruning; the implementation consists
of the *Source Ability*, a *Logical Rule*, and the interface
*SupportsPartitionPushDown*, but they all only take effect in batch
reading. When reading a table in streaming mode, the existing mechanism
will cause some problems, as reported in FLINK-27898[1], and records
that should be filtered will be sent downstream.

To solve this drawback, this discussion is proposed; Hive and other
big-data systems that store data with partitions will benefit from it.

Now, the partitions that need to be consumed are generated in
*PushPartitionIntoTableSourceScanRule*. Then, the partitions are pushed
into the TableSource. This works well in batch mode, but if we want to
read records from Hive in streaming mode and consider the partitions
committed in the future, it is not enough.

To support pruning the partitions committed in the future, the pruning
function should be pushed into the TableSource and then delivered to the
*ContinuousPartitionFetcher*, such that the pruning of uncommitted
partitions can be invoked there.

Before proposing the changes, I think it is necessary to clarify the
existing pruning logic. The main logic of the pruning in
*PushPartitionIntoTableSourceScanRule* is as follows.

First, a pruning function called partitionPruner is generated; the
function extends RichMapFunction.


if tableSource.listPartitions() is not empty:
    partitions = dynamicTableSource.listPartitions()
    for p in partitions:
        boolean predicate = partitionPruner.map(convertPartitionToRow(p))
        add p to partitionsAfterPruning where the predicate is true
else:  # tableSource.listPartitions() is empty
    if the filter can be converted to a ResolvedExpression &&
       the catalog can support the filter:
        partitionsAfterPruning = catalog.listPartitionsByFilter()
        # all partitions in partitionsAfterPruning are needed
    else:
        partitions = catalog.listPartitions()
        for p in partitions:
            boolean predicate = partitionPruner.map(convertPartitionToRow(p))
            add p to partitionsAfterPruning where the predicate is true

I think the main logic can be split into two sides: one part lives in the
logical rule, and the other on the connector side. The catalog info should
be used on the rule side, not on the connector side; the pruning function
could be used on both sides or unified on the connector side.


Proposed changes


   - add a new method to SupportsPartitionPushDown
   - let HiveSourceTable, HiveSourceBuilder, and
     HiveContinuousPartitionFetcher hold the pruning function
   - prune after fetchPartitions is invoked

Considering version compatibility and the optimization of listing
partitions with a filter in the catalog, I think we can add a new method
to *SupportsPartitionPushDown*:

/**
 * Provides a list of remaining partitions. After those partitions are
 * applied, a source must not read the data of other partitions during
 * runtime.
 *
 * See the documentation of {@link SupportsPartitionPushDown} for more
 * information.
 */
void applyPartitions(List<Map<String, String>> remainingPartitions);

/**
 * Provides a pruning function for uncommitted partitions.
 */
default void applyPartitionPuringFunction(
        MapFunction<RowData, Boolean> partitionPruningFunction) {}

We can push the generated function into the TableSource, such that the
ContinuousPartitionFetcher can get it.
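
On the fetcher side, the usage could look roughly like this. This is a
sketch with stand-in types; in the real code it would sit where
HiveContinuousPartitionFetcher processes the result of fetchPartitions:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FetcherPruningSketch {

    // Stand-in for the pushed-down pruning function: in the proposal it maps
    // the partition (converted to a row) to a keep/drop decision.
    interface PartitionPruningFunction {
        boolean keep(Map<String, String> partitionSpec) throws Exception;
    }

    // After fetchPartitions() discovers newly committed partitions, drop the
    // unneeded ones before any splits are generated for them.
    static List<Map<String, String>> pruneFetchedPartitions(
            List<Map<String, String>> fetched,
            PartitionPruningFunction pruningFunction) throws Exception {
        List<Map<String, String>> remaining = new ArrayList<>();
        for (Map<String, String> partition : fetched) {
            if (pruningFunction.keep(partition)) {
                remaining.add(partition);
            }
        }
        return remaining;
    }
}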

For batch reading, the 'remainingPartitions' will be treated as the
partitions to consume; for streaming reading, we use the
'partitionPruningFunction' to ignore the unneeded partitions.

Rejected Alternatives

Do not remove the filter logic about the partition keys in the Filter
node if the source will execute streaming reading.


Looking forward to your opinions.


[1] https://issues.apache.org/jira/browse/FLINK-27898

best

zoucao


Re: [DISCUSS] FLIP-240: Introduce "ANALYZE TABLE" Syntax

2022-06-13 Thread cao zou
Hi godfrey, thanks for your detailed explanation.
After your explanation and a glance over FLIP-231, I think it is
really needed; +1 for this, and looking forward to it.

best
zoucao

On Mon, 13 Jun 2022 at 14:43, godfrey he wrote:

> Hi Ingo,
>
> The semantics do not distinguish between batch and streaming;
> it works for both batch and streaming, but the result over
> unbounded sources is meaningless.
> Currently, I throw an exception in streaming mode,
> and we can support streaming mode with bounded sources
> in the future.
>
> Best,
> Godfrey
>
> On Mon, 13 Jun 2022 at 14:17, Ingo Bürk wrote:
> >
> > Hi Godfrey,
> >
> > thank you for the explanation. A SELECT is definitely more generic and
> > will work for all connectors automatically. As such I think it's a good
> > baseline solution regardless.
> >
> > We can also think about allowing connector-specific optimizations in the
> > future, but I do like your idea of letting the optimizer rules perform a
> > lot of the work here already by leveraging existing optimizations.
> > Similarly, things like non-null counts of non-nullable columns would (or
> > at least could) be handled by the optimizer rules already.
> >
> > So as far as that point goes, +1 to the generic approach.
> >
> > One more point, though: In general we should avoid supporting features
> > only in specific modes as it breaks the unification promise. Given that
> > ANALYZE is a manual and completely optional operation I'm OK with doing
> > that here in principle. However, I wonder what will happen in the
> > streaming / unbounded case. Do you plan to throw an error? Or do we
> > complete the command as successful but without doing anything?
> >
> >
> > Best
> > Ingo
> >
> > On 13.06.22 05:50, godfrey he wrote:
> > > Hi Ingo,
> > >
> > > Thanks for the inputs.
> > >
> > > I think converting `ANALYZE TABLE` to a `SELECT` statement is the
> > > more generic approach. Because query plan optimization is generic,
> > > we can provide more optimization rules that optimize not only the
> > > `SELECT` statement converted from `ANALYZE TABLE` but also the
> > > `SELECT` statements written by users.
> > >
> > >> a JDBC connector can get a row count estimate without performing a
> > >> SELECT COUNT(1)
> > > To optimize such cases, we can implement a rule to push the aggregate
> > > into the table source.
> > > Currently, there is a similar ability, SupportsAggregatePushDown,
> > > which only supports pushing the local aggregate into the source for
> > > now.
> > >
> > >
> > > Best,
> > > Godfrey
> > >
> > > On Fri, 10 Jun 2022 at 17:15, Ingo Bürk wrote:
> > >>
> > >> Hi Godfrey,
> > >>
> > >> compared to the solution proposed in the FLIP (using a SELECT
> > >> statement), I wonder if you have considered adding APIs to catalogs /
> > >> connectors to perform this task as an alternative?
> > >> I could imagine that for many connectors, statistics could be
> > >> implemented in a less expensive way by leveraging the underlying
> system
> > >> (e.g. a JDBC connector can get a row count estimate without
> performing a
> > >> SELECT COUNT(1)).
> > >>
> > >>
> > >> Best
> > >> Ingo
> > >>
> > >>
> > >> On 10.06.22 09:53, godfrey he wrote:
> > >>> Hi all,
> > >>>
> > >>> I would like to open a discussion on FLIP-240:  Introduce "ANALYZE
> > >>> TABLE" Syntax.
> > >>>
> > >>> As FLIP-231 mentioned, statistics are one of the most important
> > >>> inputs to the optimizer. Accurate and complete statistics allow the
> > >>> optimizer to be more powerful. The "ANALYZE TABLE" syntax is a very
> > >>> common but effective approach to gathering statistics, and it has
> > >>> already been introduced by many compute engines and databases.
> > >>>
> > >>> The main purpose of this discussion is to introduce the "ANALYZE
> > >>> TABLE" syntax for Flink SQL.
> > >>>
> > >>> You can find more details in FLIP-240 document[1]. Looking forward to
> > >>> your feedback.
> > >>>
> > >>> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386481
> > >>> [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-240
> > >>>
> > >>>
> > >>> Best,
> > >>> Godfrey
>


Re: [DISCUSS] FLIP-240: Introduce "ANALYZE TABLE" Syntax

2022-06-10 Thread cao zou
Hi godfrey, thanks for driving this meaningful topic.
I think statistics are essential and meaningful for the optimizer; I'm just
wondering in which situations it is needed. From the user side, the
optimizer is executed by the framework, and maybe users do not want to
think too much about it. Could you share more situations where 'ANALYZE
TABLE' would be used from the user side?

nit: There may be a mistake in the partitioned-table example
(Examples#partition table); the partition info should be

Partition1: (ds='2022-06-01', hr=1)

Partition2: (ds='2022-06-01', hr=2)

Partition3: (ds='2022-06-02', hr=1)

Partition4: (ds='2022-06-02', hr=2)
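
As an aside, once the syntax lands, analyzing one of those partitions might
look like the following. This is hypothetical usage only, since the exact
syntax is what the FLIP is still discussing:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AnalyzeTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Hypothetical: gather statistics for one partition of the example
        // table; this only works after the FLIP is implemented.
        tEnv.executeSql(
                "ANALYZE TABLE MyTable PARTITION (ds='2022-06-01', hr=1) "
                        + "COMPUTE STATISTICS FOR ALL COLUMNS");
    }
}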

best
 zoucao


On Fri, 10 Jun 2022 at 15:54, godfrey he wrote:

> Hi all,
>
> I would like to open a discussion on FLIP-240:  Introduce "ANALYZE
> TABLE" Syntax.
>
> As FLIP-231 mentioned, statistics are one of the most important inputs
> to the optimizer. Accurate and complete statistics allow the
> optimizer to be more powerful. The "ANALYZE TABLE" syntax is a very common
> but effective approach to gathering statistics, and it has already been
> introduced by many compute engines and databases.
>
> The main purpose of this discussion is to introduce the "ANALYZE TABLE"
> syntax for Flink SQL.
>
> You can find more details in FLIP-240 document[1]. Looking forward to
> your feedback.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386481
> [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-240
>
>
> Best,
> Godfrey
>


Re: Re: Re: [DISCUSS] FLIP-229: Introduces Join Hint for Flink SQL Batch Job

2022-05-09 Thread cao zou
Hi Xuyang, thanks for your explanation.

For more information about inheritPath and the propagation logic, you can
see `RelOptUtil`[1]. As the doc says, the hint will be propagated from the
root node to the child nodes. If we have an AST as follows:

LogicalProject  Hint1[]
+- LogicalJoin  Hint1[0]
   :- LogicalProject
   :  +- LogicalJoin  Hint1[0,0,0]
   :     :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
   :     +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
   +- LogicalProject  Hint2[]
      +- LogicalJoin  Hint1[0,0,0],Hint2[0]
         :- LogicalTableScan(table=[[default_catalog, default_database, t3]])
         +- LogicalTableScan(table=[[default_catalog, default_database, t1]])


then the child nodes (t1 join t2) and (t3 join t1) both inherit Hint1 from
their parent nodes, but we must not apply Hint1 to (t1 join t2) or (t3
join t1) in this case: Hint1 should only be applied to ((t1 join t2) join
(t3 join t1)), and Hint2 should be applied to (t3 join t1). We can choose
the join node whose hint inheritPath is [0] to apply the hint to.
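
A naive version of that check could look like the following. This is a
sketch against Calcite's RelHint; as the cascade-join case below shows,
this check alone is not sufficient:

import java.util.Collections;

import org.apache.calcite.rel.hint.RelHint;

public class HintPathCheck {

    // Naive rule: only apply a propagated hint at the node whose inheritPath
    // is exactly [0], i.e. the join directly under the root project.
    static boolean shouldApplyAtTopJoin(RelHint hint) {
        return hint.inheritPath.equals(Collections.singletonList(0));
    }
}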
We can use inheritPath to help determine this. But when facing a cascade
join, like:

select /*+ SHUFFLE_HASH(t2,t4) */ xxx
from t1
join t2 on xxx
join t3 on xxx
join t4 on xxx

the join nodes get the following estimated inheritPaths:

LogicalProject  Hint[]
+- LogicalJoin  Hint[0]
   :- LogicalJoin  Hint[0,0]
   :  :- LogicalJoin  Hint[0,0,0]
   :  :  :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
   :  :  +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, t3]])
   +- LogicalTableScan(table=[[default_catalog, default_database, t4]])

In this situation, all the join nodes will get the hint, each with a
different inheritPath; we need to rely on the propagation capability,
because only one hint can be specified, at the top join node.
The join node (t1 join t2) inherits the hint from its parent node in both
situations, but in situation 1 we should not apply it, even though the
join node contains the table t1, while in situation 2 we should apply the
hint to (t1 join t2), even though its inheritPath is [0,0,0].
I think we need to rely on the inherited information to aid the judgment,
but it is not easy; please correct me if I have made any mistakes.


For the table alias: since we record the view name to help verify, could
we also record the relationship between the table name and its alias?

[1]
https://github.com/apache/calcite/blob/b9c2099ea92a575084b55a206efc5dd341c0df62/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java#L4009

On Mon, 9 May 2022 at 15:11, Xuyang wrote:

> Thanks for your attention, Li. I agree with you and Cao that, in theory,
> the join hint should support aliases just like other popular computing
> engines and DBs. But the main difficulty is that, currently, Calcite
> ignores the alias when converting SqlNode to RelNode, so the information
> about the alias will not be retained. I think it's not a good idea to fix
> this in Flink, because that work would change a lot of code that belongs
> to Calcite and cause many copied files from Calcite. We can support this
> feature in the future and temporarily throw an exception, just like the
> behavior when the name of the table or view cannot be found.
>
>
>
>
> What do you think about this?
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> At 2022-05-09 12:50:28, "Jingsong Li"  wrote:
> >Thanks Xuyang for driving.
> >
> >zoucao also mentioned alias.
> >
> >Can you explain in the FLIP why the alias is not supported? What are
> >the difficulties? Maybe we can try to overcome them. Or how should we
> >report errors if we don't support it?
> >
> >Best,
> >Jingsong
> >
> >On Mon, May 9, 2022 at 10:53 AM Xuyang  wrote:
> >>
> >> Hi, Jark. Thanks for your review.
> >>
> >> > Join Hint is a public API for SQL syntax. It should work for both
> >> > streaming and batch SQL.
> >>
> >> I agree with your opinion. But currently, only in batch mode does the
> >> optimizer have different join strategies to choose from; there is no
> >> choice of join strategy in streaming mode. The join hints listed in the
> >> current FLIP should be ignored (maybe with a warning) in streaming
> >> mode. When the stream mode gets a choice of join strategies in the
> >> future, I think that will be a good time to discuss how the join hint
> >> can affect streaming SQL.
> >>
> >> > Besides that, could you move your design docs into the wiki?
> >>
> >> Thanks for your reminder; I have moved the content from the doc to the
> >> wiki.
> >> At 2022-05-07 12:46:18, "Jark Wu"  wrote:
> >> >Hi Xuyang,
> >> >
> >> >Thanks for starting this discussion. Join Hint is a long-time requested
> >> >feature.
> >> >I have briefly gone through the design doc. Join Hint is a public API
> for
> >> >SQL syntax.
> >> >It should work for both streaming and batch SQL. I understand some
> special
> >> >hints
> >> >may only work for batch SQL. Could you demonstrate how the hints affect

Re: [DISCUSS] FLIP-229: Introduces Join Hint for Flink SQL Batch Job

2022-05-08 Thread cao zou
Hi Xuyang, thanks for driving this valuable discussion.
I think it makes sense to improve the batch processing capability of Flink
SQL. Using query hints can make the optimization more flexible and
accurate. After going through the design doc, I have some confusion that
maybe you can help clarify.

Firstly, could table aliases be supported in the hint?

If users give the left table an alias called t_l and use t_l in the hint,
like /*+ SHUFFLE_HASH(t_l) */, then as far as I know, we cannot get the
relationship between t_l and the real table name from the RelNode; could
we still link the hint to the join node correctly?

Secondly, could you explain more about the 'Join Hint semantic check'?
How could we check the table name or view name?

The query hint will be propagated from the root node to the leaf nodes,
and the relationship will be recorded in 'inheritPath'. I think the
inheritance info should also be used for the check, not only the table
name. What do you think about this?

best
zoucao

On Sat, 7 May 2022 at 12:46, Jark Wu wrote:

> Hi Xuyang,
>
> Thanks for starting this discussion. Join Hint is a long-time requested
> feature.
> I have briefly gone through the design doc. Join Hint is a public API for
> SQL syntax.
> It should work for both streaming and batch SQL. I understand some special
> hints
> may only work for batch SQL. Could you demonstrate how the hints affect
> stream SQL as well?
>
> Besides that, could you move your design docs into the wiki?
> Google docs are usually used for offline discussion.
> The discussion on google docs is not very visible to the community.
> So we would like to move designs to the wiki and move discussions to the
> mailing list.
>
> Best,
> Jark
>
>
>
>
> On Fri, 6 May 2022 at 11:07, Xuyang  wrote:
>
> > Hi, all.
> > I want to start a discussion about the FLIP-229: Introduces Join Hint
> > for Flink SQL Batch Job(The cwiki[1] is not ready completely but you can
> > see the whole details in docs[2]).
> > Join Hint is a common solution in many popular computing engines and DBs
> > to improve the shortcomings of the optimizer by intervening in optimizing
> > the plan. By Join Hint, users can intervene in the selection of the join
> > strategy in optimizer, and manually optimize the execution plan to
> improve
> > the performance of the query.
> > In this FLIP, we propose some join hints by the existing join
> > strategies in Flink SQL for Batch job.
> > I'm look forward to your feedback about FLIP-229.
> >
> >
> >
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job
> > [2]
> >
> https://docs.google.com/document/d/1IL00ME0Z0nlXGDWTUPODobVQMAm94PAPr9pw9EdGkoQ/edit?usp=sharing
>


[DISCUSS]Support the merge statement in FlinkSQL

2022-02-08 Thread cao zou
Hi, devs!
Jingfeng and I would like to start a discussion about the MERGE statement;
the discussion consists of two parts. In the first part, we want to explore
and collect the use cases and motivations of MERGE statement users. In the
second part, we want to find out whether it is possible for Flink SQL to
support the MERGE statement.

Before driving the first topic, we want to introduce the definition and
benefits of the MERGE statement. The MERGE statement in SQL is a very
popular clause that can handle inserts, updates, and deletes all in a
single transaction without having to write separate logic for each of
them. For each insert, update, or delete, we can specify conditions
separately. Many engines/DBs already support this feature, for example,
SQL Server[1], Spark[2], Hive[3], and pgSQL[4].
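
For readers unfamiliar with the syntax, a typical statement looks roughly
like the following. This is an illustrative example in the SQL Server /
Spark style; the table and column names are made up, and Flink cannot
execute this today:

public class MergeStatementExample {

    // Illustrative only: correct late-arriving orders in a target table from
    // a source of corrections, deleting, updating, or inserting per row.
    static final String MERGE_EXAMPLE =
            "MERGE INTO orders AS t "
                    + "USING corrected_orders AS s "
                    + "ON t.order_id = s.order_id "
                    + "WHEN MATCHED AND s.op = 'D' THEN DELETE "
                    + "WHEN MATCHED THEN UPDATE SET amount = s.amount "
                    + "WHEN NOT MATCHED THEN INSERT (order_id, amount) "
                    + "VALUES (s.order_id, s.amount)";

    public static void main(String[] args) {
        System.out.println(MERGE_EXAMPLE);
    }
}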

Our use case:
Order analysis & processing is one of the most important scenarios, but
sometimes an updated order arrives a long time after the last record with
the same primary key; in the meantime, the state for this key has expired,
so a wrong aggregation result is produced. In this situation, we use the
MERGE statement in a batch job to correct the results, and Spark + Iceberg
is currently chosen internally. In the future, we want to unify batch &
streaming internally by using Flink SQL, so it would be better if Flink
could support the MERGE statement. If you have other use cases or
opinions, please share them here.

Currently, Calcite does not have good support for the MERGE statement, and
there is a Jira ticket, CALCITE-4338[5], tracking it. Could we support the
MERGE statement relying on the limited support in calcite-1.26.0? I wrote
a simple doc[6] to drive this, just to find out whether it is possible for
Flink SQL to support the MERGE statement.

Looking forward to your feedback, thanks. 

best,
zoucao


[1] https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?redirectedfrom=MSDN&view=sql-server-ver15
[2] https://issues.apache.org/jira/browse/SPARK-28893
[3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Merge
[4] https://www.postgresql.org/message-id/attachment/23520/sql-merge.html
[5] https://issues.apache.org/jira/browse/CALCITE-4338
[6] https://docs.google.com/document/d/12wwCqK6zfWGs84ijFZmGPJqCYfYHUPmfx5CvzUkVrw4/edit?usp=sharing