[RESULT][VOTE] FLIP-399: Flink Connector Doris

2024-04-26 Thread wudi
Hi everyone,

I'm happy to announce that FLIP-399: Flink Connector Doris [1]
has been accepted with 15 approving votes, 5 of which are binding [2]:

- Yuepeng Pan (non-binding)
- Ahmed Hamdy (non-binding)
- Ferenc Csaky (non-binding)
- Robert Metzger (binding)
- Muhammet Orazov (non-binding)
- Leonard Xu (binding)
- Jiabao Sun (binding)
- gongzhongqiang (non-binding)
- Feng Jin (non-binding)
- Jing Ge (binding)
- Martijn Visser (binding)
- Hang Ruan (non-binding)
- Samrat Deb (non-binding)
- Yanquan Lv (non-binding)
- Aleksandr Pilipenko (non-binding)

There are no disapproving votes. Thanks to everyone who participated in the
discussion and voting.

Best,
Di.Wu

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
[2] https://lists.apache.org/thread/w3hoglk0pqbzqhzlfcgzkkz3xrwo90rt


Re: [VOTE] FLIP-399: Flink Connector Doris

2024-04-26 Thread wudi
Hi all, 
Thank you all!  Closing the vote. 
The result will be announced in a separate email. 

Best regards
Di.Wu
  

> On Apr 15, 2024, at 16:48, Samrat Deb wrote:
> 
> +1 (non binding)
> 
> On Mon, 15 Apr 2024 at 2:16 PM, Aleksandr Pilipenko 
> wrote:
> 
>> +1 (non-binding),
>> 
>> Best,
>> Aleksandr
>> 
>> On Mon, 15 Apr 2024 at 03:03, Yanquan Lv  wrote:
>> 
>>> +1 (non-binding), thanks for it.
>>> 
>>> On Tue, Apr 9, 2024 at 10:48, wudi <676366...@qq.com.invalid> wrote:
>>> 
>>>> Hi devs,
>>>> 
>>>> I would like to start a vote about FLIP-399 [1]. The FLIP is about
>>>> contributing the Flink Doris Connector[2] to the Flink community.
>>>> Discussion thread [3].
>>>> 
>>>> The vote will be open for at least 72 hours unless there is an
>> objection
>>> or
>>>> insufficient votes.
>>>> 
>>>> 
>>>> Thanks,
>>>> Di.Wu
>>>> 
>>>> 
>>>> [1]
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
>>>> [2] https://github.com/apache/doris-flink-connector
>>>> [3] https://lists.apache.org/thread/p3z4wsw3ftdyfs9p2wd7bbr2gfyl3xnh
>>>> 
>>>> 
>>> 
>> 



[VOTE] FLIP-399: Flink Connector Doris

2024-04-08 Thread wudi
Hi devs,

I would like to start a vote about FLIP-399 [1]. The FLIP is about contributing 
the Flink Doris Connector[2] to the Flink community. Discussion thread [3].

The vote will be open for at least 72 hours unless there is an objection or
insufficient votes.


Thanks,
Di.Wu


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
[2] https://github.com/apache/doris-flink-connector
[3] https://lists.apache.org/thread/p3z4wsw3ftdyfs9p2wd7bbr2gfyl3xnh



Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-04-06 Thread wudi
Hi,

Since there have been no new comments for a while, I will start a vote thread 
if none arrive within another day.

Thanks,
di.wu


> On Apr 1, 2024, at 17:52, wudi <676366...@qq.com> wrote:
> 
> Hi,
> 
> Gentle ping to see if there are any other concerns or things that seem 
> missing from the FLIP.
> 
> Brs
> di.wu
> 
>> On Mar 25, 2024, at 17:52, Feng Jin wrote:
>> 
>> Hi Di
>> 
>> Thank you for your patience and explanation.
>> 
>> If this is a server-side configuration, we currently cannot modify it in
>> the client configuration. If Doris supports client-side configuration in
>> the future, we can reconsider whether to support it.
>> 
>> I currently have no other questions regarding this FLIP.  LGTM.
>> 
>> 
>> Best,
>> Feng
>> 
>> On Mon, Mar 25, 2024 at 3:42 PM wudi <676366...@qq.com.invalid> wrote:
>> 
>>> Hi, Feng
>>> 
>>> Yes, if the StreamLoad transaction timeout is very short, you may
>>> encounter this situation.
>>> 
>>> The timeout for StreamLoad transactions is controlled by the
>>> streaming_label_keep_max_second parameter [1] in FE (Frontend), and the
>>> default value is 12 hours. Currently, it is a global transaction
>>> configuration and cannot be set separately for a specific transaction.
>>> 
>>> However, I understand the default 12-hour timeout should cover most cases
>>> unless you are restarting from a checkpoint that occurred a long time ago.
>>> What do you think?
>>> 
>>> 
>>> [1]
>>> https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168
>>> 
>>> 
>>> Brs
>>> di.wu
>>> 
>>>> On Mar 25, 2024, at 11:45, Feng Jin wrote:
>>>> 
>>>> Hi Di
>>>> 
>>>> Thanks for your reply.
>>>> 
>>>> The timeout I'm referring to here is not the commit timeout, but rather
>>> the
>>>> timeout for a single streamLoad transaction.
>>>> 
>>>> Let's say we have set the transaction timeout for StreamLoad to be 10
>>>> minutes. Now, imagine there is a Flink job with two subtasks. Due to
>>>> significant data skew and backpressure issues, subtask 0 and subtask 1
>>> are
>>>> processing at different speeds. Subtask 0 finishes processing this
>>>> checkpoint first, while subtask 1 takes another 10 minutes to complete
>>> its
>>>> processing. At this point, the job's checkpoint is done. However, since
>>>> subtask 0 has been waiting for subtask 1 all along, its corresponding
>>>> streamLoad transaction closes after more than 10 minutes have passed - by
>>>> which time the server has already cleaned up this transaction, leading
>>> to a
>>>> failed commit.
>>>> Therefore, I would like to know if in such situations we can avoid this
>>>> problem by setting a longer lifespan for transactions.
>>>> 
>>>> 
>>>> Best,
>>>> Feng
>>>> 
>>>> 
>>>> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
>>>> 
>>>>> Hi, Feng,
>>>>> 
>>>>> 1. Are you suggesting that when a commit gets stuck, we can interrupt
>>> the
>>>>> commit request using a timeout parameter? Currently, there is no such
>>>>> parameter. In my understanding, in a two-phase commit, checkpoint must
>>> be
>>>>> enabled, so the commit timeout is essentially the checkpoint timeout.
>>>>> Therefore, it seems unnecessary to add an additional parameter here.
>>> What
>>>>> do you think?
>>>>> 
>>>>> 2. In addition to deleting checkpoints to re-consume data again, the
>>>>> Connector also provides an option to ignore commit errors[1]. However,
>>> this
>>>>> option is only used for error recovery scenarios, such as when a
>>>>> transaction is cleared by the server but you want to reuse the upstream
>>>>> offset from the checkpoint.
>>>>> 
>>>>> 3. Also, thank you for pointing out the issue with the parameter. It has
>>>>> already been addressed[2], but the FLIP changes were overlooked. It has
>>>>> been updated.
>>>>> 
>>>>> [1]
>>>>> 
>>> https://github.com/apache/doris-flink-connector/blob/mas

Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-04-01 Thread wudi
Hi,

Gentle ping to see if there are any other concerns or things that seem missing 
from the FLIP.

Brs
di.wu

> On Mar 25, 2024, at 17:52, Feng Jin wrote:
> 
> Hi Di
> 
> Thank you for your patience and explanation.
> 
> If this is a server-side configuration, we currently cannot modify it in
> the client configuration. If Doris supports client-side configuration in
> the future, we can reconsider whether to support it.
> 
> I currently have no other questions regarding this FLIP.  LGTM.
> 
> 
> Best,
> Feng
> 
> On Mon, Mar 25, 2024 at 3:42 PM wudi <676366...@qq.com.invalid> wrote:
> 
>> Hi, Feng
>> 
>> Yes, if the StreamLoad transaction timeout is very short, you may
>> encounter this situation.
>> 
>> The timeout for StreamLoad transactions is controlled by the
>> streaming_label_keep_max_second parameter [1] in FE (Frontend), and the
>> default value is 12 hours. Currently, it is a global transaction
>> configuration and cannot be set separately for a specific transaction.
>> 
>> However, I understand the default 12-hour timeout should cover most cases
>> unless you are restarting from a checkpoint that occurred a long time ago.
>> What do you think?
>> 
>> 
>> [1]
>> https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168
>> 
>> 
>> Brs
>> di.wu
>> 
>>> On Mar 25, 2024, at 11:45, Feng Jin wrote:
>>> 
>>> Hi Di
>>> 
>>> Thanks for your reply.
>>> 
>>> The timeout I'm referring to here is not the commit timeout, but rather
>> the
>>> timeout for a single streamLoad transaction.
>>> 
>>> Let's say we have set the transaction timeout for StreamLoad to be 10
>>> minutes. Now, imagine there is a Flink job with two subtasks. Due to
>>> significant data skew and backpressure issues, subtask 0 and subtask 1
>> are
>>> processing at different speeds. Subtask 0 finishes processing this
>>> checkpoint first, while subtask 1 takes another 10 minutes to complete
>> its
>>> processing. At this point, the job's checkpoint is done. However, since
>>> subtask 0 has been waiting for subtask 1 all along, its corresponding
>>> streamLoad transaction closes after more than 10 minutes have passed - by
>>> which time the server has already cleaned up this transaction, leading
>> to a
>>> failed commit.
>>> Therefore, I would like to know if in such situations we can avoid this
>>> problem by setting a longer lifespan for transactions.
>>> 
>>> 
>>> Best,
>>> Feng
>>> 
>>> 
>>> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
>>> 
>>>> Hi, Feng,
>>>> 
>>>> 1. Are you suggesting that when a commit gets stuck, we can interrupt
>> the
>>>> commit request using a timeout parameter? Currently, there is no such
>>>> parameter. In my understanding, in a two-phase commit, checkpoint must
>> be
>>>> enabled, so the commit timeout is essentially the checkpoint timeout.
>>>> Therefore, it seems unnecessary to add an additional parameter here.
>> What
>>>> do you think?
>>>> 
>>>> 2. In addition to deleting checkpoints to re-consume data again, the
>>>> Connector also provides an option to ignore commit errors[1]. However,
>> this
>>>> option is only used for error recovery scenarios, such as when a
>>>> transaction is cleared by the server but you want to reuse the upstream
>>>> offset from the checkpoint.
>>>> 
>>>> 3. Also, thank you for pointing out the issue with the parameter. It has
>>>> already been addressed[2], but the FLIP changes were overlooked. It has
>>>> been updated.
>>>> 
>>>> [1]
>>>> 
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
>>>> [2]
>>>> 
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>>>> 
>>>> Brs
>>>> di.wu
>>>> 
>>>> 
>>>> 
>>>>> On Mar 22, 2024, at 18:28, Feng Jin wrote:
>>>>> 
>>>>> Hi Di,
>>>>> 
>>>>> Thank you for the update, as well as quickly implementing corresponding
>>>>> capabil

Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-25 Thread wudi
Hi, Feng

Yes, if the StreamLoad transaction timeout is very short, you may encounter 
this situation.

The timeout for StreamLoad transactions is controlled by the 
streaming_label_keep_max_second parameter [1] in FE (Frontend), and the default 
value is 12 hours. Currently, it is a global transaction configuration and 
cannot be set separately for a specific transaction.
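For reference, a minimal sketch of where this setting lives (assumption: standard fe.conf key/value syntax; the value shown is the 12-hour default mentioned above, converted to seconds):

```properties
# fe.conf (Doris FE, server side) — global transaction label retention;
# currently cannot be overridden per transaction or from the connector.
streaming_label_keep_max_second = 43200   # 12 hours (default)
```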

However, I understand the default 12-hour timeout should cover most cases 
unless you are restarting from a checkpoint that occurred a long time ago. What 
do you think?


[1] 
https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168


Brs
di.wu

> On Mar 25, 2024, at 11:45, Feng Jin wrote:
> 
> Hi Di
> 
> Thanks for your reply.
> 
> The timeout I'm referring to here is not the commit timeout, but rather the
> timeout for a single streamLoad transaction.
> 
> Let's say we have set the transaction timeout for StreamLoad to be 10
> minutes. Now, imagine there is a Flink job with two subtasks. Due to
> significant data skew and backpressure issues, subtask 0 and subtask 1 are
> processing at different speeds. Subtask 0 finishes processing this
> checkpoint first, while subtask 1 takes another 10 minutes to complete its
> processing. At this point, the job's checkpoint is done. However, since
> subtask 0 has been waiting for subtask 1 all along, its corresponding
> streamLoad transaction closes after more than 10 minutes have passed - by
> which time the server has already cleaned up this transaction, leading to a
> failed commit.
> Therefore, I would like to know if in such situations we can avoid this
> problem by setting a longer lifespan for transactions.
> 
> 
> Best,
> Feng
> 
> 
> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
> 
>> Hi, Feng,
>> 
>> 1. Are you suggesting that when a commit gets stuck, we can interrupt the
>> commit request using a timeout parameter? Currently, there is no such
>> parameter. In my understanding, in a two-phase commit, checkpoint must be
>> enabled, so the commit timeout is essentially the checkpoint timeout.
>> Therefore, it seems unnecessary to add an additional parameter here. What
>> do you think?
>> 
>> 2. In addition to deleting checkpoints to re-consume data again, the
>> Connector also provides an option to ignore commit errors[1]. However, this
>> option is only used for error recovery scenarios, such as when a
>> transaction is cleared by the server but you want to reuse the upstream
>> offset from the checkpoint.
>> 
>> 3. Also, thank you for pointing out the issue with the parameter. It has
>> already been addressed[2], but the FLIP changes were overlooked. It has
>> been updated.
>> 
>> [1]
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
>> [2]
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>> 
>> Brs
>> di.wu
>> 
>> 
>> 
>>> On Mar 22, 2024, at 18:28, Feng Jin wrote:
>>> 
>>> Hi Di,
>>> 
>>> Thank you for the update, as well as quickly implementing corresponding
>>> capabilities including filter push down and project push down.
>>> 
>>> Regarding the transaction timeout, I still have some doubts. I would like
>>> to confirm if we can control this timeout parameter in the connector,
>> such
>>> as setting it to 10 minutes or 1 hour.
>>> Also, when a transaction is cleared by the server, the commit operation
>> of
>>> the connector will fail, leading to job failure. In this case, can users
>>> only choose to delete the checkpoint and re-consume historical data?
>>> 
>>> There is also a small question regarding the parameters
>>> doris.request.connect.timeout.ms and doris.request.read.timeout.ms:
>>> can we change them to Duration type and remove the "ms" suffix?
>>> This way, all time parameters can be kept uniform in type as Duration.
>>> 
>>> 
>>> Best,
>>> Feng
>>> 
>>> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
>>> 
>>>> Hi, Feng,
>>>> Thank you, that's a great suggestion !
>>>> 
>>>> I have already implemented FilterPushDown and removed that parameter on
>>

Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-22 Thread wudi
Hi, Feng,

1. Are you suggesting that when a commit gets stuck, we can interrupt the 
commit request using a timeout parameter? Currently, there is no such 
parameter. In my understanding, in a two-phase commit, checkpoint must be 
enabled, so the commit timeout is essentially the checkpoint timeout. 
Therefore, it seems unnecessary to add an additional parameter here. What do 
you think?

2. In addition to deleting checkpoints to re-consume data again, the Connector 
also provides an option to ignore commit errors[1]. However, this option is 
only used for error recovery scenarios, such as when a transaction is cleared 
by the server but you want to reuse the upstream offset from the checkpoint.

3. Also, thank you for pointing out the issue with the parameter. It has 
already been addressed[2], but the FLIP changes were overlooked. It has been 
updated.

[1] 
https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
[2] 
https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98

Brs
di.wu



> On Mar 22, 2024, at 18:28, Feng Jin wrote:
> 
> Hi Di,
> 
> Thank you for the update, as well as quickly implementing corresponding
> capabilities including filter push down and project push down.
> 
> Regarding the transaction timeout, I still have some doubts. I would like
> to confirm if we can control this timeout parameter in the connector, such
> as setting it to 10 minutes or 1 hour.
> Also, when a transaction is cleared by the server, the commit operation of
> the connector will fail, leading to job failure. In this case, can users
> only choose to delete the checkpoint and re-consume historical data?
> 
> There is also a small question regarding the parameters
> doris.request.connect.timeout.ms and doris.request.read.timeout.ms:
> can we change them to Duration type and remove the "ms" suffix?
> This way, all time parameters can be kept uniform in type as Duration.
> 
> 
> Best,
> Feng
> 
> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
> 
>> Hi, Feng,
>> Thank you, that's a great suggestion !
>> 
>> I have already implemented FilterPushDown and removed that parameter on
>> DorisDynamicTableSource[1], and also updated FLIP.
>> 
>> Regarding the mention of [Doris also aborts transactions], it may not have
>> been described accurately. It mainly refers to the automatic expiration of
>> long-running transactions in Doris that have not been committed for a
>> prolonged period.
>> 
>> As for two-phase commit, when a commit fails, the checkpoint will also
>> fail, and the job will be continuously retried.
>> 
>> [1]
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
>> 
>> Brs
>> di.wu
>> 
>> 
>>> On Mar 15, 2024, at 14:53, Feng Jin wrote:
>>> 
>>> Hi Di
>>> 
>>> Thank you for initiating this FLIP, +1 for this.
>>> 
>>> Regarding the option `doris.filter.query` of doris source table
>>> 
>>> Can we directly implement the FilterPushDown capability of Flink Source
>>> like Jdbc Source [1] instead of introducing an option?
>>> 
>>> 
>>> Regarding two-phase commit,
>>> 
>>>> At the same time, Doris will also abort transactions that have not been
>>> committed for a long time
>>> 
>>> Can we control the transaction timeout in the connector?
>>> And control the behavior when timeout occurs, whether to discard by
>> default
>>> or trigger job failure?
>>> 
>>> 
>>> [1]. https://issues.apache.org/jira/browse/FLINK-16024
>>> 
>>> Best,
>>> Feng
>>> 
>>> 
>>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky >> 
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Thanks for driving this, +1 for the FLIP.
>>>> 
>>>> Best,
>>>> Ferenc
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy >> 
>>>> wrote:
>>>> 
>>>>> 
>>>>> 
>>>>> Hello,
>>>>> Thanks for the proposal, +1 for the FLIP.
>>>>> 
>>>>> Best Regards
>>>>> Ahmed Hamdy
>>>&g

Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-22 Thread wudi
Hi, Feng,
Thank you, that's a great suggestion!

I have already implemented FilterPushDown and removed that parameter on 
DorisDynamicTableSource[1], and also updated FLIP.
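In planner terms, filter push-down means the source accepts the predicates it can translate into a Doris-side filter and hands the rest back for Flink to evaluate. A dependency-free sketch of that split (plain Java; the real code implements Flink's SupportsFilterPushDown on the table source, and the "pushable" test here is an illustrative stand-in):

```java
import java.util.*;
import java.util.function.Predicate;

public class FilterPushDownSketch {
    // Partition filters into those the source can push down to Doris and
    // the remainder that Flink must still apply after reading.
    static Map<String, List<String>> applyFilters(List<String> filters,
                                                  Predicate<String> pushable) {
        Map<String, List<String>> result = new HashMap<>();
        result.put("accepted", new ArrayList<>());
        result.put("remaining", new ArrayList<>());
        for (String f : filters) {
            result.get(pushable.test(f) ? "accepted" : "remaining").add(f);
        }
        return result;
    }

    public static void main(String[] args) {
        // Toy rule: simple comparisons push down; anything with a call does not.
        Map<String, List<String>> r = applyFilters(
                Arrays.asList("age > 30", "my_udf(name) = 1"),
                f -> !f.contains("("));
        System.out.println(r.get("accepted"));   // [age > 30]
        System.out.println(r.get("remaining"));  // [my_udf(name) = 1]
    }
}
```

The accepted filters would be rendered into the query sent to Doris; the remaining ones stay in the Flink plan, so results are correct either way.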

Regarding the mention of [Doris also aborts transactions], it may not have been 
described accurately. It mainly refers to the automatic expiration of 
long-running transactions in Doris that have not been committed for a prolonged 
period.

As for two-phase commit, when a commit fails, the checkpoint will also fail, 
and the job will be continuously retried.

[1] 
https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58

Brs
di.wu


> On Mar 15, 2024, at 14:53, Feng Jin wrote:
> 
> Hi Di
> 
> Thank you for initiating this FLIP, +1 for this.
> 
> Regarding the option `doris.filter.query` of doris source table
> 
> Can we directly implement the FilterPushDown capability of Flink Source
> like Jdbc Source [1] instead of introducing an option?
> 
> 
> Regarding two-phase commit,
> 
>> At the same time, Doris will also abort transactions that have not been
> committed for a long time
> 
> Can we control the transaction timeout in the connector?
> And control the behavior when timeout occurs, whether to discard by default
> or trigger job failure?
> 
> 
> [1]. https://issues.apache.org/jira/browse/FLINK-16024
> 
> Best,
> Feng
> 
> 
> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky 
> wrote:
> 
>> Hi,
>> 
>> Thanks for driving this, +1 for the FLIP.
>> 
>> Best,
>> Ferenc
>> 
>> 
>> 
>> 
>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy 
>> wrote:
>> 
>>> 
>>> 
>>> Hello,
>>> Thanks for the proposal, +1 for the FLIP.
>>> 
>>> Best Regards
>>> Ahmed Hamdy
>>> 
>>> 
>>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
>>> 
>>>> Hi, Leonard
>>>> Thank you for your suggestion.
>>>> I referred to other Connectors[1], modified the naming and types of
>>>> relevant parameters[2], and also updated FLIP.
>>>> 
>>>> [1]
>>>> 
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
>>>> [2]
>>>> 
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
>>>> 
>>>> Brs,
>>>> di.wu
>>>> 
>>>>> On Mar 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
>>>>> 
>>>>> Thanks wudi for the updating, the FLIP generally looks good to me, I
>>>>> only left two minor suggestions:
>>>>> 
>>>>> (1) The suffix `.s` in config option doris.request.query.timeout.s
>> looks
>>>>> strange to me, could we change all time interval related option
>> value type
>>>>> to Duration ?
>>>>> 
>>>>> (2) Could you check and improve all config options like
>>>>> `doris.exec.mem.limit` to make them to follow flink config option
>> naming
>>>>> and value type?
>>>>> 
>>>>> Best,
>>>>> Leonard
>>>>> 
>>>>>>> On Mar 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
>>>>>>> 
>>>>>>> Hi Di,
>>>>>>> 
>>>>>>> Thanks for your proposal. +1 for the contribution. I'd like to
>> know
>>>>>>> your
>>>>>>> thoughts about the following questions:
>>>>>>> 
>>>>>>> 1. According to your clarification of the exactly-once, thanks
>> for it
>>>>>>> BTW,
>>>>>>> no PreCommitTopology is required. Does it make sense to let
>>>>>>> DorisSink[1]
>>>>>>> implement SupportsCommitter, since the TwoPhaseCommittingSink is
>>>>>>> deprecated[2] before turning the Doris connector into a Flink
>>>>>>> connector?
>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a
>> data
>>>>>>> pipeline
>>>>>>> to support further e.g. ad-hoc query or cube with feasible
>>>>>>> pre-aggregation.
>>>>>>> Just out of curiosity, would you like to share some real use
>> cases that
>>>>>>> will use OLAP engines as the source of a streaming data
>> p

Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-11 Thread wudi
Hi, Leonard
Thank you for your suggestion.
I referred to other Connectors[1], modified the naming and types of relevant 
parameters[2], and also updated FLIP.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
[2] 
https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java

Brs,
di.wu
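Leonard's Duration suggestion (quoted below) moves the unit out of the option key and into the value, as Flink's ConfigOptions.key(...).durationType() builder does. A dependency-free sketch of the idea (plain Java; the parser below is an illustrative stand-in for Flink's duration-string parsing, and the option names are taken from the thread):

```java
import java.time.Duration;
import java.util.Locale;

// Sketch: a Duration-typed option replaces unit-suffixed keys such as
// "doris.request.connect.timeout.ms". Values like "30 s" or "10 min"
// parse to java.time.Duration, so the unit lives in the value, not the key.
public class DurationOptionSketch {
    static Duration parse(String value) {
        String[] parts = value.trim().toLowerCase(Locale.ROOT).split("\\s+");
        long n = Long.parseLong(parts[0]);
        switch (parts.length > 1 ? parts[1] : "ms") {
            case "ms":  return Duration.ofMillis(n);
            case "s":   return Duration.ofSeconds(n);
            case "min": return Duration.ofMinutes(n);
            case "h":   return Duration.ofHours(n);
            default:    throw new IllegalArgumentException(value);
        }
    }

    public static void main(String[] args) {
        // old: doris.request.connect.timeout.ms = 30000
        // new: doris.request.connect.timeout = 30 s
        System.out.println(parse("30 s"));    // PT30S
        System.out.println(parse("10 min"));  // PT10M
    }
}
```

With this, every time-related option shares one type and one value syntax, which is what the renaming in DorisConfigOptions aims for.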

> On Mar 7, 2024, at 14:33, Leonard Xu wrote:
> 
> Thanks wudi for the updating, the FLIP generally looks good to me, I only 
> left two minor suggestions:
> 
> (1) The suffix `.s` in config option doris.request.query.timeout.s looks 
> strange to me, could we change all time interval related option value type to 
> Duration ?
> 
> (2) Could you check and improve all config options  like 
> `doris.exec.mem.limit` to make them to follow flink config option naming and 
> value type?
> 
> Best,
> Leonard
> 
> 
>> 
>> 
>>> On Mar 6, 2024, at 06:12, Jing Ge wrote:
>>> 
>>> Hi Di,
>>> 
>>> Thanks for your proposal. +1 for the contribution. I'd like to know your
>>> thoughts about the following questions:
>>> 
>>> 1. According to your clarification of the exactly-once, thanks for it BTW,
>>> no PreCommitTopology is required. Does it make sense to let DorisSink[1]
>>> implement SupportsCommitter, since the TwoPhaseCommittingSink is
>>> deprecated[2] before turning the Doris connector into a Flink connector?
>>> 2. OLAP engines are commonly used as the tail/downstream of a data pipeline
>>> to support further e.g. ad-hoc query or cube with feasible pre-aggregation.
>>> Just out of curiosity, would you like to share some real use cases that
>>> will use OLAP engines as the source of a streaming data pipeline? Or it
>>> will only be used as the source for the batch?
>>> 3. The E2E test only covered sink[3], if I am not mistaken. Would you like
>>> to test the source in E2E too?
>>> 
>>> [1]
>>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
>>> [3]
>>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> On Tue, Mar 5, 2024 at 11:18 AM wudi <676366...@qq.com.invalid> wrote:
>>> 
>>>> Hi, Jeyhun Karimov.
>>>> Thanks for your question.
>>>> 
>>>> - How to ensure Exactly-Once?
>>>> 1. When the Checkpoint Barrier arrives, DorisSink will trigger the
>>>> precommit api of StreamLoad to complete the persistence of data in Doris
>>>> (the data will not be visible at this time), and will also pass this TxnID
>>>> to the Committer.
>>>> 2. When this Checkpoint of the entire Job is completed, the Committer will
>>>> call the commit api of StreamLoad and commit TxnID to complete the
>>>> visibility of the transaction.
>>>> 3. When the task is restarted, the Txn with successful precommit and
>>>> failed commit will be aborted based on the label-prefix, and Doris' abort
>>>> API will be called. (At the same time, Doris will also abort transactions
>>>> that have not been committed for a long time)
>>>> 
>>>> ps: At the same time, this part of the content has been updated in FLIP
>>>> 
>>>> - Because the default table model in Doris is Duplicate (
>>>> https://doris.apache.org/docs/data-table/data-model/), which does not
>>>> have a primary key, batch writing may cause data duplication. The UNIQ
>>>> model, however, has a primary key, which ensures the idempotence of writes,
>>>> thus achieving Exactly-Once.
>>>> 
>>>> Brs,
>>>> di.wu
>>>> 
>>>> 
>>>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> Thanks for the proposal. +1 for the FLIP.
>>>>> I have a few questions:
>>>>> 
>>>>> - How exactly will the combination of the two (Stream Load's two-phase
>>>>> commit and Flink's two-phase commit) ensure e2e exactly-once semantics?
>>>>> 
>>>>> - T

Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-06 Thread wudi
Hi Jing Ge, thanks for your suggestions.

1. Currently, the Flink Doris Connector is compatible with Flink versions 
1.15-1.18. SupportsCommitter[1] seems to be introduced in Flink 1.19, and most 
users may not have upgraded their Flink environments to that version yet. 
Modifying it now could lead to incompatibilities. I think we can postpone the 
modification and make it together with other connectors. What do you think?

2. Yes, currently DorisSource only supports batch reading, typically used for 
data synchronization and ETL. Streaming reading is not supported yet, which 
requires the capability of Doris Binlog (mentioned in the Doris 2024 
RoadMap[2]). Streaming reading can be used to capture incremental events from 
the database, making it more convenient for users to process real-time data 
newly added to Doris.

3. E2ECase[3] for DorisSource have been added, and the TestPlan in the FLIP has 
been modified.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
[2] https://github.com/apache/doris/issues/30669
[3] 
https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DorisDorisE2ECase.java

Brs,
di.wu


> On Mar 6, 2024, at 06:12, Jing Ge wrote:
> 
> Hi Di,
> 
> Thanks for your proposal. +1 for the contribution. I'd like to know your
> thoughts about the following questions:
> 
> 1. According to your clarification of the exactly-once, thanks for it BTW,
> no PreCommitTopology is required. Does it make sense to let DorisSink[1]
> implement SupportsCommitter, since the TwoPhaseCommittingSink is
> deprecated[2] before turning the Doris connector into a Flink connector?
> 2. OLAP engines are commonly used as the tail/downstream of a data pipeline
> to support further e.g. ad-hoc query or cube with feasible pre-aggregation.
> Just out of curiosity, would you like to share some real use cases that
> will use OLAP engines as the source of a streaming data pipeline? Or it
> will only be used as the source for the batch?
> 3. The E2E test only covered sink[3], if I am not mistaken. Would you like
> to test the source in E2E too?
> 
> [1]
> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
> [3]
> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
> 
> Best regards,
> Jing
> 
> On Tue, Mar 5, 2024 at 11:18 AM wudi <676366...@qq.com.invalid> wrote:
> 
>> Hi, Jeyhun Karimov.
>> Thanks for your question.
>> 
>> - How to ensure Exactly-Once?
>> 1. When the Checkpoint Barrier arrives, DorisSink will trigger the
>> precommit api of StreamLoad to complete the persistence of data in Doris
>> (the data will not be visible at this time), and will also pass this TxnID
>> to the Committer.
>> 2. When this Checkpoint of the entire Job is completed, the Committer will
>> call the commit api of StreamLoad and commit TxnID to complete the
>> visibility of the transaction.
>> 3. When the task is restarted, the Txn with successful precommit and
>> failed commit will be aborted based on the label-prefix, and Doris' abort
>> API will be called. (At the same time, Doris will also abort transactions
>> that have not been committed for a long time)
>> 
>> ps: At the same time, this part of the content has been updated in FLIP
>> 
>> - Because the default table model in Doris is Duplicate (
>> https://doris.apache.org/docs/data-table/data-model/), which does not
>> have a primary key, batch writing may cause data duplication. The UNIQ
>> model, however, has a primary key, which ensures the idempotence of writes,
>> thus achieving Exactly-Once.
>> 
>> Brs,
>> di.wu
>> 
>> 
>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov wrote:
>>> 
>>> Hi,
>>> 
>>> Thanks for the proposal. +1 for the FLIP.
>>> I have a few questions:
>>> 
>>> - How exactly will the combination of the two (Stream Load's two-phase
>>> commit and Flink's two-phase commit) ensure e2e exactly-once semantics?
>>> 
>>> - The FLIP proposes to combine Doris's batch writing with the primary key
>>> table to achieve Exactly-Once semantics. Could you elaborate more on
>> that?
>>> Why it is not the default behavior but a workaround?
>>> 
>>> Regards,
>>> 

Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-05 Thread wudi
Hi, Jeyhun Karimov.
Thanks for your question.

- How to ensure Exactly-Once?
1. When the checkpoint barrier arrives, DorisSink triggers Stream Load's 
precommit API to persist the data in Doris (the data is not yet visible at 
this point) and passes the TxnID to the Committer.
2. When the checkpoint completes for the entire job, the Committer calls 
Stream Load's commit API with the TxnID to make the transaction visible.
3. When the task is restarted, transactions that were precommitted but not 
committed are identified by the label prefix and aborted via Doris's abort 
API. (Doris itself also aborts transactions that have not been committed for a 
long time.)
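The three steps above can be sketched as a small simulation of the two-phase commit protocol (an illustrative in-memory model, not the connector's actual Java code; all class and function names here are hypothetical):

```python
# Illustrative simulation of the Stream Load two-phase commit protocol used
# by DorisSink. PRECOMMITTED data is persisted but invisible; COMMITTED data
# is visible; ABORTED data is discarded.

class MockDoris:
    """Stands in for Doris's Stream Load precommit/commit/abort APIs."""
    def __init__(self):
        self.txns = {}          # txn_id -> {"label", "rows", "state"}
        self.visible_rows = []  # rows readable by queries

    def precommit(self, txn_id, label, rows):
        # Persist the batch; it is not yet visible to queries.
        self.txns[txn_id] = {"label": label, "rows": rows, "state": "PRECOMMITTED"}

    def commit(self, txn_id):
        txn = self.txns[txn_id]
        txn["state"] = "COMMITTED"
        self.visible_rows.extend(txn["rows"])

    def abort(self, txn_id):
        self.txns[txn_id]["state"] = "ABORTED"

    def precommitted_with_prefix(self, label_prefix):
        return [tid for tid, t in self.txns.items()
                if t["state"] == "PRECOMMITTED" and t["label"].startswith(label_prefix)]


def run_checkpoint(doris, txn_id, label, rows, commit_succeeds=True):
    # Step 1: on the checkpoint barrier, the sink precommits and hands
    # the TxnID to the Committer.
    doris.precommit(txn_id, label, rows)
    # Step 2: once the job-wide checkpoint completes, the Committer commits.
    if commit_succeeds:
        doris.commit(txn_id)


def abort_leftovers_on_restart(doris, label_prefix):
    # Step 3: on restart, precommitted-but-uncommitted transactions matching
    # the label prefix are aborted.
    for tid in doris.precommitted_with_prefix(label_prefix):
        doris.abort(tid)


doris = MockDoris()
run_checkpoint(doris, txn_id=1, label="job42-ckpt-1", rows=["a", "b"])
# A failure between precommit and commit leaves txn 2 dangling:
run_checkpoint(doris, txn_id=2, label="job42-ckpt-2", rows=["c"], commit_succeeds=False)
abort_leftovers_on_restart(doris, label_prefix="job42-")

print(doris.visible_rows)      # ['a', 'b'] -- only committed data is visible
print(doris.txns[2]["state"])  # ABORTED
```

The key point the sketch shows: data persisted by a precommit never becomes visible unless the Flink checkpoint it belongs to completes, and a dangling transaction is cleaned up by label prefix on restart.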

PS: This part has also been updated in the FLIP.

- Because the default table model in Doris is Duplicate 
(https://doris.apache.org/docs/data-table/data-model/), which has no primary 
key, retried batch writes may cause data duplication. The UNIQ model, however, 
has a primary key that makes writes idempotent, thus achieving Exactly-Once.
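The difference between the two table models can be illustrated with a tiny sketch (a hypothetical in-memory model, only to show why a primary key makes a retried batch write idempotent; it is not Doris's storage logic):

```python
# Why retrying a batch write is safe on a UNIQ table but not on a Duplicate
# table: the primary key turns the write into an upsert.

def write_duplicate_model(table, batch):
    # Duplicate model: no primary key, every row is appended as-is.
    table.extend(batch)

def write_uniq_model(table, batch, key=lambda row: row["id"]):
    # UNIQ model: a row with the same key replaces the previous version.
    for row in batch:
        table[key(row)] = row

batch = [{"id": 1, "v": "x"}, {"id": 2, "v": "y"}]

dup_table = []   # Duplicate model: a plain list of rows
uniq_table = {}  # UNIQ model: keyed by the primary key
for _ in range(2):  # simulate the same batch being written twice (a retry)
    write_duplicate_model(dup_table, batch)
    write_uniq_model(uniq_table, batch)

print(len(dup_table))   # 4 -- the retry duplicated the rows
print(len(uniq_table))  # 2 -- the retry was an idempotent upsert
```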

Brs,
di.wu


> On Mar 2, 2024, at 17:50, Jeyhun Karimov wrote:
> 
> Hi,
> 
> Thanks for the proposal. +1 for the FLIP.
> I have a few questions:
> 
> - How exactly will the combination of the two (Stream Load's two-phase
> commit and Flink's two-phase commit) ensure the e2e exactly-once semantics?
> 
> - The FLIP proposes to combine Doris's batch writing with the primary key
> table to achieve Exactly-Once semantics. Could you elaborate more on that?
> Why it is not the default behavior but a workaround?
> 
> Regards,
> Jeyhun
> 
> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv  wrote:
> 
>> Thanks for driving this.
>> The content is very detailed, it is recommended to add a section on Test
>> Plan for more completeness.
>> 
> On Thu, Jan 25, 2024 at 15:40, Di Wu wrote:
>> 
>>> Hi all,
>>> 
>>> Previously, we had some discussions about contributing Flink Doris
>>> Connector to the Flink community [1]. I want to further promote this
>> work.
>>> I hope everyone will help participate in this FLIP discussion and provide
>>> more valuable opinions and suggestions.
>>> Thanks.
>>> 
>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>> 
>>> Brs,
>>> di.wu
>>> 
>>> 
>>> 
>>> On 2023/12/07 05:02:46 wudi wrote:
>>>> 
>>>> Hi all,
>>>> 
>>>> As discussed in the previous email [1] about contributing the Flink
>>>> Doris Connector to the Flink community:
>>>> 
>>>> 
>>>> Apache Doris[2] is a high-performance, real-time analytical database
>>>> based on an MPP architecture. For scenarios where Flink is used for data
>>>> analysis, processing, or real-time writing to Doris, the Flink Doris
>>>> Connector is an effective tool.
>>>> 
>>>> At the same time, contributing the Flink Doris Connector to the Flink
>>>> community will further expand the Flink Connectors ecosystem.
>>>> 
>>>> So I would like to start an official discussion of FLIP-399: Flink
>>>> Connector Doris[3].
>>>> 
>>>> Looking forward to comments, feedback, and suggestions from the
>>>> community on the proposal.
>>>> 
>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>> [2]
>> https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
>>>> [3]
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
>>>> 
>>>> 
>>>> Brs,
>>>> 
>>>> di.wu
>>>> 
>>> 
>> 



Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-05 Thread wudi
Thanks, Yanquan. A Test Plan section has been added at the end of the FLIP [1].

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris

Brs,
di.wu


> On Mar 2, 2024, at 17:13, Yanquan Lv wrote:
> 
> Thanks for driving this.
> The content is very detailed, it is recommended to add a section on Test
> Plan for more completeness.
> 
> On Thu, Jan 25, 2024 at 15:40, Di Wu wrote:
> 
>> Hi all,
>> 
>> Previously, we had some discussions about contributing Flink Doris
>> Connector to the Flink community [1]. I want to further promote this work.
>> I hope everyone will help participate in this FLIP discussion and provide
>> more valuable opinions and suggestions.
>> Thanks.
>> 
>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>> 
>> Brs,
>> di.wu
>> 
>> 
>> 
>> On 2023/12/07 05:02:46 wudi wrote:
>>> 
>>> Hi all,
>>> 
>>> As discussed in the previous email [1] about contributing the Flink
>>> Doris Connector to the Flink community:
>>> 
>>> 
>>> Apache Doris[2] is a high-performance, real-time analytical database
>>> based on an MPP architecture. For scenarios where Flink is used for data
>>> analysis, processing, or real-time writing to Doris, the Flink Doris
>>> Connector is an effective tool.
>>> 
>>> At the same time, contributing the Flink Doris Connector to the Flink
>>> community will further expand the Flink Connectors ecosystem.
>>> 
>>> So I would like to start an official discussion of FLIP-399: Flink
>>> Connector Doris[3].
>>> 
>>> Looking forward to comments, feedback, and suggestions from the
>>> community on the proposal.
>>> 
>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>> [2] https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
>>> [3]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
>>> 
>>> 
>>> Brs,
>>> 
>>> di.wu
>>> 
>> 



Re: [PROPOSAL] Contribute Flink CDC Connectors project to Apache Flink

2023-12-10 Thread wudi
Awesome, +1

Brs,

di.wu

> On Dec 7, 2023, at 11:24 AM, Leonard Xu wrote:
> 
> Dear Flink devs,
> 
> As you may have heard, we at Alibaba (Ververica) are planning to donate CDC 
> Connectors for the Apache Flink project[1] to the Apache Flink community.
> 
> CDC Connectors for Apache Flink comprise a collection of source connectors 
> designed specifically for Apache Flink. These connectors[2] enable the 
> ingestion of changes from various databases using Change Data Capture (CDC), 
> most of these CDC connectors are powered by Debezium[3]. They support both 
> the DataStream API and the Table/SQL API, facilitating the reading of 
> database snapshots and continuous reading of transaction logs with 
> exactly-once processing, even in the event of failures.
> 
> 
> Additionally, in the latest version 3.0, we have introduced many long-awaited 
> features. Starting from CDC version 3.0, we've built a Streaming ELT 
> Framework available for streaming data integration. This framework allows 
> users to write their data synchronization logic in a simple YAML file, which 
> will automatically be translated into a Flink DataStreaming job. It 
> emphasizes optimizing the task submission process and offers advanced 
> functionalities such as whole database synchronization, merging sharded 
> tables, and schema evolution[4].
> 
> 
> I believe this initiative is a perfect match for both sides. For the Flink 
> community, it presents an opportunity to enhance Flink's competitive 
> advantage in streaming data integration, promoting the healthy growth and 
> prosperity of the Apache Flink ecosystem. For the CDC Connectors project, 
> becoming a sub-project of Apache Flink means being part of a neutral 
> open-source community, which can attract a more diverse pool of contributors.
> 
> Please note that the aforementioned points represent only some of our 
> motivations and vision for this donation. Specific future operations need to 
> be further discussed in this thread. For example, the sub-project name after 
> the donation; we hope to name it Flink-CDC, aiming at streaming data 
> integration through Apache Flink, following the naming convention of 
> Flink-ML. And this project is managed by a total of 8 maintainers, including 
> 3 Flink PMC members and 1 Flink Committer. The remaining 4 maintainers are 
> also highly active contributors to the Flink community; donating this project 
> to the Flink community implies that their permissions might be reduced. 
> Therefore, we may need to bring up this topic for further discussion within 
> the Flink PMC. Additionally, we need to discuss how to migrate existing users 
> and documents. We have a user group of nearly 10,000 people and a 
> multi-version documentation site that need to be migrated. We also need to 
> plan for the migration of CI/CD processes and other specifics. 
> 
> 
> While there are many intricate details that require implementation, we are 
> committed to progressing and finalizing this donation process.
> 
> 
> Besides being Flink’s most active ecosystem project (as evaluated by GitHub 
> metrics), it also boasts a significant user base. However, I believe it's 
> essential to commence discussions on future operations only after the 
> community reaches a consensus on whether they desire this donation.
> 
> 
> Really looking forward to hearing what you think! 
> 
> 
> Best,
> Leonard (on behalf of the Flink CDC Connectors project maintainers)
> 
> [1] https://github.com/ververica/flink-cdc-connectors
> [2] 
> https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-connectors.html
> [3] https://debezium.io
> [4] 
> https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-pipeline.html



[DISCUSS] FLIP-399: Flink Connector Doris

2023-12-06 Thread wudi


Hi all,

As discussed in the previous email [1] about contributing the Flink Doris 
Connector to the Flink community:


Apache Doris[2] is a high-performance, real-time analytical database based on 
an MPP architecture. For scenarios where Flink is used for data analysis, 
processing, or real-time writing to Doris, the Flink Doris Connector is an 
effective tool.

At the same time, contributing the Flink Doris Connector to the Flink community 
will further expand the Flink Connectors ecosystem.

So I would like to start an official discussion of FLIP-399: Flink Connector 
Doris[3].

Looking forward to comments, feedback, and suggestions from the community on 
the proposal.

[1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
[2] https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris


Brs,

di.wu


Re: [DISCUSS] Contribute Flink Doris Connector to the Flink community

2023-11-30 Thread wudi
Thank you, everyone. But I encountered a problem when creating the FLIP: I do 
not have permission to create pages in the Flink Improvement Proposals [1] 
space. I may need a PMC member to help me add permissions. My Jira account is 
Di Wu and the email is d...@apache.org. Thanks.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals


--

Brs,

di.wu


> On Nov 27, 2023, at 1:22 PM, Jing Ge wrote:
> 
> That sounds great! +1
> 
> Best regards
> Jing
> 
> On Mon, Nov 27, 2023 at 3:38 AM Leonard Xu  wrote:
> 
>> Thanks wudi for kicking off the discussion,
>> 
>> +1 for the idea from my side.
>> 
>> A FLIP like Yun posted is required if no other objections.
>> 
>> Best,
>> Leonard
>> 
>>> On Nov 26, 2023, at 6:22 PM, wudi <676366...@qq.com.INVALID> wrote:
>>> 
>>> Hi all,
>>> 
>>> At present, the Flink connectors have been decoupled from Flink's main
>> repository [1].
>>> At the same time, the Flink-Doris-Connector[3] has been maintained in
>> the Apache Doris[2] community.
>>> I think the Flink Doris Connector can be migrated to the Flink community
>> because it is part of the Flink Connectors and can also expand the
>> ecosystem of Flink Connectors.
>>> 
>>> I volunteer to move this forward if I can.
>>> 
>>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
>>> [2] https://doris.apache.org/
>>> [3] https://github.com/apache/doris-flink-connector
>>> 
>>> --
>>> 
>>> Brs,
>>> di.wu
>> 
>> 



[DISCUSS] Contribute Flink Doris Connector to the Flink community

2023-11-26 Thread wudi
Hi all,

At present, the Flink connectors have been decoupled from Flink's main 
repository [1].
At the same time, the Flink-Doris-Connector[3] has been maintained in the 
Apache Doris[2] community.
I think the Flink Doris Connector can be migrated to the Flink community 
because it is part of the Flink Connectors and can also expand the ecosystem of 
Flink Connectors.

I volunteer to move this forward if I can.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
[2] https://doris.apache.org/
[3] https://github.com/apache/doris-flink-connector

--

Brs,
di.wu