I'd like to share some insights about the new TwoPhaseCatalogTable, which was
proposed after an offline discussion with Mang.
The main reason is that TwoPhaseCatalogTable enables external
connectors to implement their own logic for commit / abort.
In FLIP-218, for atomic CTAS, the Catalog just drops the table when
the job fails. That is not ideal, because it is too generic to work well.
For example, some connectors need to clean up temporary files in the abort
method, and only the actual connector knows the specific logic for aborting.
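
To make the point about connector-specific abort logic concrete, here is a
minimal sketch. Everything in it is an assumption for illustration: the
interface shape (begin / commit / abort), the FileStagedTable class and the
directory layout are hypothetical and are not the API defined in the FLIP.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CtasAbortSketch {

    /** Hypothetical shape of TwoPhaseCatalogTable; the real interface is defined by the FLIP. */
    interface TwoPhaseCatalogTable extends Serializable {
        void begin() throws IOException;
        void commit() throws IOException;
        void abort() throws IOException;
    }

    /** Hypothetical file-based connector that stages data in a temporary directory. */
    static final class FileStagedTable implements TwoPhaseCatalogTable {
        private static final long serialVersionUID = 1L;
        // Stored as Strings because java.nio.file.Path is not Serializable.
        private final String stagingDir;
        private final String tableDir;

        FileStagedTable(String stagingDir, String tableDir) {
            this.stagingDir = stagingDir;
            this.tableDir = tableDir;
        }

        @Override
        public void begin() throws IOException {
            // Prepare the temporary location the job writes to.
            Files.createDirectories(Paths.get(stagingDir));
        }

        @Override
        public void commit() throws IOException {
            // Publish the staged data by renaming the staging directory to the table location.
            Files.move(Paths.get(stagingDir), Paths.get(tableDir));
        }

        @Override
        public void abort() throws IOException {
            // Connector-specific cleanup: delete the staged files, children before parents.
            Path staging = Paths.get(stagingDir);
            if (!Files.exists(staging)) {
                return;
            }
            List<Path> toDelete;
            try (Stream<Path> paths = Files.walk(staging)) {
                toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : toDelete) {
                Files.delete(p);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("ctas-sketch");
        Path staging = root.resolve("_staging");
        FileStagedTable table =
                new FileStagedTable(staging.toString(), root.resolve("t").toString());
        table.begin();
        Files.write(staging.resolve("part-0"), "row".getBytes());
        table.abort(); // simulate a failed job
        System.out.println("staging exists after abort: " + Files.exists(staging));
    }
}
```

A generic "drop the table" in the Catalog could not know about the staging
directory; in this sketch the connector owns that knowledge in abort().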

Best regards, 
Yuxia 


From: "zhangmang1" <zhangma...@163.com>
To: "dev" <dev@flink.apache.org>, "Jing Ge" <j...@ververica.com>
Cc: "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <lincoln.8...@gmail.com>,
luoyu...@alumni.sjtu.edu.cn
Sent: Wednesday, April 19, 2023, 3:13:36 PM
Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS)
statement

Hi, Jing
Thank you for your reply. 
>1. It looks like you found another way to design the atomic CTAS with new
>serializable TwoPhaseCatalogTable instead of making Catalog serializable as 
>described in FLIP-218. Did I understand correctly? 
Yes. When I was implementing the FLIP-218 solution, I ran into problems with
Catalog/CatalogTable serialization and deserialization; for example, after
deserialization the CatalogTable could not be converted to a Hive Table. Also,
Catalog serialization is a heavy operation, and it may not actually be
necessary, since we only need to create the table.
Therefore, the TwoPhaseCatalogTable approach is proposed, which also makes it
easier to later implement data lake support, ReplaceTable, and other
functions.

>2. I am a little bit confused about the isStreamingMode parameter of
>Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>smell) we should commonly avoid in the public interface. According to the
>FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>support atomic or not. With this selector argument, there will be two
>different logics built within one method and it is hard to follow without
>reading the code or the doc carefully(another concern is to keep the doc
>and code always consistent) i.e. sometimes there will be no difference by
>using true/false isStreamingMode, sometimes they are quite different -
>atomic vs. non-atomic. Another question is, before we call
>Catalog#twoPhaseCreateTable(...), we have to know the value of
>isStreamingMode. In case only non-atomic is supported for streaming mode,
>we could just follow FLIP-218 instead of (twistedly) calling
>Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>anything here? 
Here is what I think about this issue: atomic CTAS should be the default
behavior, falling back to non-atomic CTAS only if atomicity is completely
unattainable. Atomic CTAS will bring a better experience to users.
Flink is already a unified stream and batch engine. In our company, Kwai, many
users also use Flink for batch data processing while still running in streaming
mode.
The boundary between stream and batch is gradually blurring, and streaming-mode
jobs may also FINISH, so I added the isStreamingMode parameter, which allows
different atomicity implementations in batch and streaming modes.
It is used not only to determine whether atomicity is supported, but also to
help select different TwoPhaseCatalogTable implementations that provide
different levels of atomicity!
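
As a rough sketch of this selection idea (not the signature in the FLIP): the
Optional return type, the SketchCatalog class, the two implementation classes
and the describe() method below are all hypothetical names introduced only for
illustration.

```java
import java.util.Optional;

class StreamingModeSketch {

    /** Hypothetical stand-in for the proposed interface; lifecycle methods are omitted. */
    interface TwoPhaseCatalogTable {
        String describe();
    }

    /** Hypothetical implementation used when the job runs in batch mode. */
    static final class BatchTwoPhaseTable implements TwoPhaseCatalogTable {
        @Override
        public String describe() {
            return "batch-mode implementation";
        }
    }

    /** Hypothetical implementation used when the job runs in streaming mode. */
    static final class StreamingTwoPhaseTable implements TwoPhaseCatalogTable {
        @Override
        public String describe() {
            return "streaming-mode implementation";
        }
    }

    /** Hypothetical catalog; an empty result means "fall back to non-atomic CTAS". */
    static final class SketchCatalog {
        private final boolean streamingAtomicitySupported;

        SketchCatalog(boolean streamingAtomicitySupported) {
            this.streamingAtomicitySupported = streamingAtomicitySupported;
        }

        Optional<TwoPhaseCatalogTable> twoPhaseCreateTable(boolean isStreamingMode) {
            if (!isStreamingMode) {
                return Optional.of(new BatchTwoPhaseTable());
            }
            if (streamingAtomicitySupported) {
                return Optional.of(new StreamingTwoPhaseTable());
            }
            // Atomicity is unattainable here, so the caller uses the non-atomic path.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        SketchCatalog catalog = new SketchCatalog(false);
        System.out.println(catalog.twoPhaseCreateTable(false)
                .map(TwoPhaseCatalogTable::describe)
                .orElse("non-atomic CTAS"));
        System.out.println(catalog.twoPhaseCreateTable(true)
                .map(TwoPhaseCatalogTable::describe)
                .orElse("non-atomic CTAS"));
    }
}
```

In this sketch the flag does two jobs, as described above: it decides whether
atomicity is available at all, and it picks which implementation is returned.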

Looking forward to more feedback. 

-- 
Best regards, 
Mang Zhang 




At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
>Hi Mang,
>
>This is the FLIP I was looking forward to after FLIP-218. Thanks for
>driving it. I have two questions and would like to know your thoughts,
>thanks:
>
>1. It looks like you found another way to design the atomic CTAS with new
>serializable TwoPhaseCatalogTable instead of making Catalog serializable as
>described in FLIP-218. Did I understand correctly?
>2. I am a little bit confused about the isStreamingMode parameter of
>Catalog#twoPhaseCreateTable(...), since it is the selector argument(code
>smell) we should commonly avoid in the public interface. According to the
>FLIP,  isStreamingMode will be used by the Catalog to determine whether to
>support atomic or not. With this selector argument, there will be two
>different logics built within one method and it is hard to follow without
>reading the code or the doc carefully(another concern is to keep the doc
>and code always consistent) i.e. sometimes there will be no difference by
>using true/false isStreamingMode, sometimes they are quite different -
>atomic vs. non-atomic. Another question is, before we call
>Catalog#twoPhaseCreateTable(...), we have to know the value of
>isStreamingMode. In case only non-atomic is supported for streaming mode,
>we could just follow FLIP-218 instead of (twistedly) calling
>Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>anything here?
>
>Best regards,
>Jing
>
>On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>
>> Hi, Mang.
>> +1 for completing the support for atomicity of CTAS. This is very useful
>> in batch scenarios and for integrating with data lakes that support
>> transactions.
>>
>> I just have one question. IIUC, the DynamicTableSink will need to know
>> whether it is handling the normal case or CTAS with atomicity, as well as
>> the necessary context.
>> Take the JDBC catalog as an example: if it is CTAS with atomicity support, the
>> JDBC DynamicTableSink will write to the temp table defined in the
>> TwoPhaseCatalogTable, which is different from the normal case.
>>
>> How can the DynamicTableSink get this information? Could you give some
>> explanation or an example in this FLIP?
>>
>>
>> Best regards,
>> Yuxia
>>
>> ----- Original Message -----
>> From: "zhangmang1" <zhangma...@163.com>
>> To: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>,
>> "lincoln 86xy" <lincoln.8...@gmail.com>
>> Sent: Friday, April 14, 2023, 2:50:40 PM
>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> SELECT(CTAS) statement
>>
>> Hi, Lincoln and Ron
>>
>>
>> Thank you for your reply.
>> On naming, I agree with the suggestions; they keep future extensions with
>> new features more uniform. I have updated the FLIP.
>>
>>
>> About Hive support for atomic CTAS: Hive is rich in usage scenarios, which
>> can be divided into three: 1. writing Hive tables 2. writing Hive
>> tables with speculative execution 3. writing Hive tables with small file
>> merging
>>
>>
>> The main purpose of FLIP-305 is to implement support for CTAS atomicity in
>> the Flink framework,
>> so I only did a PoC to verify the first scenario of writing to a Hive table,
>> and we can subsequently split out sub-tasks to support the other two
>> scenarios.
>>
>>
>> --
>>
>> Best regards,
>> Mang Zhang
>>
>>
>>
>>
>>
>> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>> >Hi, Mang
>> >
>> >+1 for completing the support for atomicity of CTAS, this is very useful in
>> >batch scenarios.
>> >
>> >I have two questions:
>> >1. Naming:
>> >  a) Can we rename `Catalog#getTwoPhaseCommitCreateTable` to
>> >`Catalog#twoPhaseCreateTable`? (We may add
>> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later.)
>> >  b) For `TwoPhaseCommitCatalogTable`, would `TwoPhaseCatalogTable` be a
>> >better name?
>> >  c) For `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
>> >in the method name may suggest to users that transactions are supported
>> >(which is not strictly so), so I suggest changing it to `begin`.
>> >2. Has this design been validated by any relevant PoC on Hive or other
>> >catalogs?
>> >
>> >Best,
>> >Lincoln Lee
>> >
>> >
>> >On Thu, Apr 13, 2023 at 10:17, liu ron <ron9....@gmail.com> wrote:
>> >
>> >> Hi, Mang
>> >> Atomicity is very important for CTAS, especially for batch jobs. This FLIP
>> >> is a continuation of FLIP-218, which is valuable for CTAS.
>> >> I just have one question: in the Motivation part of FLIP-218, we mentioned
>> >> three levels of atomicity semantics. Can the current design do the same as
>> >> Spark's DataSource V2, which can guarantee both atomicity and isolation?
>> >> For example, can it be done when writing to Hive tables using CTAS?
>> >>
>> >> Best,
>> >> Ron
>> >>
>> >> On Mon, Apr 10, 2023 at 11:03, Mang Zhang <zhangma...@163.com> wrote:
>> >>
>> >> > Hi, everyone
>> >> >
>> >> > I'd like to start a discussion about FLIP-305: Support atomic for CREATE
>> >> > TABLE AS SELECT(CTAS) statement [1].
>> >> >
>> >> > The CREATE TABLE AS SELECT(CTAS) statement is already supported, but it
>> >> > is not atomic. It creates the table first, before the job runs. If the
>> >> > job execution fails or is cancelled, the table will not be dropped.
>> >> >
>> >> > So I want Flink to support atomic CTAS, where the table is created only
>> >> > when the job succeeds. This improves the user experience.
>> >> >
>> >> > Looking forward to your feedback.
>> >> >
>> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> >> >
>> >> > --
>> >> >
>> >> > Best regards,
>> >> > Mang Zhang
>> >>
>> 
