Hi Jingsong,

Thank you for your reply!
We introduced `TwoPhaseCatalogTable` for two reasons:
1. A `TwoPhaseCatalogTable` lets each data source implement richer 
operations. Going through the Catalog alone only allows a simple create table 
and drop table, which is not flexible enough. For example, a connector may 
need to delete a temporary directory, or use rename table in a relational 
database to implement atomic semantics in Flink.
2. It makes later extensions easier, such as support for replace table and 
extended data lake storage support.
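
To make the first point concrete, here is a minimal sketch of the idea. It is 
not the interface from the FLIP: the names `TwoPhaseTableSketch`, 
`StagedDirectoryTable` and `stagingPath` are hypothetical, and a local 
directory rename stands in for what a real connector would do (for example 
rename table in a relational database). It only illustrates that the 
connector, not the Catalog, decides what commit and abort mean.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical shape of a two-phase table; NOT the interface defined in the FLIP. */
interface TwoPhaseTableSketch {
    void begin();   // before the job starts: prepare a staging location
    void commit();  // job finished successfully: publish the result
    void abort();   // job failed or was cancelled: clean up
}

/** File-based illustration: write into a staging directory, publish by rename. */
final class StagedDirectoryTable implements TwoPhaseTableSketch {
    private final Path target;
    private final Path staging;

    StagedDirectoryTable(Path target) {
        this.target = target;
        // Assumption: staging lives next to the target so the rename stays on one filesystem.
        this.staging = target.resolveSibling(target.getFileName() + ".staging");
    }

    Path stagingPath() {
        return staging;
    }

    @Override
    public void begin() {
        try {
            Files.createDirectories(staging);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void commit() {
        try {
            // The target becomes visible in a single rename step.
            Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void abort() {
        if (!Files.exists(staging)) {
            return;
        }
        // Delete children before parents, so the staging directory goes last.
        try (Stream<Path> paths = Files.walk(staging)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape, a failed job only needs `abort()` to remove the staging 
directory, and the target never appears unless `commit()` runs. A plain 
drop table in the Catalog cannot express that connector-specific cleanup.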
>And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
Regarding naming, we initially used `StagedCatalogTable`, but after offline 
discussions with yuxia and Lincoln, we noted that Flink already has 
TwoPhaseCommittingSink/TwoPhaseCommitSinkFunction, so to keep the naming 
consistent we changed it to `TwoPhaseCatalogTable`.







--

Best regards,
Mang Zhang





At 2023-05-12 13:02:14, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>Hi Mang,
>
>Thanks for starting this FLIP.
>
>I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>Flink design places execution in the TableFactory or directly in the
>Catalog, so introducing an executable table makes me feel a bit
>strange. (Spark is this style, but Flink may not be)
>
>And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>
>Best,
>Jingsong
>
>On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zhangma...@163.com> wrote:
>>
>> Hi Ron,
>>
>>
>> First of all, thank you for your reply!
>> After our offline discussion, I understand that your concern is mainly 
>> about the compilePlan scenario. Currently compilePlanSql does not support 
>> non-INSERT statements and throws an exception for them:
>> >Unsupported SQL query! compilePlanSql() only accepts a single SQL statement 
>> >of type INSERT
>> But it's a good point that I will seriously consider.
>> Non-atomic CTAS can be supported relatively easily, 
>> but atomic CTAS needs more adaptation work, so I'm going to leave it as is 
>> and follow up with a separate issue to implement CTAS support for 
>> compilePlanSql.
>>
>>
>>
>>
>>
>>
>> --
>>
>> Best regards,
>> Mang Zhang
>>
>>
>>
>>
>>
>> At 2023-04-23 17:52:07, "liu ron" <ron9....@gmail.com> wrote:
>> >Hi, Mang
>> >
>> >I have a question about the implementation details. In the atomicity case,
>> >the target table is not created before the JobGraph is generated, yet the
>> >target table is required to exist when optimizing the plan to generate the
>> >JobGraph. How do you solve this problem?
>> >
>> >Best,
>> >Ron
>> >
>> >yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Thu, Apr 20, 2023 at 09:35:
>> >
>> >> Sharing some insights about the new TwoPhaseCatalogTable, proposed after
>> >> an offline discussion with Mang.
>> >> The main reason is that the TwoPhaseCatalogTable enables external
>> >> connectors to implement their own logic for commit / abort.
>> >> In FLIP-218, for atomic CTAS, the Catalog just drops the table when the
>> >> job fails. That is not ideal, because it is too generic to work well.
>> >> For example, some connectors need to clean up temporary files in the
>> >> abort method, and only the actual connector knows the specific logic for
>> >> aborting.
>> >>
>> >> Best regards,
>> >> Yuxia
>> >>
>> >>
>> >> From: "zhangmang1" <zhangma...@163.com>
>> >> To: "dev" <dev@flink.apache.org>, "Jing Ge" <j...@ververica.com>
>> >> Cc: "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <
>> >> lincoln.8...@gmail.com>, luoyu...@alumni.sjtu.edu.cn
>> >> Sent: Wednesday, April 19, 2023, 3:13:36 PM
>> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >> SELECT(CTAS) statement
>> >>
>> >> hi, Jing
>> >> Thank you for your reply.
>> >> >1. It looks like you found another way to design the atomic CTAS with new
>> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable
>> >> >as described in FLIP-218. Did I understand correctly?
>> >> Yes. When I was implementing the FLIP-218 solution, I encountered problems
>> >> with Catalog/CatalogTable serialization and deserialization. For example,
>> >> after deserialization, the CatalogTable could not be converted to a Hive
>> >> Table. Also, Catalog serialization is a heavy operation that may not
>> >> actually be necessary, since we just need Create Table.
>> >> Therefore, I propose the TwoPhaseCatalogTable approach, which also
>> >> facilitates the later implementation of data lake support, ReplaceTable
>> >> and other functions.
>> >>
>> >> >2. I am a little bit confused about the isStreamingMode parameter of
>> >> >Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
>> >> >smell) that we should generally avoid in a public interface. According to
>> >> >the FLIP, isStreamingMode will be used by the Catalog to determine whether
>> >> >to support atomic or not. With this selector argument, there will be two
>> >> >different logics built within one method, and it is hard to follow without
>> >> >reading the code or the doc carefully (another concern is keeping the doc
>> >> >and code always consistent), i.e. sometimes there will be no difference
>> >> >between using true/false isStreamingMode, and sometimes they are quite
>> >> >different: atomic vs. non-atomic. Another question is, before we call
>> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >> >we could just follow FLIP-218 instead of (twistedly) calling
>> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >> >anything here?
>> >> Here is my thinking on this issue: atomic CTAS should be the default
>> >> behavior, falling back to non-atomic CTAS only if atomicity is completely
>> >> unattainable. Atomic CTAS gives users a better experience.
>> >> Flink is already a unified stream and batch engine. At our company, Kwai,
>> >> many users also use Flink for batch data processing while still running
>> >> in streaming mode.
>> >> The boundary between stream and batch is gradually blurring, and
>> >> streaming-mode jobs may also FINISH, so I added the isStreamingMode
>> >> parameter to provide different atomicity implementations in batch and
>> >> streaming modes.
>> >> It is used not only to determine whether atomicity is supported, but also
>> >> to help select different TwoPhaseCatalogTable implementations that provide
>> >> different levels of atomicity!
>> >>
>> >> Looking forward to more feedback.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Best regards,
>> >> Mang Zhang
>> >>
>> >>
>> >>
>> >>
>> >> At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
>> >> >Hi Mang,
>> >> >
>> >> >This is the FLIP I was looking forward to after FLIP-218. Thanks for
>> >> >driving it. I have two questions and would like to know your thoughts,
>> >> >thanks:
>> >> >
>> >> >1. It looks like you found another way to design the atomic CTAS with new
>> >> >serializable TwoPhaseCatalogTable instead of making Catalog serializable
>> >> >as described in FLIP-218. Did I understand correctly?
>> >> >2. I am a little bit confused about the isStreamingMode parameter of
>> >> >Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
>> >> >smell) that we should generally avoid in a public interface. According to
>> >> >the FLIP, isStreamingMode will be used by the Catalog to determine whether
>> >> >to support atomic or not. With this selector argument, there will be two
>> >> >different logics built within one method, and it is hard to follow without
>> >> >reading the code or the doc carefully (another concern is keeping the doc
>> >> >and code always consistent), i.e. sometimes there will be no difference
>> >> >between using true/false isStreamingMode, and sometimes they are quite
>> >> >different: atomic vs. non-atomic. Another question is, before we call
>> >> >Catalog#twoPhaseCreateTable(...), we have to know the value of
>> >> >isStreamingMode. In case only non-atomic is supported for streaming mode,
>> >> >we could just follow FLIP-218 instead of (twistedly) calling
>> >> >Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>> >> >anything here?
>> >> >
>> >> >Best regards,
>> >> >Jing
>> >> >
>> >> >On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn>
>> >> wrote:
>> >> >
>> >> >> Hi, Mang.
>> >> >> +1 for completing the support for atomicity of CTAS. This is very useful
>> >> >> in batch scenarios and when integrating with data lakes that support
>> >> >> transactions.
>> >> >>
>> >> >> I just have one question. IIUC, the DynamicTableSink will need to know
>> >> >> whether it is used for the normal case or for atomic CTAS, as well as the
>> >> >> necessary context.
>> >> >> Take the JDBC catalog as an example: if it is CTAS with atomicity support,
>> >> >> the JDBC DynamicTableSink will write to the temp table defined in the
>> >> >> TwoPhaseCatalogTable, which is different from the normal case.
>> >> >>
>> >> >> How can the DynamicTableSink get this information? Could you give some
>> >> >> explanation or an example in this FLIP?
>> >> >>
>> >> >>
>> >> >> Best regards,
>> >> >> Yuxia
>> >> >>
>> >> >> ----- Original Message -----
>> >> >> From: "zhangmang1" <zhangma...@163.com>
>> >> >> To: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>,
>> >> >> "lincoln 86xy" <lincoln.8...@gmail.com>
>> >> >> Sent: Friday, April 14, 2023, 2:50:40 PM
>> >> >> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> >> >> SELECT(CTAS) statement
>> >> >>
>> >> >> Hi, Lincoln and Ron
>> >> >>
>> >> >>
>> >> >> Thank you for your reply.
>> >> >> Regarding naming, I think the suggestions are fine; they make future
>> >> >> extensions with new features more uniform. I have updated the FLIP.
>> >> >>
>> >> >>
>> >> >> Regarding atomic CTAS support for Hive: Hive has rich usage scenarios,
>> >> >> which can be divided into three: 1. writing Hive tables 2. writing Hive
>> >> >> tables with speculative execution 3. writing Hive tables with small file
>> >> >> merge
>> >> >>
>> >> >>
>> >> >> The main purpose of FLIP-305 is to implement support for CTAS atomicity
>> >> >> in the Flink framework,
>> >> >> so my PoC only verifies the first scenario, writing to a Hive table,
>> >> >> and we can subsequently split out sub-tasks to support the other two
>> >> >> scenarios.
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >>
>> >> >> Best regards,
>> >> >> Mang Zhang
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>> >> >> >Hi, Mang
>> >> >> >
>> >> >> >+1 for completing the support for atomicity of CTAS, this is very
>> >> >> >useful in batch scenarios.
>> >> >> >
>> >> >> >I have two questions:
>> >> >> >1. naming wise:
>> >> >> >  a) can we rename `Catalog#getTwoPhaseCommitCreateTable` to
>> >> >> >`Catalog#twoPhaseCreateTable` (and we may add
>> >> >> >twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)?
>> >> >> >  b) for `TwoPhaseCommitCatalogTable`, might it be better to use
>> >> >> >`TwoPhaseCatalogTable`?
>> >> >> >  c) `TwoPhaseCommitCatalogTable#beginTransaction`: the word
>> >> >> >'transaction' in the method name may remind users of transaction
>> >> >> >support (however, it is not strictly so), so I suggest changing it to
>> >> >> >`begin`
>> >> >> >2. Has this design been validated by any relevant PoC on Hive or other
>> >> >> >catalogs?
>> >> >> >
>> >> >> >Best,
>> >> >> >Lincoln Lee
>> >> >> >
>> >> >> >
>> >> >> >liu ron <ron9....@gmail.com> wrote on Thu, Apr 13, 2023 at 10:17:
>> >> >> >
>> >> >> >> Hi, Mang
>> >> >> >> Atomicity is very important for CTAS, especially for batch jobs. This
>> >> >> >> FLIP is a continuation of FLIP-218, which is valuable for CTAS.
>> >> >> >> I just have one question. In the Motivation part of FLIP-218, we
>> >> >> >> mentioned three levels of atomicity semantics. Can the current design
>> >> >> >> do the same as Spark's DataSource V2, which can guarantee both
>> >> >> >> atomicity and isolation? For example, can this be achieved when
>> >> >> >> writing to Hive tables using CTAS?
>> >> >> >>
>> >> >> >> Best,
>> >> >> >> Ron
>> >> >> >>
>> >> >> >> Mang Zhang <zhangma...@163.com> wrote on Mon, Apr 10, 2023 at 11:03:
>> >> >> >>
>> >> >> >> > Hi, everyone
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > I'd like to start a discussion about FLIP-305: Support atomic for
>> >> >> >> > CREATE TABLE AS SELECT(CTAS) statement [1].
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > The CREATE TABLE AS SELECT(CTAS) statement is already supported, but
>> >> >> >> > it is not atomic. It creates the table before the job runs. If the
>> >> >> >> > job execution fails or is cancelled, the table will not be dropped.
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > So I want Flink to support atomic CTAS, where the table is created
>> >> >> >> > only when the job succeeds. This improves the user experience.
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > Looking forward to your feedback.
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > [1]
>> >> >> >> >
>> >> >> >>
>> >> >>
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > --
>> >> >> >> >
>> >> >> >> > Best regards,
>> >> >> >> > Mang Zhang
>> >> >> >>
>> >> >>
>> >>
>> >>
