Hi,

Gentle ping to see if there are any other concerns or anything that seems to be missing
from the FLIP.

Brs
di.wu

> On Mar 25, 2024, at 17:52, Feng Jin <jinfeng1...@gmail.com> wrote:
> 
> Hi Di
> 
> Thank you for your patience and explanation.
> 
> If this is a server-side configuration, we currently cannot modify it in
> the client configuration. If Doris supports client-side configuration in
> the future, we can reconsider whether to support it.
> 
> I have no further questions regarding this FLIP. LGTM.
> 
> 
> Best,
> Feng
> 
> On Mon, Mar 25, 2024 at 3:42 PM wudi <676366...@qq.com.invalid> wrote:
> 
>> Hi, Feng
>> 
>> Yes, if the StreamLoad transaction timeout is very short, you may
>> encounter this situation.
>> 
>> The timeout for StreamLoad transactions is controlled by the
>> streaming_label_keep_max_second parameter [1] in the FE (Frontend), whose
>> default value is 12 hours. Currently, it is a global configuration and
>> cannot be set separately for an individual transaction.
>> 
>> However, I think the default 12-hour timeout should cover most cases,
>> unless you are restarting from a checkpoint taken a long time ago.
>> What do you think?
>> 
>> 
>> [1] https://github.com/apache/doris/blob/master/fe/fe-common/src/main/java/org/apache/doris/common/Config.java#L163-L168
>> 
>> 
>> Brs
>> di.wu
>> 
>>> On Mar 25, 2024, at 11:45, Feng Jin <jinfeng1...@gmail.com> wrote:
>>> 
>>> Hi Di
>>> 
>>> Thanks for your reply.
>>> 
>>> The timeout I'm referring to here is not the commit timeout, but rather
>>> the timeout for a single StreamLoad transaction.
>>> 
>>> Let's say we have set the StreamLoad transaction timeout to 10 minutes.
>>> Now, imagine a Flink job with two subtasks. Due to significant data skew
>>> and backpressure, subtask 0 and subtask 1 are processing at different
>>> speeds. Subtask 0 finishes processing this checkpoint first, while
>>> subtask 1 takes another 10 minutes to complete its processing. Only then
>>> is the job's checkpoint complete. However, since subtask 0 has been
>>> waiting for subtask 1 all along, its corresponding StreamLoad transaction
>>> is only committed more than 10 minutes later, by which time the server
>>> has already cleaned up the transaction, leading to a failed commit.
>>> Therefore, I would like to know whether we can avoid this problem in such
>>> situations by setting a longer lifespan for transactions.
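The skew scenario above can be sketched as a small timing model. This is a simplified illustration, not connector code. `commit_outcomes` is a hypothetical helper, and times are in minutes. A transaction's age is counted from the moment its subtask precommits, and the Committer commits only once the slowest subtask has finished.

```python
def commit_outcomes(finish_times, txn_timeout):
    """Hypothetical model: {subtask: True if its commit lands before the txn expires}.

    finish_times maps subtask index -> minute at which it precommits.
    The checkpoint (and therefore every commit) waits for the slowest subtask.
    """
    checkpoint_done = max(finish_times.values())
    outcomes = {}
    for subtask, precommit_at in finish_times.items():
        age_at_commit = checkpoint_done - precommit_at  # how long the txn waited
        outcomes[subtask] = age_at_commit <= txn_timeout
    return outcomes


# Subtask 0 finishes at t=0; skewed subtask 1 finishes more than 10 minutes later.
finish = {0: 0, 1: 11}
print(commit_outcomes(finish, txn_timeout=10))       # {0: False, 1: True}
print(commit_outcomes(finish, txn_timeout=12 * 60))  # {0: True, 1: True}
```

With a 10-minute timeout, subtask 0's transaction has already expired by the time the checkpoint completes. With the 12-hour default mentioned in the thread, both commits succeed.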
>>> 
>>> 
>>> Best,
>>> Feng
>>> 
>>> 
>>> On Fri, Mar 22, 2024 at 10:24 PM wudi <676366...@qq.com.invalid> wrote:
>>> 
>>>> Hi, Feng,
>>>> 
>>>> 1. Are you suggesting that when a commit gets stuck, we could interrupt
>>>> the commit request using a timeout parameter? Currently, there is no such
>>>> parameter. In my understanding, two-phase commit requires checkpointing
>>>> to be enabled, so the commit timeout is essentially the checkpoint
>>>> timeout. Therefore, it seems unnecessary to add an additional parameter
>>>> here. What do you think?
>>>> 
>>>> 2. Besides deleting checkpoints to re-consume the data, the connector
>>>> also provides an option to ignore commit errors [1]. However, this option
>>>> is only intended for error recovery scenarios, such as when a transaction
>>>> has been cleared by the server but you still want to reuse the upstream
>>>> offset from the checkpoint.
>>>> 
>>>> 3. Also, thank you for pointing out the issue with the parameters. It had
>>>> already been addressed [2], but the FLIP was not updated accordingly. The
>>>> FLIP has now been updated.
>>>> 
>>>> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
>>>> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
>>>> 
>>>> Brs
>>>> di.wu
>>>> 
>>>> 
>>>> 
>>>>> On Mar 22, 2024, at 18:28, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>> 
>>>>> Hi Di,
>>>>> 
>>>>> Thank you for the update, and for quickly implementing the corresponding
>>>>> capabilities, including filter push-down and projection push-down.
>>>>> 
>>>>> Regarding the transaction timeout, I still have some doubts. I would
>>>>> like to confirm whether we can control this timeout in the connector,
>>>>> for example by setting it to 10 minutes or 1 hour.
>>>>> Also, when a transaction is cleared by the server, the connector's commit
>>>>> operation will fail, leading to a job failure. In this case, is the only
>>>>> option for users to delete the checkpoint and re-consume historical data?
>>>>> 
>>>>> There is also a small question regarding the parameters
>>>>> `doris.request.connect.timeout.ms` and `doris.request.read.timeout.ms`:
>>>>> can we change them to Duration type and remove the "ms" suffix?
>>>>> This way, all time-related parameters would consistently use the Duration type.
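To illustrate what a Duration-typed option would let users write (e.g. `10 s` or `1 min` instead of a raw millisecond count under a `.ms` key), here is a hypothetical parser. The unit table is an assumption loosely modeled on Flink's duration string format, and a bare number is treated as milliseconds. It is not Flink's own parser and does not cover every unit Flink accepts.

```python
import re
from datetime import timedelta

# Hypothetical subset of unit suffixes; the exact list is an assumption.
_UNITS = {
    "ms": timedelta(milliseconds=1),
    "s": timedelta(seconds=1),
    "min": timedelta(minutes=1),
    "h": timedelta(hours=1),
    "d": timedelta(days=1),
}


def parse_duration(text: str) -> timedelta:
    """Parse '<integer> [unit]' into a timedelta; a missing unit means milliseconds."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-z]*)\s*", text.lower())
    if match is None:
        raise ValueError(f"invalid duration: {text!r}")
    amount, unit = int(match.group(1)), match.group(2) or "ms"
    if unit not in _UNITS:
        raise ValueError(f"unknown unit {unit!r} in {text!r}")
    return amount * _UNITS[unit]


print(parse_duration("10 s"))   # 0:00:10
print(parse_duration("1 min"))  # 0:01:00
```

Keeping the unit in the value rather than in the option key is what allows a single name (hypothetically `doris.request.connect.timeout`) to accept any granularity.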
>>>>> 
>>>>> 
>>>>> Best,
>>>>> Feng
>>>>> 
>>>>> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
>>>>> 
>>>>>> Hi, Feng,
>>>>>> Thank you, that's a great suggestion!
>>>>>>
>>>>>> I have already implemented FilterPushDown, removed that parameter from
>>>>>> DorisDynamicTableSource [1], and updated the FLIP.
>>>>>> 
>>>>>> Regarding the statement [Doris will also abort transactions], it may not
>>>>>> have been described accurately. It mainly refers to the automatic
>>>>>> expiration of transactions in Doris that have remained uncommitted for a
>>>>>> prolonged period.
>>>>>> 
>>>>>> As for two-phase commit, when a commit fails, the checkpoint will also
>>>>>> fail, and the job will be continuously retried.
>>>>>> 
>>>>>> [1] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
>>>>>> 
>>>>>> Brs
>>>>>> di.wu
>>>>>> 
>>>>>> 
>>>>>>> On Mar 15, 2024, at 14:53, Feng Jin <jinfeng1...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Di
>>>>>>> 
>>>>>>> Thank you for initiating this FLIP, +1 for this.
>>>>>>> 
>>>>>>> Regarding the option `doris.filter.query` of the Doris source table:
>>>>>>>
>>>>>>> Can we directly implement the FilterPushDown capability of the Flink
>>>>>>> source, like the JDBC source [1], instead of introducing an option?
>>>>>>> 
>>>>>>> 
>>>>>>> Regarding two-phase commit,
>>>>>>>
>>>>>>>> At the same time, Doris will also abort transactions that have not
>>>>>>>> been committed for a long time
>>>>>>>
>>>>>>> Can we control the transaction timeout in the connector?
>>>>>>> And can we control the behavior when a timeout occurs, i.e. whether to
>>>>>>> discard by default or to trigger a job failure?
>>>>>>> 
>>>>>>> 
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16024
>>>>>>> 
>>>>>>> Best,
>>>>>>> Feng
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky <ferenc.cs...@pm.me.invalid> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> Thanks for driving this, +1 for the FLIP.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Ferenc
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Hello,
>>>>>>>>> Thanks for the proposal, +1 for the FLIP.
>>>>>>>>> 
>>>>>>>>> Best Regards
>>>>>>>>> Ahmed Hamdy
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
>>>>>>>>> 
>>>>>>>>>> Hi, Leonard
>>>>>>>>>> Thank you for your suggestion.
>>>>>>>>>> I referred to other connectors [1], modified the naming and types of the
>>>>>>>>>> relevant parameters [2], and also updated the FLIP.
>>>>>>>>>>
>>>>>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
>>>>>>>>>> [2] https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
>>>>>>>>>> 
>>>>>>>>>> Brs,
>>>>>>>>>> di.wu
>>>>>>>>>> 
>>>>>>>>>>> On Mar 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Thanks, wudi, for the update. The FLIP generally looks good to me; I
>>>>>>>>>>> only have two minor suggestions:
>>>>>>>>>>>
>>>>>>>>>>> (1) The suffix `.s` in the config option `doris.request.query.timeout.s`
>>>>>>>>>>> looks strange to me. Could we change the value type of all
>>>>>>>>>>> time-interval-related options to Duration?
>>>>>>>>>>>
>>>>>>>>>>> (2) Could you check and improve all config options, such as
>>>>>>>>>>> `doris.exec.mem.limit`, so that they follow Flink's config option
>>>>>>>>>>> naming and value-type conventions?
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> Leonard
>>>>>>>>>>> 
>>>>>>>>>>>>> On Mar 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Di,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for your proposal. +1 for the contribution. I'd like to know
>>>>>>>>>>>>> your thoughts on the following questions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. According to your clarification of exactly-once (thanks for it,
>>>>>>>>>>>>> BTW), no PreCommitTopology is required. Does it make sense to let
>>>>>>>>>>>>> DorisSink [1] implement SupportsCommitter before turning the Doris
>>>>>>>>>>>>> connector into a Flink connector, since TwoPhaseCommittingSink is
>>>>>>>>>>>>> deprecated [2]?
>>>>>>>>>>>>> 2. OLAP engines are commonly used as the tail/downstream of a data
>>>>>>>>>>>>> pipeline to support further workloads, e.g. ad-hoc queries or cubes
>>>>>>>>>>>>> with feasible pre-aggregation. Just out of curiosity, could you share
>>>>>>>>>>>>> some real use cases that use an OLAP engine as the source of a
>>>>>>>>>>>>> streaming data pipeline? Or will it only be used as a source for
>>>>>>>>>>>>> batch jobs?
>>>>>>>>>>>>> 3. The E2E test only covers the sink [3], if I am not mistaken.
>>>>>>>>>>>>> Would you like to test the source in E2E too?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> [1] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
>>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
>>>>>>>>>>>>> [3] https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Jing
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Tue, Mar 5, 2024 at 11:18 AM wudi 676366...@qq.com.invalid wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi, Jeyhun Karimov.
>>>>>>>>>>>>>> Thanks for your question.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> - How is exactly-once ensured?
>>>>>>>>>>>>>> 1. When the checkpoint barrier arrives, DorisSink calls the StreamLoad
>>>>>>>>>>>>>> precommit API to persist the data in Doris (the data is not yet visible
>>>>>>>>>>>>>> at this point) and passes the TxnID to the Committer.
>>>>>>>>>>>>>> 2. When the checkpoint of the entire job completes, the Committer calls
>>>>>>>>>>>>>> the StreamLoad commit API with that TxnID, which makes the
>>>>>>>>>>>>>> transaction's data visible.
>>>>>>>>>>>>>> 3. When the job is restarted, transactions that were successfully
>>>>>>>>>>>>>> precommitted but not committed are aborted based on the label prefix
>>>>>>>>>>>>>> by calling the Doris abort API. (Doris will also abort transactions
>>>>>>>>>>>>>> that have not been committed for a long time.)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> ps: This part of the content has also been updated in the FLIP.
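The three exactly-once steps above can be sketched with an in-memory stand-in for Doris. `FakeDoris`, its method names, and the label format `job1_sink_0_ckpt<N>` are all hypothetical. The sketch only shows the ordering: precommit on the barrier, commit on checkpoint completion, and abort by label prefix on restart.

```python
class FakeDoris:
    """Hypothetical in-memory stand-in for Doris StreamLoad transactions."""

    def __init__(self):
        self.pending = {}  # txn_id -> (label, rows): persisted, not yet visible
        self.visible = []  # rows that queries can see
        self._next_txn = 0

    def precommit(self, label, rows):
        # Step 1: persist the rows under a new transaction and return its TxnID.
        self._next_txn += 1
        self.pending[self._next_txn] = (label, list(rows))
        return self._next_txn

    def commit(self, txn_id):
        # Step 2: make a precommitted transaction's rows visible.
        _, rows = self.pending.pop(txn_id)
        self.visible.extend(rows)

    def abort_by_label_prefix(self, prefix):
        # Step 3: drop every pending transaction whose label matches the prefix.
        stale = [t for t, (label, _) in self.pending.items() if label.startswith(prefix)]
        for txn_id in stale:
            del self.pending[txn_id]


doris = FakeDoris()
txn = doris.precommit("job1_sink_0_ckpt1", ["a", "b"])  # barrier of checkpoint 1
assert doris.visible == []                               # persisted but invisible
doris.commit(txn)                                        # checkpoint 1 completed
doris.precommit("job1_sink_0_ckpt2", ["c"])              # job fails before this commit
doris.abort_by_label_prefix("job1_sink_0")               # restart: clean up leftovers
print(doris.visible, doris.pending)                      # ['a', 'b'] {}
```

After the restart, the rows of checkpoint 2 are no longer pending. Flink would replay them from the restored checkpoint 1 offsets.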
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> - Because the default table model in Doris is the Duplicate model
>>>>>>>>>>>>>> (https://doris.apache.org/docs/data-table/data-model/), which has no
>>>>>>>>>>>>>> primary key, batch writing may cause data duplication. The Unique
>>>>>>>>>>>>>> model, however, has a primary key, which makes writes idempotent and
>>>>>>>>>>>>>> thus achieves exactly-once.
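Here is a minimal illustration of the idempotence argument. It assumes the Unique model keeps only the latest row per key. Writing the same batch twice (for example, after a retry) doubles the rows in a keyless Duplicate-style table but leaves a keyed table unchanged. Both "tables" are plain Python stand-ins, not Doris APIs.

```python
def write_duplicate(table, batch):
    # No key: every write appends, so a replayed batch duplicates rows.
    table.extend(batch)


def write_unique(table, batch, key="id"):
    # Upsert by primary key: a replayed row overwrites the previous one.
    for row in batch:
        table[row[key]] = row


batch = [{"id": 1, "v": "x"}, {"id": 2, "v": "y"}]
dup, uniq = [], {}
for _ in range(2):  # the same batch is written twice, e.g. after a retry
    write_duplicate(dup, batch)
    write_unique(uniq, batch)

print(len(dup), len(uniq))  # 4 2
```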
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>> di.wu
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Mar 2, 2024, at 17:50, Jeyhun Karimov je.kari...@gmail.com wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for the proposal. +1 for the FLIP.
>>>>>>>>>>>>>>> I have a few questions:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> - How exactly will the combination of Stream Load's two-phase commit and
>>>>>>>>>>>>>>> Flink's two-phase commit ensure end-to-end exactly-once semantics?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - The FLIP proposes combining Doris's batch writing with a primary key
>>>>>>>>>>>>>>> table to achieve exactly-once semantics. Could you elaborate on that?
>>>>>>>>>>>>>>> Why is this a workaround rather than the default behavior?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Sat, Mar 2, 2024 at 10:14 AM Yanquan Lv decq12y...@gmail.com wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks for driving this.
>>>>>>>>>>>>>>>> The content is very detailed. I recommend adding a Test Plan section
>>>>>>>>>>>>>>>> for completeness.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Thu, Jan 25, 2024 at 15:40, Di Wu d...@apache.org wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Previously, we had some discussions about contributing the Flink
>>>>>>>>>>>>>>>>> Doris Connector to the Flink community [1]. I want to push this
>>>>>>>>>>>>>>>>> work forward. I hope everyone will participate in this FLIP
>>>>>>>>>>>>>>>>> discussion and share their valuable opinions and suggestions.
>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>>>>> di.wu
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 2023/12/07 05:02:46 wudi wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Following up on the previous email [1] about contributing the Flink
>>>>>>>>>>>>>>>>>> Doris Connector to the Flink community:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Apache Doris [2] is a high-performance, real-time analytical database
>>>>>>>>>>>>>>>>>> based on an MPP architecture. For scenarios where Flink is used for
>>>>>>>>>>>>>>>>>> data analysis and processing, or for real-time writes into Doris, the
>>>>>>>>>>>>>>>>>> Flink Doris Connector is an effective tool.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> At the same time, contributing the Flink Doris Connector to the Flink
>>>>>>>>>>>>>>>>>> community will further expand the Flink connector ecosystem.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So I would like to start an official discussion on FLIP-399: Flink
>>>>>>>>>>>>>>>>>> Connector Doris [3].
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Looking forward to comments, feedback and suggestions from the
>>>>>>>>>>>>>>>>>> community on the proposal.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> [1] https://lists.apache.org/thread/lvh8g9o6qj8bt3oh60q81z0o1cv3nn8p
>>>>>>>>>>>>>>>>>> [2] https://doris.apache.org/docs/dev/get-starting/what-is-apache-doris/
>>>>>>>>>>>>>>>>>> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-399%3A+Flink+Connector+Doris
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Brs,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> di.wu
