Thank you for the great work, Mang! The updated proposal looks good to me. 

Best,
Jark

> On Jun 8, 2023, at 11:49, Jingsong Li <jingsongl...@gmail.com> wrote:
> 
> Thanks Mang for updating!
> 
> Looks good to me!
> 
> Best,
> Jingsong
> 
> On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zhangma...@163.com> wrote:
>> 
>> Hi Jingsong,
>> 
>>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>>> Flink design places execution in the TableFactory or directly in the
>>> Catalog, so introducing an executable table makes me feel a bit
>>> strange. (Spark is this style, but Flink may not be)
>> I agree that putting executable commit/abort logic on CatalogTable is a bit
>> strange.
>> After an offline discussion with yuxia, I revised the FLIP-305 [1] design.
>> The new solution is modeled on SupportsOverwrite: it introduces a
>> SupportsStaging interface, and whether a DynamicTableSink supports atomic
>> CTAS is inferred from whether it implements SupportsStaging.
>> If it does, the StagedTable object is obtained from the DynamicTableSink.
>> 
>> For more implementation details, please see the FLIP-305 document.
>> 
>> Here is my PoC commit:
>> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>> 
>> 
>> [1] 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> 
>> 
>> --
>> 
>> Best regards,
>> 
>> Mang Zhang
>> 
>> 
>> 
>> At 2023-05-12 13:02:14, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>>> Hi Mang,
>>> 
>>> Thanks for starting this FLIP.
>>> 
>>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>>> Flink design places execution in the TableFactory or directly in the
>>> Catalog, so introducing an executable table makes me feel a bit
>>> strange. (Spark is this style, but Flink may not be)
>>> 
>>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>>> 
>>> Best,
>>> Jingsong
>>> 
>>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zhangma...@163.com> wrote:
>>>> 
>>>> Hi Ron,
>>>> 
>>>> 
>>>> First of all, thank you for your reply!
>>>> After our offline discussion, I understand that your question mainly
>>>> concerns the compilePlan scenario. Currently compilePlanSql only supports
>>>> INSERT statements; for any other statement it throws an exception:
>>>>> Unsupported SQL query! compilePlanSql() only accepts a single SQL 
>>>>> statement of type INSERT
>>>> But it's a good point, and I will consider it seriously.
>>>> Non-atomic CTAS can be supported relatively easily,
>>>> but atomic CTAS needs more adaptation work, so I'm going to leave it as is
>>>> and follow up with a separate issue to implement CTAS support for
>>>> compilePlanSql.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> 
>>>> Best regards,
>>>> Mang Zhang
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> At 2023-04-23 17:52:07, "liu ron" <ron9....@gmail.com> wrote:
>>>>> Hi, Mang
>>>>> 
>>>>> I have a question about the implementation details. In the atomic case,
>>>>> the target table is not created before the JobGraph is generated, yet the
>>>>> target table is required to exist when the plan is optimized to generate
>>>>> the JobGraph. How do you solve this problem?
>>>>> 
>>>>> Best,
>>>>> Ron
>>>>> 
>>>>> On Thu, Apr 20, 2023 at 09:35, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>> 
>>>>>> Sharing some insights about the newly proposed TwoPhaseCatalogTable after
>>>>>> an offline discussion with Mang.
>>>>>> The main reason is that the TwoPhaseCatalogTable enables
>>>>>> external connectors to implement their own logic for commit / abort.
>>>>>> In FLIP-218, for atomic CTAS, the Catalog simply drops the table
>>>>>> when the job fails. That is not ideal because it is too generic to work
>>>>>> well. For example, some connectors need to clean up temporary files in
>>>>>> the abort method, and only the actual connector knows the specific logic
>>>>>> for aborting.
>>>>>> 
>>>>>> Best regards,
>>>>>> Yuxia
>>>>>> 
>>>>>> 
>>>>>> From: "zhangmang1" <zhangma...@163.com>
>>>>>> To: "dev" <dev@flink.apache.org>, "Jing Ge" <j...@ververica.com>
>>>>>> Cc: "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <
>>>>>> lincoln.8...@gmail.com>, luoyu...@alumni.sjtu.edu.cn
>>>>>> Sent: Wednesday, April 19, 2023 3:13:36 PM
>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>>>>>> SELECT(CTAS) statement
>>>>>> 
>>>>>> hi, Jing
>>>>>> Thank you for your reply.
>>>>>>> 1. It looks like you found another way to design the atomic CTAS with
>>>>>>> new serializable TwoPhaseCatalogTable instead of making Catalog
>>>>>>> serializable as described in FLIP-218. Did I understand correctly?
>>>>>> Yes. When I was implementing the FLIP-218 solution, I ran into problems
>>>>>> with Catalog/CatalogTable serialization and deserialization; for example,
>>>>>> after deserialization the CatalogTable could not be converted to a Hive
>>>>>> Table. Also, serializing the Catalog is a heavy operation that may not
>>>>>> actually be necessary, since we only need to create the table.
>>>>>> Therefore, the TwoPhaseCatalogTable approach is proposed, which also
>>>>>> makes it easier to later implement data lake support, ReplaceTable,
>>>>>> and other features.
>>>>>> 
>>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of
>>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector argument (code
>>>>>>> smell) we should commonly avoid in the public interface. According to
>>>>>>> the FLIP, isStreamingMode will be used by the Catalog to determine
>>>>>>> whether to support atomic or not. With this selector argument, there
>>>>>>> will be two different logics built within one method and it is hard to
>>>>>>> follow without reading the code or the doc carefully (another concern is
>>>>>>> to keep the doc and code always consistent) i.e. sometimes there will be
>>>>>>> no difference by using true/false isStreamingMode, sometimes they are
>>>>>>> quite different - atomic vs. non-atomic. Another question is, before we
>>>>>>> call Catalog#twoPhaseCreateTable(...), we have to know the value of
>>>>>>> isStreamingMode. In case only non-atomic is supported for streaming
>>>>>>> mode, we could just follow FLIP-218 instead of (twistedly) calling
>>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I
>>>>>>> miss anything here?
>>>>>> Here is what I think about this issue: atomic CTAS should be the default
>>>>>> behavior, and we should only fall back to non-atomic CTAS if atomicity is
>>>>>> completely unattainable. Atomic CTAS gives users a better experience.
>>>>>> Flink is already a unified stream and batch engine. At our company, Kwai,
>>>>>> many users also use Flink for batch data processing, but still run their
>>>>>> jobs in streaming mode.
>>>>>> The boundary between stream and batch is gradually blurring, and streaming
>>>>>> jobs may also FINISH, so I added the isStreamingMode parameter, which
>>>>>> allows different atomicity implementations in batch and streaming modes.
>>>>>> It is used not only to determine whether atomicity is supported, but also
>>>>>> to help select different TwoPhaseCatalogTable implementations that provide
>>>>>> different levels of atomicity.
>>>>>> 
>>>>>> Looking forward to more feedback.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Best regards,
>>>>>> Mang Zhang
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
>>>>>>> Hi Mang,
>>>>>>> 
>>>>>>> This is the FLIP I was looking forward to after FLIP-218. Thanks for
>>>>>>> driving it. I have two questions and would like to know your thoughts,
>>>>>>> thanks:
>>>>>>> 
>>>>>>> 1. It looks like you found another way to design the atomic CTAS with
>>>>>>> new serializable TwoPhaseCatalogTable instead of making Catalog
>>>>>>> serializable as described in FLIP-218. Did I understand correctly?
>>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of
>>>>>>> Catalog#twoPhaseCreateTable(...), since it is the selector argument (code
>>>>>>> smell) we should commonly avoid in the public interface. According to
>>>>>>> the FLIP, isStreamingMode will be used by the Catalog to determine
>>>>>>> whether to support atomic or not. With this selector argument, there
>>>>>>> will be two different logics built within one method and it is hard to
>>>>>>> follow without reading the code or the doc carefully (another concern is
>>>>>>> to keep the doc and code always consistent) i.e. sometimes there will be
>>>>>>> no difference by using true/false isStreamingMode, sometimes they are
>>>>>>> quite different - atomic vs. non-atomic. Another question is, before we
>>>>>>> call Catalog#twoPhaseCreateTable(...), we have to know the value of
>>>>>>> isStreamingMode. In case only non-atomic is supported for streaming
>>>>>>> mode, we could just follow FLIP-218 instead of (twistedly) calling
>>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I
>>>>>>> miss anything here?
>>>>>>> 
>>>>>>> Best regards,
>>>>>>> Jing
>>>>>>> 
>>>>>>> On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>>>> 
>>>>>>>> Hi, Mang.
>>>>>>>> +1 for completing the support for atomicity of CTAS. This is very
>>>>>>>> useful in batch scenarios and for integrating with data lakes that
>>>>>>>> support transactions.
>>>>>>>> 
>>>>>>>> I just have one question. IIUC, the DynamicTableSink will need to know
>>>>>>>> whether it is used for the normal case or for atomic CTAS, as well as
>>>>>>>> the necessary context.
>>>>>>>> Take the JDBC catalog as an example: for CTAS with atomicity support,
>>>>>>>> the JDBC DynamicTableSink will write to the temp table defined in the
>>>>>>>> TwoPhaseCatalogTable, which differs from the normal case.
>>>>>>>> 
>>>>>>>> How can the DynamicTableSink get this information? Could you give some
>>>>>>>> explanation or an example in this FLIP?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Best regards,
>>>>>>>> Yuxia
>>>>>>>> 
>>>>>>>> ----- Original Message -----
>>>>>>>> From: "zhangmang1" <zhangma...@163.com>
>>>>>>>> To: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>,
>>>>>>>> "lincoln 86xy" <lincoln.8...@gmail.com>
>>>>>>>> Sent: Friday, April 14, 2023 2:50:40 PM
>>>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>>>>>>>> SELECT(CTAS) statement
>>>>>>>> 
>>>>>>>> Hi, Lincoln and Ron
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thank you for your reply.
>>>>>>>> The naming suggestions look good to me; they will make future
>>>>>>>> extensions more uniform. I have updated the FLIP.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Regarding atomic CTAS support for Hive: Hive has many usage scenarios,
>>>>>>>> which can be divided into three: 1. writing Hive tables 2. writing Hive
>>>>>>>> tables with speculative execution 3. writing Hive tables with small file
>>>>>>>> merge
>>>>>>>> 
>>>>>>>> 
>>>>>>>> The main purpose of FLIP-305 is to implement support for CTAS atomicity
>>>>>>>> in the Flink framework,
>>>>>>>> so my PoC only verifies the first scenario, writing to a Hive table.
>>>>>>>> We can subsequently split out sub-tasks to support the other two
>>>>>>>> scenarios.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> 
>>>>>>>> Best regards,
>>>>>>>> Mang Zhang
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>>>>>>>>> Hi, Mang
>>>>>>>>> 
>>>>>>>>> +1 for completing the support for atomicity of CTAS, this is very
>>>>>>>>> useful in batch scenarios.
>>>>>>>>> 
>>>>>>>>> I have two questions:
>>>>>>>>> 1. naming wise:
>>>>>>>>> a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
>>>>>>>>> `Catalog#twoPhaseCreateTable` (and we may add
>>>>>>>>> twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
>>>>>>>>> b) for the `TwoPhaseCommitCatalogTable`, may it be better using
>>>>>>>>> `TwoPhaseCatalogTable`?
>>>>>>>>> c) `TwoPhaseCommitCatalogTable#beginTransaction`, the word
>>>>>>>>> 'transaction' in the method name may suggest to users that there is
>>>>>>>>> transaction support (however, it is not strictly so), so I suggest
>>>>>>>>> changing it to `begin`
>>>>>>>>> 2. Has this design been validated by any relevant Poc on hive or other
>>>>>>>>> catalogs?
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Lincoln Lee
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Thu, Apr 13, 2023 at 10:17, liu ron <ron9....@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi, Mang
>>>>>>>>>> Atomicity is very important for CTAS, especially for batch jobs. This
>>>>>>>>>> FLIP is a continuation of FLIP-218, which is valuable for CTAS.
>>>>>>>>>> I just have one question: in the Motivation part of FLIP-218, we
>>>>>>>>>> mentioned three levels of atomicity semantics. Can this current design
>>>>>>>>>> do the same as Spark's DataSource V2, which can guarantee both
>>>>>>>>>> atomicity and isolation? For example, can it be done by writing to
>>>>>>>>>> Hive tables using CTAS?
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Ron
>>>>>>>>>> 
>>>>>>>>>> On Mon, Apr 10, 2023 at 11:03, Mang Zhang <zhangma...@163.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi, everyone
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> I'd like to start a discussion about FLIP-305: Support atomic for
>>>>>>>>>>> CREATE TABLE AS SELECT(CTAS) statement [1].
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> The CREATE TABLE AS SELECT (CTAS) statement is already supported, but
>>>>>>>>>>> it is not atomic: the table is created before the job runs, and if
>>>>>>>>>>> the job execution fails or is cancelled, the table is not dropped.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> So I want Flink to support atomic CTAS, where the table is created
>>>>>>>>>>> only when the job succeeds. This will improve the user experience.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> [1]
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> 
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Mang Zhang
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
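
For readers following the thread, the staging flow that Mang describes in his Jun 7 message (a sink that implements SupportsStaging hands out a StagedTable, which is committed when the job succeeds and aborted when it fails) can be sketched roughly as below. This is a minimal, self-contained illustration and not the FLIP-305 implementation: the interfaces are simplified stand-ins that only borrow the names SupportsStaging and StagedTable from the thread, and the method signatures, the Sink interface, StagingSink, and runCtas are all assumptions made up for this example. Please see the FLIP-305 document for the actual interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StagedCtasSketch {

    /** Stand-in for the StagedTable named in the thread; method names are assumed. */
    interface StagedTable {
        void begin();
        void commit();
        void abort();
    }

    /** Hypothetical minimal sink; this is NOT Flink's DynamicTableSink. */
    interface Sink {
        void write(List<String> rows);
    }

    /** Stand-in for the SupportsStaging ability; the signature is assumed. */
    interface SupportsStaging {
        StagedTable applyStaging();
    }

    /** Hypothetical sink that buffers rows in a staging area and publishes them on commit. */
    static class StagingSink implements Sink, SupportsStaging, StagedTable {
        final List<String> staged = new ArrayList<>();
        final List<String> published = new ArrayList<>();
        boolean tableVisible = false;

        @Override public StagedTable applyStaging() { return this; }
        @Override public void begin() { staged.clear(); }
        @Override public void write(List<String> rows) { staged.addAll(rows); }

        @Override public void commit() {
            // The table and its data only become visible once the job has succeeded.
            published.addAll(staged);
            staged.clear();
            tableVisible = true;
        }

        @Override public void abort() {
            // Connector-specific cleanup, e.g. dropping temporary data.
            staged.clear();
            tableVisible = false;
        }
    }

    /**
     * Runs a CTAS-like job. Staging is used only if the sink implements
     * SupportsStaging; otherwise the job runs without commit/abort handling.
     * Returns true if the simulated job succeeded.
     */
    static boolean runCtas(Sink sink, List<String> rows, boolean failJob) {
        StagedTable staged = (sink instanceof SupportsStaging)
                ? ((SupportsStaging) sink).applyStaging()
                : null;
        if (staged != null) {
            staged.begin();
        }
        try {
            sink.write(rows);
            if (failJob) {
                throw new RuntimeException("simulated job failure");
            }
            if (staged != null) {
                staged.commit();
            }
            return true;
        } catch (RuntimeException e) {
            if (staged != null) {
                staged.abort();
            }
            return false;
        }
    }

    public static void main(String[] args) {
        StagingSink ok = new StagingSink();
        boolean okResult = runCtas(ok, Arrays.asList("a", "b"), false);
        System.out.println("success=" + okResult + " visible=" + ok.tableVisible
                + " rows=" + ok.published);

        StagingSink failed = new StagingSink();
        boolean failedResult = runCtas(failed, Arrays.asList("a"), true);
        System.out.println("success=" + failedResult + " visible=" + failed.tableVisible
                + " rows=" + failed.published);
    }
}
```

The point of the sketch is the one yuxia raises in the thread: the abort logic lives with the connector, so each sink can clean up in its own way instead of relying on a generic "drop the table" step in the Catalog.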
