Thank you for the great work, Mang! The updated proposal looks good to me.
Best,
Jark

> On June 8, 2023, at 11:49, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Thanks Mang for updating!
>
> Looks good to me!
>
> Best,
> Jingsong
>
> On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zhangma...@163.com> wrote:
>>
>> Hi Jingsong,
>>
>>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>>> Flink design places execution in the TableFactory or directly in the
>>> Catalog, so introducing an executable table makes me feel a bit
>>> strange. (Spark is this style, but Flink may not be)
>>
>> On this point, I agree that putting executable commit/abort logic on
>> CatalogTable is a bit strange.
>> After an offline discussion with yuxia, I revised the FLIP-305 [1] proposal.
>> The new solution is similar to the implementation of SupportsOverwrite:
>> it introduces the SupportsStaging interface. Whether a DynamicTableSink
>> supports atomic CTAS is inferred from whether it implements
>> SupportsStaging; if it does, the StagedTable object is obtained from the
>> DynamicTableSink.
>>
>> For more implementation details, please see the FLIP-305 document.
>>
>> Here is my PoC commit:
>> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>>
>> --
>>
>> Best regards,
>>
>> Mang Zhang
>>
>> At 2023-05-12 13:02:14, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>>> Hi Mang,
>>>
>>> Thanks for starting this FLIP.
>>>
>>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>>> Flink design places execution in the TableFactory or directly in the
>>> Catalog, so introducing an executable table makes me feel a bit
>>> strange. (Spark is this style, but Flink may not be)
>>>
>>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zhangma...@163.com> wrote:
>>>>
>>>> Hi Ron,
>>>>
>>>> First of all, thank you for your reply!
>>>> After our offline discussion, I understand that your concern is mainly
>>>> about the compilePlan scenario. Currently compilePlanSql does not support
>>>> non-INSERT statements; it throws an exception:
>>>>> Unsupported SQL query! compilePlanSql() only accepts a single SQL
>>>>> statement of type INSERT
>>>> But it's a good point that I will seriously consider.
>>>> Non-atomic CTAS can be supported relatively easily,
>>>> but atomic CTAS needs more adaptation work, so I'm going to leave it as is
>>>> and follow up with a separate issue to implement CTAS support for
>>>> compilePlanSql.
>>>>
>>>> --
>>>>
>>>> Best regards,
>>>> Mang Zhang
>>>>
>>>> At 2023-04-23 17:52:07, "liu ron" <ron9....@gmail.com> wrote:
>>>>> Hi, Mang
>>>>>
>>>>> I have a question about the implementation details. In the atomic case,
>>>>> the target table is not created before the JobGraph is generated, but
>>>>> the target table is required to exist when the plan is optimized to
>>>>> generate the JobGraph. How do you solve this problem?
>>>>>
>>>>> Best,
>>>>> Ron
>>>>>
>>>>> On Thu, Apr 20, 2023 at 09:35, yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>>
>>>>>> Sharing some insights about the new TwoPhaseCatalogTable, proposed after
>>>>>> an offline discussion with Mang.
>>>>>> The main reason is that the TwoPhaseCatalogTable enables
>>>>>> external connectors to implement their own logic for commit / abort.
>>>>>> In FLIP-218, for atomic CTAS, the Catalog just drops the table
>>>>>> when the job fails. That is not ideal, because it is too generic to work well.
>>>>>> For example, some connectors need to clean up temporary files in the
>>>>>> abort method, and only the actual connector knows the specific logic for
>>>>>> aborting.
>>>>>>
>>>>>> Best regards,
>>>>>> Yuxia
>>>>>>
>>>>>> From: "zhangmang1" <zhangma...@163.com>
>>>>>> To: "dev" <dev@flink.apache.org>, "Jing Ge" <j...@ververica.com>
>>>>>> Cc: "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <lincoln.8...@gmail.com>, luoyu...@alumni.sjtu.edu.cn
>>>>>> Sent: Wednesday, April 19, 2023, 3:13:36 PM
>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
>>>>>>
>>>>>> Hi, Jing
>>>>>> Thank you for your reply.
>>>>>>> 1. It looks like you found another way to design the atomic CTAS, with a
>>>>>>> new serializable TwoPhaseCatalogTable instead of making Catalog
>>>>>>> serializable as described in FLIP-218. Did I understand correctly?
>>>>>> Yes. When I was implementing the FLIP-218 solution, I ran into problems
>>>>>> with Catalog/CatalogTable serialization and deserialization. For example,
>>>>>> after deserialization the CatalogTable could not be converted to a Hive
>>>>>> Table. Also, Catalog serialization is a heavy operation that may not
>>>>>> actually be necessary, since we only need to create the table.
>>>>>> Therefore I proposed the TwoPhaseCatalogTable approach, which also
>>>>>> facilitates the later implementation of data lake, ReplaceTable,
>>>>>> and other features.
>>>>>>
>>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of
>>>>>>> Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
>>>>>>> smell) that we should generally avoid in a public interface. According to
>>>>>>> the FLIP, isStreamingMode will be used by the Catalog to determine whether
>>>>>>> to support atomic or not. With this selector argument, two different
>>>>>>> pieces of logic are built into one method, which is hard to follow without
>>>>>>> reading the code or the doc carefully (another concern is keeping the doc
>>>>>>> and code always consistent), i.e. sometimes there will be no difference
>>>>>>> between a true and a false isStreamingMode, and sometimes they are quite
>>>>>>> different: atomic vs. non-atomic. Another question is that before we call
>>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
>>>>>>> isStreamingMode. In case only non-atomic is supported for streaming mode,
>>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
>>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>>>>>>> anything here?
>>>>>> Here is what I think about this issue: atomic CTAS should be the default
>>>>>> behavior, falling back to non-atomic CTAS only if atomicity is completely
>>>>>> unattainable. Atomic CTAS will bring a better experience to users.
>>>>>> Flink is already a unified stream and batch engine. At our company, kwai,
>>>>>> many users also use Flink for batch data processing while still running
>>>>>> in streaming mode.
>>>>>> The boundary between stream and batch is gradually blurring, and stream
>>>>>> mode jobs may also FINISH, so I added the isStreamingMode parameter, which
>>>>>> allows different atomicity implementations in batch and stream modes.
>>>>>> It is used not only to determine whether atomicity is supported, but also
>>>>>> to help select different TwoPhaseCatalogTable implementations that provide
>>>>>> different levels of atomicity!
>>>>>>
>>>>>> Looking forward to more feedback.
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Mang Zhang
>>>>>>
>>>>>> At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
>>>>>>> Hi Mang,
>>>>>>>
>>>>>>> This is the FLIP I was looking forward to after FLIP-218. Thanks for
>>>>>>> driving it. I have two questions and would like to know your thoughts,
>>>>>>> thanks:
>>>>>>>
>>>>>>> 1. It looks like you found another way to design the atomic CTAS, with a
>>>>>>> new serializable TwoPhaseCatalogTable instead of making Catalog
>>>>>>> serializable as described in FLIP-218. Did I understand correctly?
>>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of
>>>>>>> Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
>>>>>>> smell) that we should generally avoid in a public interface. According to
>>>>>>> the FLIP, isStreamingMode will be used by the Catalog to determine whether
>>>>>>> to support atomic or not. With this selector argument, two different
>>>>>>> pieces of logic are built into one method, which is hard to follow without
>>>>>>> reading the code or the doc carefully (another concern is keeping the doc
>>>>>>> and code always consistent), i.e. sometimes there will be no difference
>>>>>>> between a true and a false isStreamingMode, and sometimes they are quite
>>>>>>> different: atomic vs. non-atomic. Another question is that before we call
>>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
>>>>>>> isStreamingMode. In case only non-atomic is supported for streaming mode,
>>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
>>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
>>>>>>> anything here?
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Jing
>>>>>>>
>>>>>>> On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>>>>>>
>>>>>>>> Hi, Mang.
>>>>>>>> +1 for completing the support for atomicity of CTAS. This is very useful
>>>>>>>> in batch scenarios and for integrating with data lakes that support
>>>>>>>> transactions.
>>>>>>>>
>>>>>>>> I just have one question. IIUC, the DynamicTableSink will need to know
>>>>>>>> whether it is being used for the normal case or for atomic CTAS, as well
>>>>>>>> as the necessary context.
>>>>>>>> Take the jdbc catalog as an example: for CTAS with atomicity support, the
>>>>>>>> jdbc DynamicTableSink will write to the temp table defined in the
>>>>>>>> TwoPhaseCatalogTable, which is different from the normal case.
>>>>>>>>
>>>>>>>> How can the DynamicTableSink get it? Could you give some explanation
>>>>>>>> or an example in this FLIP?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Yuxia
>>>>>>>>
>>>>>>>> ----- Original Message -----
>>>>>>>> From: "zhangmang1" <zhangma...@163.com>
>>>>>>>> To: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <lincoln.8...@gmail.com>
>>>>>>>> Sent: Friday, April 14, 2023, 2:50:40 PM
>>>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
>>>>>>>>
>>>>>>>> Hi, Lincoln and Ron
>>>>>>>>
>>>>>>>> Thank you for your reply.
>>>>>>>> On the naming, I think it is OK; it makes future extension with new
>>>>>>>> features more uniform. I have updated the FLIP.
>>>>>>>>
>>>>>>>> About Hive support for atomic CTAS: Hive has rich usage scenarios, which
>>>>>>>> can be divided into three: 1. writing Hive tables, 2. writing Hive
>>>>>>>> tables with speculative execution, 3. writing Hive tables with small file
>>>>>>>> merging.
>>>>>>>>
>>>>>>>> The main purpose of FLIP-305 is to implement support for CTAS atomicity
>>>>>>>> in the Flink framework,
>>>>>>>> so my PoC only verifies the first scenario, writing to a Hive table.
>>>>>>>> We can subsequently split out sub-tasks to support the other two
>>>>>>>> scenarios.
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Mang Zhang
>>>>>>>>
>>>>>>>> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
>>>>>>>>> Hi, Mang
>>>>>>>>>
>>>>>>>>> +1 for completing the support for atomicity of CTAS, this is very useful
>>>>>>>>> in batch scenarios.
>>>>>>>>>
>>>>>>>>> I have two questions:
>>>>>>>>> 1. naming wise:
>>>>>>>>> a) can we rename the `Catalog#getTwoPhaseCommitCreateTable` to
>>>>>>>>> `Catalog#twoPhaseCreateTable` (and we may add
>>>>>>>>> twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)
>>>>>>>>> b) for the `TwoPhaseCommitCatalogTable`, might it be better to use
>>>>>>>>> `TwoPhaseCatalogTable`?
>>>>>>>>> c) `TwoPhaseCommitCatalogTable#beginTransaction`: the word 'transaction'
>>>>>>>>> in the method name may make users think of transaction
>>>>>>>>> support (however, it is not strictly so), so I suggest changing it to
>>>>>>>>> `begin`
>>>>>>>>> 2. Has this design been validated by any relevant PoC on Hive or other
>>>>>>>>> catalogs?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Lincoln Lee
>>>>>>>>>
>>>>>>>>> On Thu, Apr 13, 2023 at 10:17, liu ron <ron9....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, Mang
>>>>>>>>>> Atomicity is very important for CTAS, especially for batch jobs. This
>>>>>>>>>> FLIP is a continuation of FLIP-218, which is valuable for CTAS.
>>>>>>>>>> I just have one question: in the Motivation part of FLIP-218, we
>>>>>>>>>> mentioned three levels of atomicity semantics. Can the current design do
>>>>>>>>>> the same as Spark's DataSource V2, which can guarantee both atomicity and
>>>>>>>>>> isolation? For example, can it be done when writing to Hive tables using
>>>>>>>>>> CTAS?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Ron
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 10, 2023 at 11:03, Mang Zhang <zhangma...@163.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi, everyone
>>>>>>>>>>>
>>>>>>>>>>> I'd like to start a discussion about FLIP-305: Support atomic for
>>>>>>>>>>> CREATE TABLE AS SELECT(CTAS) statement [1].
>>>>>>>>>>>
>>>>>>>>>>> The CREATE TABLE AS SELECT (CTAS) statement is already supported, but it
>>>>>>>>>>> is not atomic. The table is created before the job runs, and if the job
>>>>>>>>>>> execution fails or is cancelled, the table is not dropped.
>>>>>>>>>>>
>>>>>>>>>>> So I want Flink to support atomic CTAS, where the table is created only
>>>>>>>>>>> when the job succeeds. This improves the user experience.
>>>>>>>>>>>
>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Mang Zhang
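
For readers following the thread, the SupportsStaging approach described in Mang's June 7 reply can be sketched roughly as follows. This is a minimal Java sketch under stated assumptions, not the FLIP-305 implementation: the nested interfaces are simplified stand-ins that reuse names mentioned in the thread (DynamicTableSink, SupportsStaging, StagedTable), while the method names applyStaging, begin, commit and abort, and the classes StagedCtasSketch, RecordingStagedSink and PlainSink, are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class StagedCtasSketch {

    // Hypothetical, simplified stand-ins for the types named in the thread.
    // These are NOT the real Flink interfaces or signatures.
    interface DynamicTableSink {}

    interface StagedTable {
        void begin();   // called before the job starts writing
        void commit();  // called when the job finishes successfully
        void abort();   // called when the job fails or is cancelled
    }

    // Ability interface: a sink opts in to atomic CTAS by implementing it.
    interface SupportsStaging {
        StagedTable applyStaging(); // assumed method name
    }

    // Sink with staging support; it records lifecycle calls for inspection.
    static final class RecordingStagedSink implements DynamicTableSink, SupportsStaging {
        final List<String> events = new ArrayList<>();

        @Override
        public StagedTable applyStaging() {
            return new StagedTable() {
                @Override public void begin() { events.add("begin"); }
                @Override public void commit() { events.add("commit"); }
                @Override public void abort() { events.add("abort"); }
            };
        }
    }

    // Sink without staging support.
    static final class PlainSink implements DynamicTableSink {}

    // Chooses atomic or non-atomic CTAS from the sink's abilities and drives
    // the staged table's lifecycle from the (simulated) job result.
    static String runCtas(DynamicTableSink sink, boolean jobSucceeds) {
        if (!(sink instanceof SupportsStaging)) {
            return "non-atomic"; // fallback: the table is created before the job runs
        }
        StagedTable staged = ((SupportsStaging) sink).applyStaging();
        staged.begin();
        if (jobSucceeds) {
            staged.commit();
        } else {
            staged.abort();
        }
        return "atomic";
    }

    public static void main(String[] args) {
        RecordingStagedSink ok = new RecordingStagedSink();
        System.out.println(runCtas(ok, true) + " " + ok.events);

        RecordingStagedSink failed = new RecordingStagedSink();
        System.out.println(runCtas(failed, false) + " " + failed.events);

        System.out.println(runCtas(new PlainSink(), true));
    }
}
```

The point of the sketch is the one raised in the thread: the commit and abort logic lives with the connector's sink rather than on the Catalog or CatalogTable, so each connector can do its own cleanup (for example, removing temporary files) when a job fails.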