Hi, Mang

Thanks for your update; the FLIP looks good to me now.

Best,
Ron

On Fri, Jun 9, 2023 at 12:08 PM Mang Zhang <zhangma...@163.com> wrote:

> Hi Ron,
> Thanks for your reply!
> After our offline discussion: at present there may be many Flink jobs,
> especially streaming jobs, that use the non-atomic CTAS feature.
> If we inferred whether atomic CTAS is supported only from whether the
> DynamicTableSink implements the SupportsStaging interface,
> Flink's behavior would change after users upgrade to a new version,
> which is not production friendly.
> To keep Flink's behavior consistent and give users maximum flexibility,
> even if a DynamicTableSink implements the SupportsStaging interface, users
> can still choose the non-atomic implementation according to their business needs.
>
> I have updated FLIP-305[1].
>
> Looking forward to more feedback. If there is no other feedback, I will
> start a vote next Monday (2023-06-12).
> Thanks again!
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>
>
> --
>
> Best regards,
>
> Mang Zhang
>
>
>
> At 2023-06-09 10:23:21, "liu ron" <ron9....@gmail.com> wrote:
> >Hi, Mang
> >
> >In FLIP-218, we discussed that atomicity is not needed in streaming
> >mode, so we implemented an initial version that doesn't support
> >atomicity. In addition, we introduced the option
> >"table.ctas.atomicity-enabled" to enable atomicity. According to
> >your FLIP-305 description, once the DynamicTableSink implements the
> >SupportsStaging interface, atomicity is the default behavior in both
> >streaming and batch mode, and the user can't change it. I don't think this
> >is feasible for streaming mode; atomicity should be controllable by the
> >user. So I think the FLIP should clarify the atomicity behavior by combining
> >the option and the SupportsStaging interface: atomicity is enabled only when
> >the DynamicTableSink implements SupportsStaging and the option is enabled. WDYT?
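The rule proposed above (atomic CTAS only when the sink implements SupportsStaging and the option is enabled) can be sketched in a few lines of Java. This is a minimal illustration, not Flink planner code: DynamicTableSink and SupportsStaging are local stand-in marker interfaces, AtomicCtasRule and shouldUseAtomicCtas are hypothetical names, the option key is the one quoted in this message, and treating a missing option as "false" is an assumption.

```java
import java.util.Map;

// Hypothetical stand-ins for the Flink types named in this thread; they are
// plain marker interfaces here only so the sketch compiles on its own.
interface DynamicTableSink {}

interface SupportsStaging {}

final class AtomicCtasRule {
    // Option key as quoted in this thread; defaulting to "false" is an assumption.
    static final String ATOMICITY_OPTION = "table.ctas.atomicity-enabled";

    private AtomicCtasRule() {}

    /** Atomic CTAS only if the user opted in AND the sink is able to stage. */
    static boolean shouldUseAtomicCtas(DynamicTableSink sink, Map<String, String> config) {
        boolean optionEnabled =
                Boolean.parseBoolean(config.getOrDefault(ATOMICITY_OPTION, "false"));
        return optionEnabled && sink instanceof SupportsStaging;
    }
}
```

Keeping the default non-atomic means existing streaming CTAS jobs keep their current behavior after an upgrade, which is the compatibility concern raised in Mang's reply.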
> >
> >Best,
> >Ron
> >
> >On Thu, Jun 8, 2023 at 4:30 PM Jark Wu <imj...@gmail.com> wrote:
> >
> >> Thank you for the great work, Mang! The updated proposal looks good to me.
> >>
> >> Best,
> >> Jark
> >>
> >> > On Jun 8, 2023, at 11:49, Jingsong Li <jingsongl...@gmail.com> wrote:
> >> >
> >> > Thanks Mang for updating!
> >> >
> >> > Looks good to me!
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang <zhangma...@163.com> wrote:
> >> >>
> >> >> Hi Jingsong,
> >> >>
> >> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >> >>> Flink design places execution in the TableFactory or directly in the
> >> >>> Catalog, so introducing an executable table makes me feel a bit
> >> >>> strange. (Spark is this style, but Flink may not be)
> >> >> On this issue: introducing executable commit/abort logic on
> >> >> CatalogTable is indeed a bit strange.
> >> >> After an offline discussion with yuxia, I have revised the FLIP-305[1]
> >> >> design.
> >> >> The new solution is similar to the implementation of SupportsOverwrite:
> >> >> it introduces the SupportsStaging interface and infers whether a
> >> >> DynamicTableSink supports atomic CTAS based on whether it implements the
> >> >> SupportsStaging interface;
> >> >> if so, it gets the StagedTable object from the DynamicTableSink.
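A simplified sketch of the SupportsOverwrite-style detection described above: the caller checks whether the sink implements SupportsStaging, obtains a StagedTable from it, calls begin() before the job and commit() or abort() afterwards. All types here are local stand-ins; the parameterless applyStaging() and the CtasDriver helper are assumptions for illustration, and the actual FLIP-305 signatures may differ (see the FLIP document).

```java
import java.util.function.BooleanSupplier;

// Local stand-ins with assumed shapes; not the real Flink interfaces.
interface DynamicTableSink {}

interface StagedTable {
    void begin();  // prepare staging (e.g. temp location) before the job runs
    void commit(); // make the table and its data visible after success
    void abort();  // clean up staged data after failure
}

interface SupportsStaging {
    StagedTable applyStaging(); // hypothetical, parameterless for brevity
}

final class CtasDriver {
    private CtasDriver() {}

    /**
     * Runs the job; if the sink supports staging, commits on success and aborts
     * on failure or exception. Returns true if the atomic path was taken.
     */
    static boolean runAtomicIfSupported(DynamicTableSink sink, BooleanSupplier job) {
        if (!(sink instanceof SupportsStaging)) {
            job.getAsBoolean(); // non-atomic path: no staging involved
            return false;
        }
        StagedTable staged = ((SupportsStaging) sink).applyStaging();
        staged.begin();
        boolean succeeded;
        try {
            succeeded = job.getAsBoolean();
        } catch (RuntimeException e) {
            staged.abort(); // job crashed: discard staged data, then rethrow
            throw e;
        }
        if (succeeded) {
            staged.commit();
        } else {
            staged.abort();
        }
        return true;
    }
}
```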
> >> >>
> >> >> For more implementation details, please see the FLIP-305 document.
> >> >>
> >> >> These are my PoC commits:
> >> >> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
> >> >>
> >> >>
> >> >> [1]
> >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >>
> >> >>
> >> >> --
> >> >>
> >> >> Best regards,
> >> >>
> >> >> Mang Zhang
> >> >>
> >> >>
> >> >>
> >> >> At 2023-05-12 13:02:14, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> >> >>> Hi Mang,
> >> >>>
> >> >>> Thanks for starting this FLIP.
> >> >>>
> >> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >> >>> Flink design places execution in the TableFactory or directly in the
> >> >>> Catalog, so introducing an executable table makes me feel a bit
> >> >>> strange. (Spark is this style, but Flink may not be)
> >> >>>
> >> >>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >> >>>
> >> >>> Best,
> >> >>> Jingsong
> >> >>>
> >> >>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang <zhangma...@163.com> wrote:
> >> >>>>
> >> >>>> Hi Ron,
> >> >>>>
> >> >>>>
> >> >>>> First of all, thank you for your reply!
> >> >>>> After our offline discussion: your point mainly concerns the
> >> >>>> compilePlanSql scenario, but compilePlanSql currently does not support
> >> >>>> non-INSERT statements; for those it throws an exception:
> >> >>>>> Unsupported SQL query! compilePlanSql() only accepts a single SQL
> >> >>>>> statement of type INSERT
> >> >>>> But it's a good point, and I will consider it seriously.
> >> >>>> Non-atomic CTAS can be supported relatively easily,
> >> >>>> but atomic CTAS needs more adaptation work, so I'm going to leave it
> >> >>>> as is and follow up with a separate issue to implement CTAS support for
> >> >>>> compilePlanSql.
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> --
> >> >>>>
> >> >>>> Best regards,
> >> >>>> Mang Zhang
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> At 2023-04-23 17:52:07, "liu ron" <ron9....@gmail.com> wrote:
> >> >>>>> Hi, Mang
> >> >>>>>
> >> >>>>> I have a question about the implementation details. For the
> >> >>>>> atomic case, the target table is not created before the JobGraph is
> >> >>>>> generated, yet the target table must exist when the plan is
> >> >>>>> optimized to generate the JobGraph. How do you solve this problem?
> >> >>>>>
> >> >>>>> Best,
> >> >>>>> Ron
> >> >>>>>
> >> >>>>> On Thu, Apr 20, 2023 at 9:35 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> >> >>>>>
> >> >>>>>> Sharing some insights about the new TwoPhaseCatalogTable, proposed after
> >> >>>>>> an offline discussion with Mang.
> >> >>>>>> The main reason is that TwoPhaseCatalogTable enables
> >> >>>>>> external connectors to implement their own commit / abort logic.
> >> >>>>>> In FLIP-218, for atomic CTAS, the Catalog simply drops the table
> >> >>>>>> when the job fails. That is not ideal, because it is too generic to work well.
> >> >>>>>> For example, some connectors need to clean up temporary files in the
> >> >>>>>> abort method, and only the actual connector knows the specific logic
> >> >>>>>> for aborting.
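To illustrate why abort logic belongs in the connector, here is a hedged sketch of a file-based staged table: the job writes into a hidden staging directory, commit() renames it to the table location, and abort() deletes the temporary files, which a generic "drop the table" fallback would not know about. FileStagedTable and the ".staging-" naming scheme are hypothetical; real connectors such as Hive have their own commit protocols.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical file-based staged table: write to a staging dir, publish on commit. */
final class FileStagedTable {
    private final Path tableDir;
    private final Path stagingDir;

    FileStagedTable(Path tableDir) {
        this.tableDir = tableDir;
        // Sibling directory, so the final rename stays on the same file system.
        this.stagingDir = tableDir.resolveSibling(".staging-" + tableDir.getFileName());
    }

    /** Creates the staging directory the job writes into and returns it. */
    Path begin() {
        try {
            return Files.createDirectories(stagingDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Publishes the staged data by atomically renaming staging dir to table dir. */
    void commit() {
        try {
            Files.move(stagingDir, tableDir, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Connector-specific cleanup: delete all temporary files, deepest first. */
    void abort() {
        if (!Files.exists(stagingDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(stagingDir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```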
> >> >>>>>>
> >> >>>>>> Best regards,
> >> >>>>>> Yuxia
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> From: "zhangmang1" <zhangma...@163.com>
> >> >>>>>> To: "dev" <dev@flink.apache.org>, "Jing Ge" <j...@ververica.com>
> >> >>>>>> Cc: "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <lincoln.8...@gmail.com>, luoyu...@alumni.sjtu.edu.cn
> >> >>>>>> Sent: Wednesday, April 19, 2023 3:13:36 PM
> >> >>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> >> >>>>>>
> >> >>>>>> Hi, Jing
> >> >>>>>> Thank you for your reply.
> >> >>>>>>> 1. It looks like you found another way to design the atomic CTAS with a new
> >> >>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >> >>>>>>> described in FLIP-218. Did I understand correctly?
> >> >>>>>> Yes. When I was implementing the FLIP-218 solution, I ran into problems
> >> >>>>>> with Catalog/CatalogTable serialization and deserialization; for example,
> >> >>>>>> after deserialization a CatalogTable could not be converted to a Hive Table.
> >> >>>>>> Also, Catalog serialization is still a heavy operation, and it may not
> >> >>>>>> actually be necessary: we only need to create the table.
> >> >>>>>> Therefore, I proposed the TwoPhaseCatalogTable approach, which also
> >> >>>>>> makes it easier to implement data lake support, ReplaceTable
> >> >>>>>> and other features later.
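A minimal sketch of the "we only need to create the table" idea: rather than serializing a whole Catalog, the object shipped with the job carries only the table path and options, and keeps any heavy client transient so it is rebuilt where it is used. SerializableCreateTableAction, MetastoreClient and the "metastore.uri" key are hypothetical names for illustration; the sketch uses plain Java serialization, not Flink's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical heavy, non-serializable client (stands in for catalog internals). */
final class MetastoreClient {
    final String uri;

    MetastoreClient(String uri) {
        this.uri = uri;
    }
}

/** Hypothetical payload: only what is needed to create the table later. */
final class SerializableCreateTableAction implements Serializable {
    private static final long serialVersionUID = 1L;

    final String tablePath;                     // e.g. "db.orders"
    final HashMap<String, String> options;      // small, serializable
    private transient MetastoreClient client;   // never shipped; rebuilt lazily

    SerializableCreateTableAction(String tablePath, Map<String, String> options) {
        this.tablePath = tablePath;
        this.options = new HashMap<>(options);
    }

    MetastoreClient client() {
        if (client == null) {
            client = new MetastoreClient(options.getOrDefault("metastore.uri", "local"));
        }
        return client;
    }

    static byte[] serialize(SerializableCreateTableAction action) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(action);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static SerializableCreateTableAction deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SerializableCreateTableAction) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```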
> >> >>>>>>
> >> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of
> >> >>>>>>> Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
> >> >>>>>>> smell) that we should generally avoid in public interfaces. According to the
> >> >>>>>>> FLIP, isStreamingMode will be used by the Catalog to determine whether to
> >> >>>>>>> support atomicity or not. With this selector argument, two
> >> >>>>>>> different logics are built into one method, and it is hard to follow without
> >> >>>>>>> reading the code or the doc carefully (another concern is keeping the doc
> >> >>>>>>> and code always consistent), i.e. sometimes there will be no difference
> >> >>>>>>> between true/false isStreamingMode, and sometimes they are quite different:
> >> >>>>>>> atomic vs. non-atomic. Another question is, before we call
> >> >>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >>>>>>> isStreamingMode. In case only non-atomic is supported for streaming mode,
> >> >>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
> >> >>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >> >>>>>>> anything here?
> >> >>>>>> Here's what I think about this issue: atomic CTAS should be the default
> >> >>>>>> behavior, falling back to non-atomic CTAS only when atomicity is completely
> >> >>>>>> unattainable. Atomic CTAS brings a better experience to users.
> >> >>>>>> Flink is already a unified stream/batch engine. At our company, Kwai, many
> >> >>>>>> users use Flink for batch data processing but still run their jobs
> >> >>>>>> in streaming mode.
> >> >>>>>> The boundary between stream and batch is gradually blurring, and
> >> >>>>>> streaming-mode jobs may also FINISH, so I added the isStreamingMode
> >> >>>>>> parameter to provide different atomicity implementations in batch and
> >> >>>>>> streaming modes. It is not only used to determine whether atomicity is
> >> >>>>>> supported, but also to help select different TwoPhaseCatalogTable
> >> >>>>>> implementations that provide different levels of atomicity.
> >> >>>>>>
> >> >>>>>> Looking forward to more feedback.
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> --
> >> >>>>>> Best regards,
> >> >>>>>> Mang Zhang
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> At 2023-04-15 04:20:40, "Jing Ge" <j...@ververica.com.INVALID> wrote:
> >> >>>>>>> Hi Mang,
> >> >>>>>>>
> >> >>>>>>> This is the FLIP I was looking forward to after FLIP-218. Thanks for
> >> >>>>>>> driving it. I have two questions and would like to know your thoughts,
> >> >>>>>>> thanks:
> >> >>>>>>>
> >> >>>>>>> 1. It looks like you found another way to design the atomic CTAS with a new
> >> >>>>>>> serializable TwoPhaseCatalogTable instead of making Catalog serializable as
> >> >>>>>>> described in FLIP-218. Did I understand correctly?
> >> >>>>>>> 2. I am a little bit confused about the isStreamingMode parameter of
> >> >>>>>>> Catalog#twoPhaseCreateTable(...), since it is a selector argument (a code
> >> >>>>>>> smell) that we should generally avoid in public interfaces. According to the
> >> >>>>>>> FLIP, isStreamingMode will be used by the Catalog to determine whether to
> >> >>>>>>> support atomicity or not. With this selector argument, two
> >> >>>>>>> different logics are built into one method, and it is hard to follow without
> >> >>>>>>> reading the code or the doc carefully (another concern is keeping the doc
> >> >>>>>>> and code always consistent), i.e. sometimes there will be no difference
> >> >>>>>>> between true/false isStreamingMode, and sometimes they are quite different:
> >> >>>>>>> atomic vs. non-atomic. Another question is, before we call
> >> >>>>>>> Catalog#twoPhaseCreateTable(...), we have to know the value of
> >> >>>>>>> isStreamingMode. In case only non-atomic is supported for streaming mode,
> >> >>>>>>> we could just follow FLIP-218 instead of (twistedly) calling
> >> >>>>>>> Catalog#twoPhaseCreateTable(...) with a false isStreamingMode. Did I miss
> >> >>>>>>> anything here?
> >> >>>>>>>
> >> >>>>>>> Best regards,
> >> >>>>>>> Jing
> >> >>>>>>>
> >> >>>>>>> On Fri, Apr 14, 2023 at 1:55 PM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
> >> >>>>>>>
> >> >>>>>>>> Hi, Mang.
> >> >>>>>>>> +1 for completing the support for CTAS atomicity; this is very useful
> >> >>>>>>>> in batch scenarios and for integrating with data lakes that support
> >> >>>>>>>> transactions.
> >> >>>>>>>>
> >> >>>>>>>> I just have one question: IIUC, the DynamicTableSink will need to know
> >> >>>>>>>> whether it is used in the normal case or for atomic CTAS, as well as the
> >> >>>>>>>> necessary context.
> >> >>>>>>>> Take the JDBC catalog as an example: for CTAS with atomicity support, the
> >> >>>>>>>> JDBC DynamicTableSink will write to the temp table defined in the
> >> >>>>>>>> TwoPhaseCatalogTable, which differs from the normal case.
> >> >>>>>>>>
> >> >>>>>>>> How can the DynamicTableSink get this information? Could you give some
> >> >>>>>>>> explanation or an example in this FLIP?
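One possible approach to the JDBC question, sketched under assumptions: the sink is told whether it is running for atomic CTAS and, if so, writes to a temporary table; commit renames the temporary table to the target and abort drops it. JdbcCtasStaging, the "__ctas_tmp_" suffix and the jobId parameter are hypothetical, identifiers are neither quoted nor escaped (names are assumed trusted), and the rename syntax varies by database (SQL Server, for instance, uses sp_rename).

```java
/** Hypothetical helper describing where a JDBC sink writes during atomic CTAS. */
final class JdbcCtasStaging {
    final String targetTable;
    final String tempTable;

    JdbcCtasStaging(String targetTable, String jobId) {
        this.targetTable = targetTable;
        // Temp name derived from the target and a job id to avoid collisions.
        this.tempTable = targetTable + "__ctas_tmp_" + jobId;
    }

    /** The physical table the sink writes to while the job runs. */
    String writeTarget(boolean atomicCtas) {
        return atomicCtas ? tempTable : targetTable;
    }

    /** DDL issued by begin(): create the temp table with the query's schema. */
    String beginSql(String columnDdl) {
        return "CREATE TABLE " + tempTable + " (" + columnDdl + ")";
    }

    /** Issued by commit(): publish the data under the target name. */
    String commitSql() {
        return "ALTER TABLE " + tempTable + " RENAME TO " + targetTable;
    }

    /** Issued by abort(): discard everything written so far. */
    String abortSql() {
        return "DROP TABLE IF EXISTS " + tempTable;
    }
}
```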
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Best regards,
> >> >>>>>>>> Yuxia
> >> >>>>>>>>
> >> >>>>>>>> ----- Original Message -----
> >> >>>>>>>> From: "zhangmang1" <zhangma...@163.com>
> >> >>>>>>>> To: "dev" <dev@flink.apache.org>, "ron9 liu" <ron9....@gmail.com>, "lincoln 86xy" <lincoln.8...@gmail.com>
> >> >>>>>>>> Sent: Friday, April 14, 2023 2:50:40 PM
> >> >>>>>>>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement
> >> >>>>>>>>
> >> >>>>>>>> Hi, Lincoln and Ron
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Thank you for your reply.
> >> >>>>>>>> The naming suggestions look good to me; they will make future feature
> >> >>>>>>>> extensions more uniform. I have updated the FLIP.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> About Hive support for atomic CTAS: Hive has rich usage scenarios, which
> >> >>>>>>>> can be divided into three:
> >> >>>>>>>> 1. writing Hive tables
> >> >>>>>>>> 2. writing Hive tables with speculative execution
> >> >>>>>>>> 3. writing Hive tables with small-file merging
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> The main purpose of FLIP-305 is to implement support for CTAS atomicity
> >> >>>>>>>> in the Flink framework, so I only built a PoC to verify the first
> >> >>>>>>>> scenario (writing to a Hive table); we can later split out sub-tasks
> >> >>>>>>>> to support the other two scenarios.
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> --
> >> >>>>>>>>
> >> >>>>>>>> Best regards,
> >> >>>>>>>> Mang Zhang
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> At 2023-04-13 12:27:24, "Lincoln Lee" <lincoln.8...@gmail.com> wrote:
> >> >>>>>>>>> Hi, Mang
> >> >>>>>>>>>
> >> >>>>>>>>> +1 for completing the support for CTAS atomicity; this is very useful
> >> >>>>>>>>> in batch scenarios.
> >> >>>>>>>>>
> >> >>>>>>>>> I have two questions:
> >> >>>>>>>>> 1. Naming:
> >> >>>>>>>>> a) Can we rename `Catalog#getTwoPhaseCommitCreateTable` to
> >> >>>>>>>>> `Catalog#twoPhaseCreateTable` (and maybe add
> >> >>>>>>>>> twoPhaseReplaceTable/twoPhaseCreateOrReplaceTable later)?
> >> >>>>>>>>> b) For `TwoPhaseCommitCatalogTable`, would `TwoPhaseCatalogTable` be
> >> >>>>>>>>> a better name?
> >> >>>>>>>>> c) In `TwoPhaseCommitCatalogTable#beginTransaction`, the word 'transaction'
> >> >>>>>>>>> in the method name may suggest to users that transactions are supported
> >> >>>>>>>>> (which is not strictly the case), so I suggest changing it to
> >> >>>>>>>>> `begin`.
> >> >>>>>>>>> 2. Has this design been validated by a PoC on Hive or other
> >> >>>>>>>>> catalogs?
> >> >>>>>>>>>
> >> >>>>>>>>> Best,
> >> >>>>>>>>> Lincoln Lee
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> On Thu, Apr 13, 2023 at 10:17 AM liu ron <ron9....@gmail.com> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi, Mang
> >> >>>>>>>>>> Atomicity is very important for CTAS, especially for batch jobs. This
> >> >>>>>>>>>> FLIP is a continuation of FLIP-218, which is valuable for CTAS.
> >> >>>>>>>>>> I just have one question: in the Motivation section of FLIP-218, we
> >> >>>>>>>>>> mentioned three levels of atomicity semantics. Can the current design
> >> >>>>>>>>>> do the same as Spark's DataSource V2, which can guarantee both atomicity
> >> >>>>>>>>>> and isolation? For example, can this be achieved when writing to Hive
> >> >>>>>>>>>> tables with CTAS?
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Ron
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Mon, Apr 10, 2023 at 11:03 AM Mang Zhang <zhangma...@163.com> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Hi, everyone
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I'd like to start a discussion about FLIP-305: Support atomic for
> >> >>>>>>>>>>> CREATE TABLE AS SELECT(CTAS) statement [1].
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> The CREATE TABLE AS SELECT (CTAS) statement is already supported, but
> >> >>>>>>>>>>> it is not atomic: the table is created before the job runs, and if the
> >> >>>>>>>>>>> job fails or is cancelled, the table is not dropped.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> So I want Flink to support atomic CTAS, where the table is created
> >> >>>>>>>>>>> only when the job succeeds. This improves the user experience.
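The difference between the current and the proposed behavior can be shown with a toy in-memory catalog: the non-atomic flow registers the table before the job runs, so a failed job leaves it behind, while the atomic flow writes into a staging buffer and registers the table only after the job succeeds. InMemoryCatalog and CtasFlows are hypothetical and model a job as a Supplier that either returns rows or throws.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical in-memory catalog: table name -> rows. */
final class InMemoryCatalog {
    final Map<String, List<String>> tables = new HashMap<>();
}

final class CtasFlows {
    private CtasFlows() {}

    /** Current behavior: create first, then run; a failing job leaves the table behind. */
    static void nonAtomicCtas(InMemoryCatalog catalog, String name, Supplier<List<String>> job) {
        List<String> table = new ArrayList<>();
        catalog.tables.put(name, table); // visible before the job runs
        table.addAll(job.get());         // if this throws, the empty table remains
    }

    /** Atomic behavior: run into a staging buffer, register the table only on success. */
    static void atomicCtas(InMemoryCatalog catalog, String name, Supplier<List<String>> job) {
        List<String> staged = new ArrayList<>(job.get()); // throws -> nothing registered
        catalog.tables.put(name, staged);
    }
}
```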
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Looking forward to your feedback.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> [1]
> >> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> --
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Best regards,
> >> >>>>>>>>>>> Mang Zhang
> >> >>>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>
> >> >>>>>>
> >>
> >>
>
>
