Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-08 Thread Aitozi
Hi Feng,
Thanks for your good question. It would be very attractive if we could
support running the original UDTF asynchronously without introducing new
UDTFs.

But I think it's not easy, because the original UDTFs are executed with one
instance per parallelism, so there is no thread-safety problem for the
user. But the asynchronous version usually means the main processing runs
in a dedicated thread pool, so running the original UDTFs in an
asynchronous way may introduce thread-safety problems.
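
To make the concern concrete, a minimal sketch (hypothetical user code, not
the FLIP's API):

import org.apache.flink.table.functions.TableFunction;

// One instance per parallelism: a non-thread-safe field like this is safe
// in the synchronous version, because eval() is never called concurrently
// on the same instance.
public class MyUdtf extends TableFunction<String> {
    private final StringBuilder buffer = new StringBuilder(); // not thread-safe

    public void eval(String input) {
        buffer.setLength(0);
        buffer.append(input).append("-enriched");
        collect(buffer.toString());
    }
}
// If the same instance were instead invoked from a dedicated thread pool,
// two in-flight eval() calls would race on `buffer`.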


@Jing Ge I have also added the "Performance" section to the FLIP.

Thanks,
Aitozi.

Feng Jin wrote on Fri, Jun 9, 2023 at 13:16:

> Hi, Aitozi
>
> Thank you for your proposal.
>
> In our production environment, we often encounter efficiency issues with
> user-defined functions (UDFs), which can lead to slower processing speeds.
> I believe that this FLIP will make it easier for UDFs to be executed more
> efficiently.
>
>
> I have a small question:
>
> Is it possible for us to execute the original UDTF asynchronously without
> introducing new UDTFs?
> Of course, this is just my personal idea and I am not sure if it is a
> feasible solution.
>
>
> Best,
> Feng
>
>
>
> On Fri, Jun 9, 2023 at 11:18 AM Aitozi  wrote:
>
> > Hi Jing
> > Thanks for your good questions. I have updated the example to the
> FLIP.
> >
> > > Only one row for each lookup
> > A lookup can also return multiple rows, based on the query result. [1]
> >
> > [1]:
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> >
> > > If we use async calls with lateral join, my gut feeling is that we
> > > might have many more async calls than lookup join. I am not really
> > > sure if we will be facing potential issues in this case or not.
> >
> > IMO, the work pattern is similar to the lookup function: for each row
> > from the left table, it will evaluate the eval method once, so the number
> > of async calls will not change, and the maximum number of calls in flight
> > is limited by the async operator's buffer capacity, which is controlled
> > by the option.
> >
> > BTW, for the naming of these options, I updated the FLIP; you can refer
> > to the "ConfigOption" and "Rejected Alternatives" sections.
> >
> > Finally, for the performance evaluation, I'd like to do some tests and
> > will update the FLIP doc with the results.
> >
> > Thanks,
> > Aitozi.
> >
> >
> > Jing Ge wrote on Fri, Jun 9, 2023 at 07:23:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for the clarification. The code example looks interesting. I
> > > would suggest adding them into the FLIP. The description with code
> > > examples will help readers understand the motivation and how to use
> > > it. Afaiac, it is a valid feature for Flink users.
> > >
> > > As we know, lookup join is based on temporal join, i.e. FOR
> > > SYSTEM_TIME AS OF, which is also used in your code example. Temporal
> > > join performs the lookup based on the processing-time match. Only one
> > > row for each lookup (afaiu, I need to check the source code to
> > > double-confirm) will return for further enrichment. On the other hand,
> > > lateral join will have sub-queries correlated with every individual
> > > value of the reference table from the preceding part of the query, and
> > > each sub-query will return multiple rows. If we use async calls with
> > > lateral join, my gut feeling is that we might have many more async
> > > calls than lookup join. I am not really sure if we will be facing
> > > potential issues in this case or not. Possible issues I can think of
> > > now are e.g. too many RPC calls, too many async calls processing, or
> > > the sub-query returning a table which might be (too) big and might
> > > cause performance issues. I would suggest preparing some use cases and
> > > running some performance tests to check it. These are my concerns
> > > about using async calls with lateral join that I'd like to share with
> > > you; happy to discuss with you and hear different opinions, and
> > > hopefully the discussion could help me understand it more deeply.
> > > Please correct me if I am wrong.
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Thu, Jun 8, 2023 at 7:22 AM Aitozi  wrote:
> > >
> > > > Hi Mason,
> > > > Thanks for your input. I think if we support the user-defined
> > > > async table function, users will be able to use it to hold a batch
> > > > of data and then handle it at one time in the customized function.
> > > >
> > > > AsyncSink is meant for the sink operator. I have not figured out how
> > > > to integrate it in this case.
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > >
> > > > Mason Chen wrote on Thu, Jun 8, 2023 at 12:40:
> > > >
> > > > > Hi Aitozi,
> > > > >
> > > > > I think it makes sense to make it easier for SQL users to make
> > > > > RPCs. Do you think your proposal can extend to the ability to
> > > > > batch data for the RPC?
> 

Re: [DISCUSS] FLIP-317: Upgrade Kryo from 2.24.0 to 5.5.0

2023-06-08 Thread Kurt Ostfeld
OK:
- I start a Flink 1.17.1 cluster, run the job, then run `flink stop` and
generate a savepoint. This savepoint will have Kryo 2.x data from standard
Flink 1.17.1.
- I start a Flink 1.18-SNAPSHOT cluster with the pull-request, run the job
resuming from the savepoint from Flink 1.17, then I kill the cluster. I
have a checkpoint with metadata. I believe this checkpoint is all using
Kryo 5.x serialization.
- I restart the cluster, run the job resuming from the checkpoint, and
everything runs successfully. The job picks up where it left off, there are
no errors, and all output data looks correct.

Am I following the scenario correctly? Why would a checkpoint created by the 
new pull-request code have Kryo 2.x serialized data?

Here is the code for my test app that I'm using. The checkpoint configuration 
settings are mostly from 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/checkpointing/

https://github.com/kurtostfeld/flink-kryo-upgrade-demo/blob/main/flink-kryo-upgraded/src/main/java/demo/app/Main.java
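
For readers following along, the checkpoint settings involved look roughly
like this (a sketch; the interval value is illustrative, see the linked
docs):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds
        // Retain checkpoints after the cluster is killed, so the job can be
        // resumed from the retained checkpoint metadata.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // job definition and env.execute(...) omitted
    }
}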


--- Original Message ---
On Thursday, June 8th, 2023 at 9:33 AM, Chesnay Schepler  
wrote:


> 
> 
> On 08/06/2023 16:06, Kurt Ostfeld wrote:
> 
> > If I understand correctly, the scenario is resuming from multiple 
> > checkpoint files or from a savepoint and checkpoint files which may be 
> > generated by different versions of Flink
> 
> 
> No; it's the same version of Flink, you just didn't do a full migration
> of the savepoint from the start.
>
> So, load old savepoint -> create an incremental checkpoint (which writes a
> bit of new state with Kryo 5) -> job fails -> try to recover the job
> (which now has to read state that was written with either Kryo 2 or
> Kryo 5).
> 
> On 08/06/2023 16:06, Kurt Ostfeld wrote:
> 
> > This pull-request build supports Java records
> 
> 
> We'd have to see, but off the top of my head I doubt we want to use Kryo
> for that, and would rather extend our PojoSerializer. At least so far that
> was the plan.


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-08 Thread Feng Jin
Hi, Aitozi

Thank you for your proposal.

In our production environment, we often encounter efficiency issues with
user-defined functions (UDFs), which can lead to slower processing speeds.
I believe that this FLIP will make it easier for UDFs to be executed more
efficiently.


I have a small question:

Is it possible for us to execute the original UDTF asynchronously without
introducing new UDTFs?
Of course, this is just my personal idea and I am not sure if it is a
feasible solution.


Best,
Feng



On Fri, Jun 9, 2023 at 11:18 AM Aitozi  wrote:

> Hi Jing
> Thanks for your good questions. I have updated the example to the FLIP.
>
> > Only one row for each lookup
> A lookup can also return multiple rows, based on the query result. [1]
>
> [1]:
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
>
> > If we use async calls with lateral join, my gut feeling is that we
> > might have many more async calls than lookup join. I am not really sure
> > if we will be facing potential issues in this case or not.
>
> IMO, the work pattern is similar to the lookup function: for each row from
> the left table, it will evaluate the eval method once, so the number of
> async calls will not change, and the maximum number of calls in flight is
> limited by the async operator's buffer capacity, which is controlled by
> the option.
>
> BTW, for the naming of these options, I updated the FLIP; you can refer
> to the "ConfigOption" and "Rejected Alternatives" sections.
>
> Finally, for the performance evaluation, I'd like to do some tests and
> will update the FLIP doc with the results.
>
> Thanks,
> Aitozi.
>
>
> Jing Ge wrote on Fri, Jun 9, 2023 at 07:23:
>
> > Hi Aitozi,
> >
> > Thanks for the clarification. The code example looks interesting. I would
> > suggest adding them into the FLIP. The description with code examples
> will
> > help readers understand the motivation and how to use it. Afaiac, it is a
> > valid feature for Flink users.
> >
> > As we knew, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME
> AS
> > OF which is also used in your code example. Temporal join performs the
> > lookup based on the processing time match. Only one row for each
> > lookup(afaiu, I need to check the source code to double confirm) will
> > return for further enrichment. One the other hand, lateral join will have
> > sub-queries correlated with every individual value of the reference table
> > from the preceding part of the query and each sub query will return
> > multiple rows. If we use async calls with lateral join, my gut feeling is
> > that we might have many more async calls than lookup join. I am not
> really
> > sure if we will be facing potential issues in this case or not. Possible
> > issues I can think of now e.g. too many PRC calls, too many async calls
> > processing, the sub query will return a table which might be (too) big,
> and
> > might cause performance issues. I would suggest preparing some use cases
> > and running some performance tests to check it. These are my concerns
> about
> > using async calls with lateral join and I'd like to share with you, happy
> > to discuss with you and hear different opinions, hopefully the
> > discussion could help me understand it more deeply. Please correct me if
> I
> > am wrong.
> >
> > Best regards,
> > Jing
> >
> >
> > On Thu, Jun 8, 2023 at 7:22 AM Aitozi  wrote:
> >
> > > Hi Mason,
> > > Thanks for your input. I think if we support the user-defined async
> > > table function, users will be able to use it to hold a batch of data
> > > and then handle it at one time in the customized function.
> > >
> > > AsyncSink is meant for the sink operator. I have not figured out how
> > > to integrate it in this case.
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > >
> > > Mason Chen wrote on Thu, Jun 8, 2023 at 12:40:
> > >
> > > > Hi Aitozi,
> > > >
> > > > I think it makes sense to make it easier for SQL users to make
> > > > RPCs. Do you think your proposal can extend to the ability to batch
> > > > data for the RPC? This is also another common strategy to increase
> > > > throughput. Also, have you considered solving this a bit differently
> > > > by leveraging Flink's AsyncSink?
> > > >
> > > > Best,
> > > > Mason
> > > >
> > > > On Mon, Jun 5, 2023 at 1:50 AM Aitozi  wrote:
> > > >
> > > > > One more thing for discussion:
> > > > >
> > > > > In our internal implementation, we reuse the options
> > > > > `table.exec.async-lookup.buffer-capacity` and
> > > > > `table.exec.async-lookup.timeout` to configure
> > > > > the async UDTF. Do you think we should add two extra options to
> > > > > distinguish them from the lookup options, such as
> > > > >
> > > > > `table.exec.async-udtf.buffer-capacity`
> > > > > `table.exec.async-udtf.timeout`
> > > > >
> > > > >
> > > > > Best,
> > > > > Aitozi.
> > > > >
> > > > >
> > > > >
> > > > > Aitozi wrote on Mon, Jun 5, 2023

Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-06-08 Thread Benchao Li
As you can see, you must use `UNIX_TIMESTAMP` to do this work; that's
where the time zone conversion happens.

What I'm talking about is casting timestamp/timestamp_ltz to long directly;
that's why the semantics are tricky when you cast a timestamp to long
using a time zone.

For other systems, such as SQL Server[1], they actually use a string
instead of a timestamp literal: `FOR SYSTEM_TIME AS OF '2021-01-01
00:00:00.000'`. I'm not sure whether they convert the string implicitly
to TIMESTAMP_LTZ, or whether they just have a different definition of the
syntax.

But for us, we are definitely using a timestamp/timestamp_ltz literal here;
that's why it is special, and we must highlight the behavior that we are
converting a timestamp-without-time-zone literal to long using the session
time zone.

[1]
https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16
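
To make the semantics concrete, the same pitfall in plain java.time (an
illustration, independent of Flink):

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampZoneDemo {
    public static void main(String[] args) {
        // TIMESTAMP '1970-01-01 00:00:00' carries no zone; converting it to
        // epoch seconds forces a choice of zone:
        LocalDateTime ts = LocalDateTime.of(1970, 1, 1, 0, 0, 0);
        System.out.println(ts.toEpochSecond(ZoneOffset.UTC));           // 0
        System.out.println(
                ts.atZone(ZoneId.of("Asia/Shanghai")).toEpochSecond()); // -28800
    }
}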

Feng Jin wrote on Thu, Jun 8, 2023 at 11:35:

> Hi all,
>
> thanks for your input
>
>
> @Benchao
>
> >  The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
> WITHOUT TIME ZONE", converting it to unix timestamp would use UTC timezone,
> which is not usually expected by users.
>
> It was indeed the case before Flink 1.13, but now my understanding is
> that there have been some slight changes in the definition of TIMESTAMP.
>
> TIMESTAMP is currently used to specify the year, month, day, hour, minute
> and second. We recommend that users use *UNIX_TIMESTAMP(CAST(timestamp_col
> AS STRING))* to convert between *TIMESTAMP values* and *long values*. The
> *UNIX_TIMESTAMP* function will use the *LOCAL TIME ZONE*. Therefore,
> converting either TIMESTAMP or TIMESTAMP_LTZ to long values will involve
> using the *LOCAL TIME ZONE*.
>
>
> Here is a test:
>
> Flink SQL> SET 'table.local-time-zone' = 'UTC';
> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
> STRING)) as `timestamp`;
> ---
>  timestamp
>  --
>  0
>
> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
> STRING)) as `timestamp`;
> ---
>  timestamp
>  --
>  -28800
>
> Therefore, the current conversion method exposed to users is also using
> LOCAL TIME ZONE.
>
>
> @yuxia
>
> Thank you very much for providing the list of behaviors of TIMESTAMP in
> other systems.
>
> > I think we can align them to avoid the inconsistency to other engines and
> provide convenience for the external connectors while integrating Flink's
> time travel API.
>
> +1 for this.
>
> > Regarding the inconsistency, I think we can consider time travel as a
> > special case, and we do need to highlight this in this FLIP.
> > As for "violating the restriction outlined in FLINK-21978[1]": since we
> > cast the timestamp to epochMillis only for internal use, and won't
> > expose it to users, I don't think it will violate the restriction.
> > Btw, please add a brief description to explain the meaning of the
> > parameter `timestamp` in the method `CatalogBaseTable
> > getTable(ObjectPath tablePath, long timestamp)`. Maybe something like
> > "timestamp of the table snapshot, which is milliseconds since
> > 1970-01-01 00:00:00 UTC".
>
> Thank you for the suggestions regarding the document. I will add them to
> FLIP.
>
>
> Best,
> Feng
>
>
> On Wed, Jun 7, 2023 at 12:18 PM Benchao Li  wrote:
>
> > I also share the concern about the timezone problem.
> >
> > The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
> WITHOUT
> > TIME ZONE", converting it to unix timestamp would use UTC timezone, which
> > is not usually expected by users.
> >
> > If we want to keep consistent with the standard, we probably should use
> > "TIMESTAMP WITH LOCAL ZONE '2023-04-27 00:00:00'", which type is
> "TIMESTAMP
> > WITH LOCAL TIME ZONE", and converting it to unix timestamp will consider
> > the session timezone, which is the expected result. But it's inconvenient
> > for users.
> >
> > Taking this as a special case, and converting "TIMESTAMP '2023-04-27
> > 00:00:00'" to a unix timestamp with the session time zone, will be
> > convenient for users, but will break the standard. I am +0.5 for this
> > choice.
> >
> > yuxia wrote on Wed, Jun 7, 2023 at 12:06:
> >
> > > Hi, Feng Jin.
> > > I think Leonard's concern may be the inconsistency of the behavior of
> > > TIMESTAMP '2023-04-27 00:00:00' between time travel and other SQL
> > > statements.
> > >
> > > For a normal query:
> > > `SELECT TIMESTAMP '2023-04-27 00:00:00'`, we won't consider the time
> > > zone.
> > > But for the time-travel query:
> > > `SELECT * FROM paimon_tb FOR SYSTEM_TIME AS OF TIMESTAMP '2023-04-27
> > > 00:00:00'`, we will consider the time zone and convert to a UTC
> > > timestamp.
> > >
> > > The concern is valid. But for time travel, most engines, Spark[1],
> > > Hive[2], Trino[3], also do the time conversion considering the session
> > > time zone. I think we can align with them to avoid the inconsistency
> > > with other engines and 

Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-08 Thread Mang Zhang
Hi Ron,
Thanks for your reply!
After our offline discussion: at present, there may be many Flink jobs
using the non-atomic CTAS function, especially streaming jobs. If we only
infer whether atomic CTAS is supported based on whether the
DynamicTableSink implements the SupportsStaging interface, then after users
upgrade to a new version, Flink's behavior will change, which is not
production friendly.
In order to keep Flink's behavior consistent and give users maximum
flexibility, even when the DynamicTableSink implements the SupportsStaging
interface, users can still choose the non-atomic implementation according
to business needs.
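
A rough sketch of the semantics being converged on (hypothetical local
definitions for illustration; the real interfaces are specified in
FLIP-305):

// Illustration only: names follow the FLIP text, not final Flink APIs.
interface StagedTable {
    void begin();
    void commit();
    void abort();
}

interface SupportsStaging {
    StagedTable applyStaging(Object context); // context type per the FLIP
}

class AtomicCtasSketch {
    // Atomic CTAS is used only when the sink opts in via SupportsStaging
    // AND the user enables the option (e.g. "table.ctas.atomicity-enabled").
    static void runCtas(Object sink, boolean atomicityOptionEnabled) throws Exception {
        if (sink instanceof SupportsStaging && atomicityOptionEnabled) {
            StagedTable staged = ((SupportsStaging) sink).applyStaging(null);
            staged.begin();
            try {
                // submit and run the job ...
                staged.commit();
            } catch (Exception e) {
                staged.abort();
                throw e;
            }
        } else {
            // non-atomic path: create the table first, then run the job.
        }
    }
}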


I have updated FLIP-305[1].


Looking forward to more feedback. If there is no other feedback, I will
launch a vote next Monday (2023-06-12).
Thanks again!






[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement




--

Best regards,
Mang Zhang





At 2023-06-09 10:23:21, "liu ron"  wrote:
>Hi, Mang
>
>In FLIP-214, we discussed that atomicity is not needed in streaming
>mode, so we implemented the initial version that doesn't support
>atomicity. In addition, we introduced the option
>"table.ctas.atomicity-enabled" to enable the atomic ability. According to
>your FLIP-305 description, once the DynamicTableSink implements the
>SupportsStaging interface, atomicity is the default behavior whether in
>stream mode or batch mode, and the user can't change it. I think this is
>not feasible for streaming mode; atomicity should be controllable by the
>user. So I think we should clarify in the FLIP the atomicity behavior
>combining the option and the SupportsStaging interface: atomicity is
>enabled only if the DynamicTableSink implements SupportsStaging and the
>option is enabled. WDYT?
>
>Best,
>Ron
>
>Jark Wu wrote on Thu, Jun 8, 2023 at 16:30:
>
>> Thank you for the great work, Mang! The updated proposal looks good to me.
>>
>> Best,
>> Jark
>>
>> > On Jun 8, 2023, at 11:49, Jingsong Li wrote:
>> >
>> > Thanks Mang for updating!
>> >
>> > Looks good to me!
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang  wrote:
>> >>
>> >> Hi Jingsong,
>> >>
>> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>> >>> Flink design places execution in the TableFactory or directly in the
>> >>> Catalog, so introducing an executable table makes me feel a bit
>> >>> strange. (Spark is this style, but Flink may not be)
>> >> On this issue: introducing the executable commit/abort logic on
>> >> CatalogTable is a bit strange.
>> >> After an offline discussion with yuxia, I tweaked the FLIP-305[1]
>> >> design.
>> >> The new solution is similar to the implementation of SupportsOverwrite:
>> >> it introduces the SupportsStaging interface and infers whether the
>> >> DynamicTableSink supports atomic CTAS based on whether it implements
>> >> the SupportsStaging interface; if so, it will get the StagedTable
>> >> object from the DynamicTableSink.
>> >>
>> >> For more implementation details, please see the FLIP-305 document.
>> >>
>> >> This is my poc commits
>> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>> >>
>> >>
>> >> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> >>
>> >>
>> >> --
>> >>
>> >> Best regards,
>> >>
>> >> Mang Zhang
>> >>
>> >>
>> >>
>> >> At 2023-05-12 13:02:14, "Jingsong Li"  wrote:
>> >>> Hi Mang,
>> >>>
>> >>> Thanks for starting this FLIP.
>> >>>
>> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>> >>> Flink design places execution in the TableFactory or directly in the
>> >>> Catalog, so introducing an executable table makes me feel a bit
>> >>> strange. (Spark is this style, but Flink may not be)
>> >>>
>> >>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>> >>>
>> >>> Best,
>> >>> Jingsong
>> >>>
>> >>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang  wrote:
>> 
>>  Hi Ron,
>> 
>> 
>>  First of all, thank you for your reply!
>>  After our offline communication: what you said mainly concerns the
>> compilePlan scenario, but currently compilePlanSql does not support
>> non-INSERT statements; otherwise it throws an exception:
>> > Unsupported SQL query! compilePlanSql() only accepts a single SQL
>> > statement of type INSERT
>>  But it's a good point that I will seriously consider.
>>  Non-atomic CTAS can be supported relatively easily,
>>  but atomic CTAS needs more adaptation work, so I'm going to leave it
>> as is and follow up with a separate issue to implement CTAS support for
>> compilePlanSql.
>> 
>> 
>> 
>> 
>> 
>> 
>>  --
>> 
>>  Best regards,
>>  Mang Zhang
>> 
>> 
>> 
>> 
>> 
>>  At 2023-04-23 17:52:07, "liu ron"  wrote:
>> > Hi, Mang
>> >
>> > I have a question about the implementation details. 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-08 Thread Aitozi
Hi Jing
Thanks for your good questions. I have updated the example to the FLIP.

> Only one row for each lookup
A lookup can also return multiple rows, based on the query result. [1]

[1]:
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56

> If we use async calls with lateral join, my gut feeling is
that we might have many more async calls than lookup join. I am not really
sure if we will be facing potential issues in this case or not.

IMO, the work pattern is similar to the lookup function: for each row from
the left table, it will evaluate the eval method once, so the number of
async calls will not change, and the maximum number of calls in flight is
limited by the async operator's buffer capacity, which is controlled by
the option.

BTW, for the naming of these options, I updated the FLIP; you can refer to
the "ConfigOption" and "Rejected Alternatives" sections.
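
For reference, a minimal sketch of setting the existing async-lookup
options via the Table API (values are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AsyncOptionsDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Existing async-lookup options reused by our internal version; the
        // FLIP proposes parallel table.exec.async-udtf.* options instead.
        tEnv.getConfig().set("table.exec.async-lookup.buffer-capacity", "100");
        tEnv.getConfig().set("table.exec.async-lookup.timeout", "3 min");
    }
}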

Finally, for the performance evaluation, I'd like to do some tests and
will update the FLIP doc with the results.

Thanks,
Aitozi.


Jing Ge wrote on Fri, Jun 9, 2023 at 07:23:

> Hi Aitozi,
>
> Thanks for the clarification. The code example looks interesting. I would
> suggest adding them into the FLIP. The description with code examples will
> help readers understand the motivation and how to use it. Afaiac, it is a
> valid feature for Flink users.
>
> As we know, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME
> AS OF, which is also used in your code example. Temporal join performs the
> lookup based on the processing-time match. Only one row for each
> lookup (afaiu, I need to check the source code to double-confirm) will
> return for further enrichment. On the other hand, lateral join will have
> sub-queries correlated with every individual value of the reference table
> from the preceding part of the query, and each sub-query will return
> multiple rows. If we use async calls with lateral join, my gut feeling is
> that we might have many more async calls than lookup join. I am not really
> sure if we will be facing potential issues in this case or not. Possible
> issues I can think of now are e.g. too many RPC calls, too many async
> calls processing, or the sub-query returning a table which might be (too)
> big and might cause performance issues. I would suggest preparing some use
> cases and running some performance tests to check it. These are my
> concerns about using async calls with lateral join that I'd like to share
> with you; happy to discuss with you and hear different opinions, and
> hopefully the discussion could help me understand it more deeply. Please
> correct me if I am wrong.
>
> Best regards,
> Jing
>
>
> On Thu, Jun 8, 2023 at 7:22 AM Aitozi  wrote:
>
> > Hi Mason,
> > Thanks for your input. I think if we support the user-defined async
> > table function, users will be able to use it to hold a batch of data
> > and then handle it at one time in the customized function.
> >
> > AsyncSink is meant for the sink operator. I have not figured out how to
> > integrate it in this case.
> >
> > Thanks,
> > Aitozi.
> >
> >
> > Mason Chen wrote on Thu, Jun 8, 2023 at 12:40:
> >
> > > Hi Aitozi,
> > >
> > > I think it makes sense to make it easier for SQL users to make RPCs.
> > > Do you think your proposal can extend to the ability to batch data
> > > for the RPC? This is also another common strategy to increase
> > > throughput. Also, have you considered solving this a bit differently
> > > by leveraging Flink's AsyncSink?
> > >
> > > Best,
> > > Mason
> > >
> > > On Mon, Jun 5, 2023 at 1:50 AM Aitozi  wrote:
> > >
> > > > One more thing for discussion:
> > > >
> > > > In our internal implementation, we reuse the options
> > > > `table.exec.async-lookup.buffer-capacity` and
> > > > `table.exec.async-lookup.timeout` to configure
> > > > the async UDTF. Do you think we should add two extra options to
> > > > distinguish them from the lookup options, such as
> > > >
> > > > `table.exec.async-udtf.buffer-capacity`
> > > > `table.exec.async-udtf.timeout`
> > > >
> > > >
> > > > Best,
> > > > Aitozi.
> > > >
> > > >
> > > >
> > > > Aitozi wrote on Mon, Jun 5, 2023 at 12:20:
> > > >
> > > > > Hi Jing,
> > > > >
> > > > > > What is the difference between the RPC call or query you
> > > > > > mentioned and the lookup, in a very general way?
> > > > >
> > > > > I think the RPC call or query service is quite similar to the
> > > > > lookup join. But lookup join has to work
> > > > > with `LookupTableSource`.
> > > > >
> > > > > Let's see how we can perform an async RPC call with lookup join:
> > > > >
> > > > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > > > (2) Implement a `LookupTableSource` connector that runs with the
> > > > > async UDTF defined in (1).
> > > > > (3) Then define a DDL for this lookup table in SQL:
> > > > >
> > > > > CREATE TEMPORARY TABLE Customers (
> > > > >   id INT,
> > > > >   name STRING,

[jira] [Created] (FLINK-32294) The CI fails due to HiveITCase

2023-06-08 Thread Rui Fan (Jira)
Rui Fan created FLINK-32294:
---

 Summary: The CI fails due to HiveITCase
 Key: FLINK-32294
 URL: https://issues.apache.org/jira/browse/FLINK-32294
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.18.0
Reporter: Rui Fan


2 ITCases fail:
 * HiveITCase.testHiveDialect
 * HiveITCase.testReadWriteHive

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=ef799394-2d67-5ff4-b2e5-410b80c9c0af=9e5768bc-daae-5f5f-1861-e58617922c7a=14346]

 

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49766=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=ae4f8708-9994-57d3-c2d7-b892156e7812=0f3adb59-eefa-51c6-2858-3654d9e0749d=14652]

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-08 Thread liu ron
Hi, Mang

In FLIP-214, we discussed that atomicity is not needed in streaming
mode, so we implemented the initial version that doesn't support
atomicity. In addition, we introduced the option
"table.ctas.atomicity-enabled" to enable the atomic ability. According to
your FLIP-305 description, once the DynamicTableSink implements the
SupportsStaging interface, atomicity is the default behavior whether in
stream mode or batch mode, and the user can't change it. I think this is
not feasible for streaming mode; atomicity should be controllable by the
user. So I think we should clarify in the FLIP the atomicity behavior
combining the option and the SupportsStaging interface: atomicity is
enabled only if the DynamicTableSink implements SupportsStaging and the
option is enabled. WDYT?

Best,
Ron

Jark Wu wrote on Thu, Jun 8, 2023 at 16:30:

> Thank you for the great work, Mang! The updated proposal looks good to me.
>
> Best,
> Jark
>
> > On Jun 8, 2023, at 11:49, Jingsong Li wrote:
> >
> > Thanks Mang for updating!
> >
> > Looks good to me!
> >
> > Best,
> > Jingsong
> >
> > On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang  wrote:
> >>
> >> Hi Jingsong,
> >>
> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >>> Flink design places execution in the TableFactory or directly in the
> >>> Catalog, so introducing an executable table makes me feel a bit
> >>> strange. (Spark is this style, but Flink may not be)
> >> On this issue: introducing the executable commit/abort logic on
> >> CatalogTable is a bit strange.
> >> After an offline discussion with yuxia, I tweaked the FLIP-305[1]
> >> design.
> >> The new solution is similar to the implementation of SupportsOverwrite:
> >> it introduces the SupportsStaging interface and infers whether the
> >> DynamicTableSink supports atomic CTAS based on whether it implements
> >> the SupportsStaging interface; if so, it will get the StagedTable
> >> object from the DynamicTableSink.
> >>
> >> For more implementation details, please see the FLIP-305 document.
> >>
> >> This is my poc commits
> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
> >>
> >>
> >> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> >>
> >>
> >> --
> >>
> >> Best regards,
> >>
> >> Mang Zhang
> >>
> >>
> >>
> >> At 2023-05-12 13:02:14, "Jingsong Li"  wrote:
> >>> Hi Mang,
> >>>
> >>> Thanks for starting this FLIP.
> >>>
> >>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
> >>> Flink design places execution in the TableFactory or directly in the
> >>> Catalog, so introducing an executable table makes me feel a bit
> >>> strange. (Spark is this style, but Flink may not be)
> >>>
> >>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang  wrote:
> 
>  Hi Ron,
> 
> 
>  First of all, thank you for your reply!
>  After our offline communication: what you said mainly concerns the
> compilePlan scenario, but currently compilePlanSql does not support
> non-INSERT statements; otherwise it throws an exception:
> > Unsupported SQL query! compilePlanSql() only accepts a single SQL
> > statement of type INSERT
>  But it's a good point that I will seriously consider.
>  Non-atomic CTAS can be supported relatively easily,
>  but atomic CTAS needs more adaptation work, so I'm going to leave it
> as is and follow up with a separate issue to implement CTAS support for
> compilePlanSql.
> 
> 
> 
> 
> 
> 
>  --
> 
>  Best regards,
>  Mang Zhang
> 
> 
> 
> 
> 
>  At 2023-04-23 17:52:07, "liu ron"  wrote:
> > Hi, Mang
> >
> > I have a question about the implementation details. For the atomicity
> > case, since the target table is not created before the JobGraph is
> > generated, but the target table is required to exist when optimizing
> > the plan to generate the JobGraph, how do you solve this problem?
> >
> > Best,
> > Ron
> >
> > yuxia wrote on Thu, Apr 20, 2023 at 09:35:
> >
> >> Sharing some insights about the new TwoPhaseCatalogTable proposed
> >> after an offline discussion with Mang.
> >> The main or important reason is that the TwoPhaseCatalogTable enables
> >> external connectors to implement their own logic for commit / abort.
> >> In FLIP-218, for atomic CTAS, the Catalog will just drop the table
> >> when the job fails. That's not ideal, as it's too generic to work well.
> >> For example, some connectors will need to clean up some temporary files
> >> in the abort method, and the actual connector knows the specific logic
> >> for aborting.
> >>
> >> Best regards,
> >> Yuxia
> >>
> >>
> >> From: "zhangmang1"
> >> To: "dev" , "Jing Ge"
> >> Cc: "ron9 liu" , "lincoln 86xy" <
> 

[jira] [Created] (FLINK-32293) Support vector with long index

2023-06-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-32293:
-

 Summary: Support vector with long index
 Key: FLINK-32293
 URL: https://issues.apache.org/jira/browse/FLINK-32293
 Project: Flink
  Issue Type: New Feature
Reporter: Zhipeng Zhang


Currently in Flink ML, we only support sparse and dense vectors with `int`
indices and `double` values.

However, there are real-world cases where the index of a vector could
exceed the range of `INT.MAX`. Thus we need to support vectors with `long`
indices.
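
A hypothetical shape for such a vector (illustration only, not the actual
Flink ML API):

// Mirrors the existing sparse vector, but with long indices (hypothetical):
public class SparseLongIndexVector {
    public final long size;       // logical dimensionality, may exceed INT.MAX
    public final long[] indices;  // sorted, non-negative
    public final double[] values; // values[i] corresponds to indices[i]

    public SparseLongIndexVector(long size, long[] indices, double[] values) {
        this.size = size;
        this.indices = indices;
        this.values = values;
    }
}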



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32292) TableUtils.getRowTypeInfo fails to get type information of Tuple

2023-06-08 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-32292:
-

 Summary: TableUtils.getRowTypeInfo fails to get type information 
of Tuple
 Key: FLINK-32292
 URL: https://issues.apache.org/jira/browse/FLINK-32292
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-06-08 Thread Shammon FY
Thanks Jing, that makes sense to me, and I have updated the FLIP.
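
For reference, the listener shape discussed in this thread is roughly the
following (a sketch; the exact definitions live in the FLIP):

// Stub types for illustration; see FLIP-294 for the real definitions.
interface CatalogModificationEvent {}   // what changed (database/table, etc.)
interface CatalogModificationContext {} // extras: factory identifier,
                                        // configuration, ioExecutor

interface CatalogModificationListener {
    void onEvent(CatalogModificationEvent event, CatalogModificationContext context);
}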


Best,
Shammon FY


On Thu, Jun 8, 2023 at 11:15 PM Jing Ge  wrote:

> Hi Shammon,
>
> If we take a look at the JDK Event design as a reference, we can even add
> an Object into the event [1]. Back to the CatalogModificationEvent:
> everything related to the event could be defined in the Event. If we want
> to group some information into the Context, we could also consider adding
> the CatalogModificationContext into the Event, making the onEvent() method
> cleaner with only one input parameter, CatalogModificationEvent, because
> the interface CatalogModificationListener is the interface most often
> used by users. Just my two cents.
>
> Best regards,
> Jing
>
> [1]
> http://www.java2s.com/example/java-src/pkg/java/util/eventobject-85298.html
>
> On Thu, Jun 8, 2023 at 7:50 AM Shammon FY  wrote:
>
> > Hi,
> >
> > To @Jing Ge
> > > Thanks for the clarification. Just out of curiosity, if the context
> > > is not part of the event, why should it be the input parameter of
> > > each onEvent call?
> >
> > I think it's quite strange to put some information in an Event, such as
> > a factory identifier for a catalog, that will only be used by the
> > listener. I placed it in the context class, and I think it is more
> > suitable there than directly in the event class.
> >
> > To @Mason
> > > 1. I'm also curious about default implementations. Would atlas/datahub
> be
> > supported by default?
> >
> > We won't do that and external systems such as atlas/datahub need to
> > implement the listener themselves.
> >
> > > 2. The FLIP title is confusing to me, especially in distinguishing it
> > from FLIP-314. Would a better FLIP title be "Support Catalog Metadata
> > Listener" or something alike?
> >
> > Thanks. I think "Support Catalog Modification Listener" will be
> > more suitable; I'll update the title accordingly.
> >
> >
> > Best,
> > Shammon FY
> >
> >
> > On Thu, Jun 8, 2023 at 12:25 PM Mason Chen 
> wrote:
> >
> > > Hi Shammon,
> > >
> > > FLIP generally looks good and I'm excited to see this feature.
> > >
> > > 1. I'm also curious about default implementations. Would atlas/datahub
> be
> > > supported by default?
> > > 2. The FLIP title is confusing to me, especially in distinguishing it
> > from
> > > FLIP-314. Would a better FLIP title be "Support Catalog Metadata
> > Listener"
> > > or something alike?
> > >
> > > Best,
> > > Mason
> > >
> > > On Tue, Jun 6, 2023 at 3:33 AM Jing Ge 
> > wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for the clarification. Just out of curiosity, if the context
> is
> > > not
> > > > part of the event, why should it be the input parameter of each
> onEvent
> > > > call?
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Tue, Jun 6, 2023 at 11:58 AM Leonard Xu 
> wrote:
> > > >
> > > > > Thanks Shammon for the timely update, the updated FLIP looks good
> to
> > > me.
> > > > >
> > > > > Hope to see the vote thread and following FLIP-314 discussion
> thread.
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > > > On Jun 6, 2023, at 5:04 PM, Shammon FY 
> wrote:
> > > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Thanks for all the feedback.
> > > > > >
> > > > > > For @Jing Ge,
> > > > > > I forgot to update the demo code in the FLIP; the method is
> > > > > > `onEvent(CatalogModificationEvent, CatalogModificationContext)`
> > > > > > and there is no `onEvent(CatalogModificationEvent)`. I have
> > > > > > updated the code. Context contains some additional information
> > > > > > that is not part of an Event but needs to be used in the
> > > > > > listener, so we separate it from the event.
> > > > > >
> > > > > > For @Panagiotis,
> > > > > > I think `ioExecutor` makes sense to me and I have added it in
> > > > > > `CatalogModificationContext`, thanks
> > > > > >
> > > > > > For @Leonard,
> > > > > > Thanks for your input.
> > > > > > 1. I have updated `CatalogModificationContext` as an interface,
> as
> > > well
> > > > > as
> > > > > > Context in CatalogModificationListenerFactory
> > > > > > 2. Configuration sounds good to me, I have updated the method
> name
> > > and
> > > > > > getConfiguration in Context
> > > > > >
> > > > > > For @David,
> > > > > > Yes, you're right. The listener will only be used on the client
> > > > > > side and won't introduce a new code path for running
> > > > > > per-job/per-session jobs. The listener will be created in
> > > > > > `TableEnvironment` and `SqlGateway`, which can create a
> > > > > > `CatalogManager` with the listener.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > > >
> > > > > > On Tue, Jun 6, 2023 at 3:33 PM David Morávek <
> > > david.mora...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi,
> > > > > >>
> > > > > >> Thanks for the FLIP! Data lineage is an important problem to
> > tackle.
> > > > > >>
> > > > > >> Can you please expand on how this is 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-08 Thread Jing Ge
Hi Aitozi,

Thanks for the clarification. The code example looks interesting. I would
suggest adding them into the FLIP. The description with code examples will
help readers understand the motivation and how to use it. Afaiac, it is a
valid feature for Flink users.

As we know, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME AS
OF, which is also used in your code example. Temporal join performs the
lookup based on the processing-time match. Only one row for each
lookup (afaiu, I need to check the source code to double-confirm) will
return for further enrichment. On the other hand, lateral join will have
sub-queries correlated with every individual value of the reference table
from the preceding part of the query, and each sub-query will return
multiple rows. If we use async calls with lateral join, my gut feeling is
that we might have many more async calls than lookup join. I am not really
sure if we will be facing potential issues in this case or not. Possible
issues I can think of now are e.g. too many RPC calls, too many async calls
processing, or the sub-query returning a table which might be (too) big and
might cause performance issues. I would suggest preparing some use cases
and running some performance tests to check it. These are my concerns about
using async calls with lateral join that I'd like to share with you; happy
to discuss with you and hear different opinions, and hopefully the
discussion could help me understand it more deeply. Please correct me if I
am wrong.

Best regards,
Jing


On Thu, Jun 8, 2023 at 7:22 AM Aitozi  wrote:

> Hi Mason,
> Thanks for your input. I think if we support the user-defined async
> table function, users will be able to use it to hold a batch of data and
> then handle it at one time in the customized function.
>
> AsyncSink is meant for the sink operator. I have not figured out how to
> integrate it in this case.
>
> Thanks,
> Aitozi.
>
>
> Mason Chen wrote on Thu, Jun 8, 2023 at 12:40:
>
> > Hi Aitozi,
> >
> > I think it makes sense to make it easier for SQL users to make RPCs. Do
> > you think your proposal can extend to the ability to batch data for the
> > RPC? This is also another common strategy to increase throughput. Also,
> > have you considered solving this a bit differently by leveraging Flink's
> > AsyncSink?
> >
> > Best,
> > Mason
> >
> > On Mon, Jun 5, 2023 at 1:50 AM Aitozi  wrote:
> >
> > > One more thing for discussion:
> > >
> > > In our internal implementation, we reuse the options
> > > `table.exec.async-lookup.buffer-capacity` and
> > > `table.exec.async-lookup.timeout` to configure
> > > the async UDTF. Do you think we should add two extra options to
> > > distinguish them from the lookup options, such as
> > >
> > > `table.exec.async-udtf.buffer-capacity`
> > > `table.exec.async-udtf.timeout`
> > >
> > >
> > > Best,
> > > Aitozi.
> > >
> > >
> > >
> > > Aitozi wrote on Mon, Jun 5, 2023 at 12:20:
> > >
> > > > Hi Jing,
> > > >
> > > > > What is the difference between the RPC call or query you
> > > > > mentioned and the lookup, in a very general way?
> > > >
> > > > I think the RPC call or query service is quite similar to the
> > > > lookup join. But lookup join has to work
> > > > with `LookupTableSource`.
> > > >
> > > > Let's see how we can perform an async RPC call with lookup join:
> > > >
> > > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > > (2) Implement a `LookupTableSource` connector that runs with the
> > > > async UDTF defined in (1).
> > > > (3) Then define a DDL for this lookup table in SQL:
> > > >
> > > > CREATE TEMPORARY TABLE Customers (
> > > >   id INT,
> > > >   name STRING,
> > > >   country STRING,
> > > >   zip STRING
> > > > ) WITH (
> > > >   'connector' = 'custom'
> > > > );
> > > >
> > > > (4) Run with the query as below:
> > > >
> > > > SELECT o.order_id, o.total, c.country, c.zip
> > > > FROM Orders AS o
> > > >   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > ON o.customer_id = c.id;
> > > >
> > > > This example is from the docs
> > > > <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > > You can imagine the lookup process as an async RPC call process.
> > > >
> > > > Let's see how we can perform an async RPC call with lateral join:
> > > >
> > > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > > (2) Run query with
> > > >
> > > > Create function f1 as '...' ;
> > > >
> > > > SELECT o.order_id, o.total, c.country, c.zip FROM Orders  lateral
> table
> > > > (f1(order_id)) as T(...);
> > > >
> > > > As you can see, the lateral join version is simpler and more
> > > > intuitive to users. Users do not have to wrap a
> > > > LookupTableSource just to use an async UDTF.
> > > >
> > > > In the end, we can also see the user-defined async table function
> > > > is an enhancement of the current lateral table join,
> > > > which only supports synchronous lateral join now.
> > > >
> > > > Best,
> > > > 

Call for Presentations: Flink Forward Seattle 2023

2023-06-08 Thread Jing Ge
Dear Flink developers & users,

We hope this email finds you well. We are excited to announce the Call for
Presentations for the upcoming Flink Forward Seattle 2023, the premier
event dedicated to Apache Flink and stream processing technologies. As a
prominent figure in the field, we invite you to submit your innovative
research, insightful experiences, and cutting-edge use cases for
consideration as a speaker at the conference.

Flink Forward Conference 2023 Details:
Date: November 6-7 (training), November 8 (conference)
Location: Seattle United States

Flink Forward is a conference dedicated to the Apache Flink® community. In
2023 we shall have a full conference day following a two-day training
session. The conference gathers an international audience of CTOs/CIOs,
developers, data architects, data scientists, Apache Flink® core
committers, and the stream processing community, to share experiences,
exchange ideas and knowledge, and receive hands-on training sessions led by
Flink experts. We are seeking compelling presentations and
thought-provoking talks that cover a broad range of topics related to
Apache Flink, including but not limited to:

Flink architecture and internals
Flink performance optimization
Advanced Flink features and enhancements
Real-world use cases and success stories
Flink ecosystem and integrations
Stream processing at scale
Best practices for Flink application development

If you have an inspiring story, valuable insights, real-world application,
research breakthroughs, use case, best practice, or compelling vision of
the future for Flink, we encourage you to present it to a highly skilled
and enthusiastic community. We welcome submissions from both industry
professionals and academic researchers.

To submit your proposal, please visit the Flink Forward Conference website
at https://www.flink-forward.org/seattle-2023/call-for-presentations. The
submission form will require you to provide an abstract of your talk, along
with a brief biography and any supporting materials. The deadline for
submissions is July 12th 11:59 pm PDT.

We believe your contribution will greatly enrich the Flink Forward
Conference and provide invaluable insights to our attendees. This is an
excellent opportunity to connect with a diverse community of Flink
enthusiasts, network with industry experts, and gain recognition for your
expertise. We look forward to receiving your submission and welcoming you
as a speaker at the Flink Forward Conference.

Thank you for your time and consideration.

Best regards,

-- 

Jing Ge | Head of Engineering

j...@ververica.com



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference - Tickets on SALE now!

[jira] [Created] (FLINK-32291) Hive E2E test fails consistently

2023-06-08 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32291:


 Summary: Hive E2E test fails consistently
 Key: FLINK-32291
 URL: https://issues.apache.org/jira/browse/FLINK-32291
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Hive, Tests
Affects Versions: 1.18.0
Reporter: Chesnay Schepler
 Fix For: 1.18.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49754=results



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Status of Statefun Project

2023-06-08 Thread Galen Warren
Thanks Martijn.

Personally, I'm already using a local fork of Statefun that is compatible
with Flink 1.16.x, so I wouldn't have any need for a released version
compatible with 1.15.x. I'd be happy to do the PRs to modify Statefun to
work with new versions of Flink as they come along.

As for testing, Statefun does have unit tests and Gordon also sent me
instructions a while back for how to do some additional smoke tests which
are pretty straightforward. Perhaps he could weigh in on whether the
combination of automated tests plus those smoke tests should be sufficient
for testing with new Flink versions (I believe the answer is yes).

-- Galen



On Thu, Jun 8, 2023 at 8:01 AM Martijn Visser 
wrote:

> Hi all,
>
> Apologies for the late reply.
>
> I'm willing to help out with merging requests in Statefun to keep them
> compatible with new Flink releases and create new releases. I do think that
> validation of the functionality of these releases depends a lot on those
> who do these compatibility updates, with PMC members helping out with the
> formal process.
>
> > Why can't the Apache Software Foundation allow community members to bring
> it up to date?
>
> There's nothing preventing anyone from reviewing any of the current PRs or
> opening new ones. However, none of them are approved [1], so there's also
> nothing to merge.
>
> > I believe that there are people and companies on this mailing list
> interested in supporting Apache Flink Stateful Functions.
>
> If so, then now is the time to show.
>
> Would there be a preference to create a release with Galen's merged
> compatibility update to Flink 1.15.2, or do we want to skip that and go
> straight to a newer version?
>
> Best regards,
>
> Martijn
>
> [1]
>
> https://github.com/apache/flink-statefun/pulls?q=is%3Apr+is%3Aopen+review%3Aapproved
>
> On Tue, Jun 6, 2023 at 3:55 PM Marco Villalobos  >
> wrote:
>
> > Why can't the Apache Software Foundation allow community members to bring
> > it up to date?
> >
> > What's the process for that?
> >
> > I believe that there are people and companies on this mailing list
> > interested in supporting Apache Flink Stateful Functions.
> >
> > You already had two people on this thread express interest.
> >
> > At the very least, we could keep the library versions up to date.
> >
> > There is only a small list of new features that might be worthwhile:
> >
> > 1. event time processing
> > 2. state rest api
> >
> >
> > On Jun 6, 2023, at 3:06 AM, Chesnay Schepler  wrote:
> >
> > If you were to fork it *and want to redistribute it* then the short
> > version is that
> >
> >1. you have to adhere to the Apache licensing requirements
> >2. you have to make it clear that your fork does not belong to the
> >Apache Flink project. (Trademarks and all that)
> >
> > Neither should be significant hurdles (there should also be plenty of
> > online resources regarding 1), and if you do this then you can freely
> share
> > your fork with others.
> >
> > I've also pinged Martijn to take a look at this thread.
> > To my knowledge the project hasn't decided anything yet.
> >
> > On 27/05/2023 04:05, Galen Warren wrote:
> >
> > Ok, I get it. No interest.
> >
> > If this project is being abandoned, I guess I'll work with my own fork.
> Is
> > there anything I should consider here? Can I share it with other people
> who
> > use this project?
> >
> > On Tue, May 16, 2023 at 10:50 AM Galen Warren 
> 
> > wrote:
> >
> >
> > Hi Martijn, since you opened this discussion thread, I'm curious what
> your
> > thoughts are in light of the responses? Thanks.
> >
> > On Wed, Apr 19, 2023 at 1:21 PM Galen Warren  <
> ga...@cvillewarrens.com>
> > wrote:
> >
> >
> > I use Apache Flink for stream processing, and StateFun as a hand-off
> >
> > point for the rest of the application.
> > It serves well as a bridge between a Flink Streaming job and
> > micro-services.
> >
> > This is essentially how I use it as well, and I would also be sad to see
> > it sunsetted. It works well; I don't know that there is a lot of new
> > development required, but if there are no new Statefun releases, then
> > Statefun can only be used with older Flink versions.
> >
> > On Tue, Apr 18, 2023 at 10:04 PM Marco Villalobos <
> mvillalo...@kineteque.com> wrote:
> >
> >
> > I am currently using Stateful Functions in my application.
> >
> > I use Apache Flink for stream processing, and StateFun as a hand-off
> > point for the rest of the application.
> > It serves well as a bridge between a Flink Streaming job and
> > micro-services.
> >
> > I would be disappointed if StateFun was sunsetted. It's a good idea.
> >
> > If there is anything I can do to help, as a contributor perhaps, please
> > let me know.
> >
> >
> > On Apr 3, 2023, at 2:02 AM, Martijn Visser  <
> martijnvis...@apache.org>
> >
> > wrote:
> >
> > Hi everyone,
> >
> > I want to open a discussion on the status of the Statefun Project [1]
> >
> > in Apache Flink. As you might have noticed, there hasn't 

Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-06-08 Thread Jing Ge
Hi Ron,

Thanks for sharing the insight. Agreed that it is not doable to rewrite the
entire planner module in Java; that was the reason why it was hidden
instead of replaced. I thought that, since the community decided to walk
away from Scala, we should at least not add any more new Scala code.
According to your suggestion, that is not the case. I think the community
should reconsider how to handle Scala, since the more features we develop
in those areas, the more Scala code we will have, which makes it even
harder (impossible) to migrate to Java. This is beyond the scope of this
discussion; I will start a new thread to address it.

Best regards,
Jing
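
For reference, the JDK text blocks mentioned in Ron's reply below (preview
in JDK 13, final in JDK 15) would let Java codegen templates read like
this (a sketch):

public class CodegenDemo {
    public static void main(String[] args) {
        // A multi-line code template without escape noise, close to what
        // Scala's multi-line strings give the planner today:
        String op = "+";
        String generated = String.format("""
                public int process(int a, int b) {
                    return a %s b;
                }
                """, op);
        System.out.println(generated);
    }
}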


On Thu, Jun 8, 2023 at 5:20 AM liu ron  wrote:

> Hi, Jing
>
> Thanks for your valuable input about scala free.
>
> Firstly, to reply to your question: using Java to implement codegen is
> possible, but we need to utilize some tools. I think the first alternative
> is to update our JDK version to 13, which provides the text block
> feature[1], making string formatting easier and improving the readability
> and writability of multi-line strings. However, we won't update the JDK
> version to 13 in the near future. The second alternative is to use a
> third-party library such as FreeMarker or StringTemplate, but this is not
> easy work: we would need to introduce an extra dependency in the table
> planner, which makes our implementation more complicated.
>
> We use a lot of Scala code in the planner module; one of the main reasons
> is that codegen is more friendly there, and many of the operators are also
> implemented through codegen. In the foreseeable future, we do not have the
> time and manpower to remove the Scala code from the planner module, so
> going Scala-free is unlikely. From the point of view of development
> friendliness and development cost, Scala is currently the relatively
> better solution for codegen. Suppose we need to completely rewrite the
> planner module in Java in the future; I think it is better to consider
> what tools to use to support codegen in a unified way at that time, and I
> can't give a suitable tool at the moment.
>
> In summary, I don't think it is feasible to implement my FLIP
> Scala-free at this time.
>
> [1]: https://openjdk.org/jeps/378
>
> Best,
> Ron
>
>
> liu ron wrote on Thu, Jun 8, 2023 at 10:51:
>
> > Hi, Aitozi
> >
> > Thanks for your feedback.
> >
> > > Traverse the ExecNode DAG and create a FusionExecNode for physical
> > > operators that can be fused together.
> > Which kinds of operators can be fused together? Are they the operators
> > in an operator chain? Is this optimization aligned with Spark's
> > whole-stage codegen?
> > In theory, all kinds of operators can be fused together; our final goal
> > is to support all operators in batch mode, and OperatorChain is just one
> > case. Since this work effort is relatively large, we need to complete it
> > step by step. Our OFCG not only achieves the ability of Spark's
> > whole-stage codegen, but also does better than it.
> >
> > > does the "support codegen" means fusion codegen? but why we generate a
> > FusionTransformation when the member operator does not support codegen,
> IMO
> > it should
> > fallback to the current behavior.
> >
> > Yes, it means the fusion codegen. In the FLIP, I propose two operator
> > fusion mechanisms: one is like OperatorChain for single-input operators,
> > and the other is MultipleInput fusion. For the former, our design is to
> > fuse all operators together at the ExecNode layer only if they all
> > support fusion codegen, or else go with the default OperatorChain. For
> > the latter, in order not to break the existing MultipleInput
> > optimization, when there are member operators that do not support fusion
> > codegen, we will fall back to the current behavior[1], which means that
> > a FusionTransformation is created. Here, FusionTransformation is just a
> > surrogate for the MultipleInput case; it actually means
> > MultipleInputTransformation, which fuses multiple physical operators.
> > Sorry, the description in the FLIP is not very clear and caused your
> > confusion.
> >
> > > In the end, I share the same idea with Lincoln about the performance
> > > benchmark. Currently the Flink community's flink-benchmark only covers
> > > things like scheduling, state, and DataStream operator performance.
> > > A good benchmark harness for SQL operators will benefit the SQL
> > > optimizer work and observation.
> >
> > For the performance benchmark, I agree with you. As I stated earlier, I
> > think this is a new scope of work that should be designed separately; we
> > can introduce this improvement in the future.
> >
> > [1]
> >
> https://github.com/apache/flink/blob/77214f138cf759a3ee5466c9b2379e717227a0ae/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java#L123
> >
> > Best,
> > Ron
> >
> > Jing Ge  wrote on Thu, Jun 8, 2023 at 04:28:
> >
> >> Hi Ron,
> >>
> >> Thanks for raising the proposal. It is a very 

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-08 Thread Paul Lam
Hi ShengKai,

Good point with the ANALYZE TABLE and CALL PROCEDURE statements.

>  Can we remove the jars if the job is running or gateway exits?

Yes, I think it would be okay to remove the resources after the job is 
submitted.
It should be Gateway’s responsibility to remove them.

> Can we use the rest client returned by ApplicationDeployer to query the job
> id? I am concerned that users don't know which job is related to the
> submitted SQL.

That should be doable, as normally we only allow one job in an application 
cluster ATM.

But a more significant problem I see is that select statements are not 
available.

Perhaps we need to make CollectSinkFunction accept an external sink address 
from SQL Gateway to get the result back from SQL Driver. WDYT?

> It seems we need to introduce a new module. Will the new module be
> available in the distribution package? I agree with Jark that we don't need
> to introduce this for table-API users and these users have their main
> class. If we want to make users write the k8s operator more easily, I think
> we should modify the k8s operator repo. If we don't need to support SQL
> files, can we make this jar only visible in the sql-gateway like we do in
> the planner loader?[1]

I rethought the relationship between SQL Driver and SQL Client with embedded
Gateway. With the help of SQL Driver, we should be able to run SQL files
with a non-interactive SQL Client on K8s, just as @Biao did.

If that's the case, I'm good with introducing a new module and making SQL
Driver an internal class that accepts JSON plans only.

WRT visibility, I lean toward making it more publicly visible and easy to
integrate with external systems. I think putting the jar in the opt folder
is good. Could you elaborate a bit more on the benefit we get from an extra
loader?

Best,
Paul Lam

> On Jun 7, 2023, at 17:25, Shengkai Fang  wrote:
> 
> Hi, Paul. Thanks for your update; it makes me understand the design much
> better.
> 
> But I still have some questions about the FLIP.
> 
>> For SQL Gateway, only DMLs need to be delegated to the SQL
>> Driver. I would think about the details and update the FLIP. Do you have
>> some ideas already?
> 
> If the application mode cannot support library mode, I think we should
> only execute INSERT INTO and UPDATE/DELETE statements in the application
> mode. AFAIK, we cannot support ANALYZE TABLE and CALL PROCEDURE
> statements. The ANALYZE TABLE syntax needs to register the statistics to
> the catalog after the job finishes, and the CALL PROCEDURE statement
> doesn't generate the ExecNodeGraph.
> 
> * Introduce storage via option `sql-gateway.application.storage-dir`
> 
> If we cannot support submitting the jars through web submission, +1 to
> introducing the options to upload the files, though I think the uploader
> should be responsible for removing the uploaded jars. Can we remove the
> jars while the job is running, or when the gateway exits?
> 
> * JobID is not available
> 
> Can we use the rest client returned by ApplicationDeployer to query the job
> id? I am concerned that users don't know which job is related to the
> submitted SQL.
> 
> * Do we need to introduce a new module named flink-table-sql-runner?
> 
> It seems we need to introduce a new module. Will the new module be
> available in the distribution package? I agree with Jark that we don't need
> to introduce this for table-API users and these users have their main
> class. If we want to make users write the k8s operator more easily, I think
> we should modify the k8s operator repo. If we don't need to support SQL
> files, can we make this jar only visible in the sql-gateway like we do in
> the planner loader?[1]
> 
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java#L95
> 
> Best,
> Shengkai
> 
> 
> 
> 
> 
> 
> 
> 
Weihua Hu  wrote on Wed, Jun 7, 2023 at 10:52:
> 
>> Hi,
>> 
>> Thanks for updating the FLIP.
>> 
>> I have two cents on the distribution of SQLs and resources.
>> 1. Should we support a common file distribution mechanism for k8s
>> application mode?
>>  I have seen some issues and requirements on the mailing list.
>>  In our production environment, we implement the download command in the
>> CliFrontend, and automatically add an init container to the POD for file
>> downloading. The advantage of this is that we can use all Flink-supported
>> file systems to store files.
>> 
>>  This needs more discussion. I would appreciate hearing more opinions.
>> 
>> 2. In this FLIP, we distribute files in two different ways in YARN and
>> Kubernetes. Can we combine them into one way?
>>  If we don't want to implement a common file distribution for k8s
>> application mode, could we use the SQLDriver to download the files in
>> both YARN and K8s? IMO, this can reduce the cost of code maintenance.
>> 
>> Best,
>> Weihua
>> 
>> 
>> On Wed, Jun 7, 2023 at 10:18 AM Paul Lam  wrote:
>> 
>>> Hi Mason,
>>> 
>>> 

Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-06-08 Thread Jing Ge
Hi Shammon,

If we take a look at the JDK Event design as a reference, we can even add
an arbitrary Object into the event [1]. Back to the CatalogModificationEvent:
everything related to the event could be defined in the Event itself. If we
want to group some information into the Context, we could also consider
adding the CatalogModificationContext into the Event and making the onEvent()
method cleaner with only one input parameter, CatalogModificationEvent,
because CatalogModificationListener is the interface that users will most
often work with. Just my two cents.
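
For illustration, a minimal sketch of that single-parameter variant (the type
names follow this thread; the exact shape is only my assumption):

// CatalogModificationContext stands in for the Context interface from the FLIP.
interface CatalogModificationContext {}

// The context rides along inside the event, following java.util.EventObject ...
class CatalogModificationEvent extends java.util.EventObject {
    private final CatalogModificationContext context;

    CatalogModificationEvent(Object source, CatalogModificationContext context) {
        super(source);
        this.context = context;
    }

    CatalogModificationContext context() {
        return context;
    }
}

// ... so the listener interface needs only one input parameter.
interface CatalogModificationListener {
    void onEvent(CatalogModificationEvent event);
}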

Best regards,
Jing

[1]
http://www.java2s.com/example/java-src/pkg/java/util/eventobject-85298.html

On Thu, Jun 8, 2023 at 7:50 AM Shammon FY  wrote:

> Hi,
>
> To @Jing Ge
> > Thanks for the clarification. Just out of curiosity, if the context is
> not part of the event, why should it be the input parameter of each onEvent
> call?
>
> I think it's quite strange to put information in an Event that is not
> really part of the event itself, such as a factory identifier for a
> catalog, even though it will be used by the listener. I placed it in the
> context class, and I think that is more suitable than placing it directly
> in the event class.
>
> To @Mason
> > 1. I'm also curious about default implementations. Would atlas/datahub be
> supported by default?
>
> We won't do that and external systems such as atlas/datahub need to
> implement the listener themselves.
>
> > 2. The FLIP title is confusing to me, especially in distinguishing it
> from FLIP-314. Would a better FLIP title be "Support Catalog Metadata
> Listener" or something alike?
>
> Thanks, I think "Support Catalog Modification Listener" is more suitable;
> I'll update the title.
>
>
> Best,
> Shammon FY
>
>
> On Thu, Jun 8, 2023 at 12:25 PM Mason Chen  wrote:
>
> > Hi Shammon,
> >
> > FLIP generally looks good and I'm excited to see this feature.
> >
> > 1. I'm also curious about default implementations. Would atlas/datahub be
> > supported by default?
> > 2. The FLIP title is confusing to me, especially in distinguishing it
> from
> > FLIP-314. Would a better FLIP title be "Support Catalog Metadata
> Listener"
> > or something alike?
> >
> > Best,
> > Mason
> >
> > On Tue, Jun 6, 2023 at 3:33 AM Jing Ge 
> wrote:
> >
> > > Hi Shammon,
> > >
> > > Thanks for the clarification. Just out of curiosity, if the context is
> > not
> > > part of the event, why should it be the input parameter of each onEvent
> > > call?
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Tue, Jun 6, 2023 at 11:58 AM Leonard Xu  wrote:
> > >
> > > > Thanks Shammon for the timely update, the updated FLIP looks good to
> > me.
> > > >
> > > > Hope to see the vote thread and following FLIP-314 discussion thread.
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > > > On Jun 6, 2023, at 5:04 PM, Shammon FY  wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > Thanks for all the feedback.
> > > > >
> > > > > For @Jing Ge,
> > > > > I forgot to update the demo code in the FLIP; the method is
> > > > > `onEvent(CatalogModificationEvent, CatalogModificationContext)` and
> > > there
> > > > > is no `onEvent(CatalogModificationEvent)`. I have updated the code.
> > > > Context
> > > > > contains some additional information that is not part of an Event,
> > but
> > > > > needs to be used in the listener, so we separate it from the event.
> > > > >
> > > > > For @Panagiotis,
> > > > > I think `ioExecutor` makes sense and I have added it in
> > > > > `CatalogModificationContext`, thanks
> > > > >
> > > > > For @Leonard,
> > > > > Thanks for your input.
> > > > > 1. I have updated `CatalogModificationContext` as an interface, as
> > well
> > > > as
> > > > > Context in CatalogModificationListenerFactory
> > > > > 2. Configuration sounds good to me, I have updated the method name
> > and
> > > > > getConfiguration in Context
> > > > >
> > > > > For @David,
> > > > > Yes, you're right. The listener will only be used on the client side
> > > > > and won't introduce a new code path for running per-job/per-session
> > > > > jobs. The listener will be created in `TableEnvironment` and
> > > > > `SqlGateway`, which can create a `CatalogManager` with the listener.
> > > > >
> > > > >
> > > > > Best,
> > > > > Shammon FY
> > > > >
> > > > >
> > > > > On Tue, Jun 6, 2023 at 3:33 PM David Morávek <
> > david.mora...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> Thanks for the FLIP! Data lineage is an important problem to
> tackle.
> > > > >>
> > > > >> Can you please expand on how this is planned to be wired into the
> > > > >> JobManager? As I understand, the listeners will be configured
> > globally
> > > > (per
> > > > >> cluster), so this won't introduce a new code path for running
> > per-job
> > > /
> > > > >> per-session user code. Is that correct?
> > > > >>
> > > > >> Best,
> > > > >> D
> > > > >>
> > > > >> On Tue, Jun 6, 2023 at 9:17 AM Leonard Xu 
> > wrote:
> > > > >>
> > > > >>> Thanks Shammon for driving this FLIP 

Re: [DISCUSS] FLIP-317: Upgrade Kryo from 2.24.0 to 5.5.0

2023-06-08 Thread Chesnay Schepler

On 08/06/2023 16:06, Kurt Ostfeld wrote:

  If I understand correctly, the scenario is resuming from multiple checkpoint 
files or from a savepoint and checkpoint files which may be generated by 
different versions of Flink


No; it's the same version of Flink, you just didn't do a full migration 
of the savepoint from the start.


So: load an old savepoint -> create an incremental checkpoint (which writes
new state with Kryo5) -> the job fails -> try to recover the job (which now
has to read state that was written with either Kryo2 or Kryo5).


On 08/06/2023 16:06, Kurt Ostfeld wrote:

This pull-request build supports Java records


We'd have to see, but off the top of my head I doubt we want to use Kryo
for that; we'd rather extend our PojoSerializer. At least so far that was
the plan.




Re: [DISCUSS] FLIP-317: Upgrade Kryo from 2.24.0 to 5.5.0

2023-06-08 Thread Kurt Ostfeld
If the Flink project is planning to completely drop all stateful upgrade 
compatibility within the next year for a Flink 2.0 release, then providing a 
stateful migration pathway from Kryo 2.x to Kryo 5.x is probably unnecessary. 
Is that correct? Is the Flink project pretty confident that Flink 2.0 will not 
be compatible with Flink 1.x state?




--- Original Message ---
On Monday, June 5th, 2023 at 7:51 AM, Martijn Visser  
wrote:


> 
> 
> Hi ConradJam,
> 
> That assumes that it will be possible to upgrade statefully to Flink 2.0:
> given that it is a major breaking change, I wouldn't assume that will be
> possible.
> 
> Best regards,
> 
> Martijn
> 
> On Mon, Jun 5, 2023 at 2:37 PM ConradJam jam.gz...@gmail.com wrote:
> 
> > Here I have a suggestion. Since I mentioned Flink 2.0 earlier, I am
> > wondering if there is a possibility that users could migrate all of
> > their state to Kryo5 when they first start up after moving to version
> > 2.0 in the future, until we give up maintaining Kryo2 later.
> > 
> > Don't know if my idea coincides with Chesnay's
> > 
> > Chesnay Schepler ches...@apache.org wrote on Thu, Jun 1, 2023 at 23:25:
> > 
> > > The version in the state is the serializer version, and applies to the
> > > entire state, independent of what it contains.
> > > If you use Kryo2 for reading and Kryo5 for writing (which also implies
> > > writing the new serializer version into state), then I'd assume that a
> > > migration is an all-or-nothing kind of deal.
> > > IOW, you'd have to load a savepoint and write out an entirely new
> > > savepoint with the new state.
> > > Otherwise you may have only re-written part of the checkpoint, and now
> > > contains a mix of Kryo2/Kryo5 serialized classes, which should then fail
> > > hard on any recovery attempt because we wouldn't use Kryo2 to read
> > > anything.
> > > 
> > > If I'm right, then as is this sounds like quite a trap for users to fall
> > > into because from what I gathered this is the default behavior in the PR
> > > (I could be wrong though since I haven't fully dug through the 8k lines
> > > PR yet...)
> > > 
> > > What we kind of want is this:
> > > 1) Kryo5 is used as the default for new jobs. (maybe not even that,
> > > making it an explicit opt-in)
> > > 2) Kryo2 is used for reading AND writing for existing* jobs by default.
> > > 3) Users can explicitly (and easily!) do a full migration of their jobs,
> > > after which 2) should no longer apply.
> > > 
> > > In the PR you mentioned running into issues on Java 17; do you still
> > > have some error stacktraces and example data/serializers around?
> > > 
> > > On 30/05/2023 00:38, Kurt Ostfeld wrote:
> > > 
> > > > > I’d assumed that there wasn’t a good way to migrate state stored with
> > > > > an older version of Kryo to a newer version - if you’ve solved that, 
> > > > > kudos.
> > > > > I hope I've solved this. The pull request is supposed to do exactly
> > > > > this. Please let me know if you can propose a scenario that would 
> > > > > break
> > > > > this.
> > > > 
> > > > The pull-request has both Kryo 2.x and 5.x dependencies. It looks at
> > > > the state version number written to the state to determine which 
> > > > version of
> > > > Kryo to use for deserialization. Kryo 2.x is not used to write new 
> > > > state.
> > > > 
> > > > --- Original Message ---
> > > > On Monday, May 29th, 2023 at 5:17 PM, Ken Krugler <
> > > > kkrugler_li...@transpac.com> wrote:
> > > > 
> > > > > Hi Kurt,
> > > > > 
> > > > > I personally think it’s a very nice improvement, and that the
> > > > > longer-term goal of removing built-in Kryo support for state 
> > > > > serialization
> > > > > (while a good one) warrants a separate FLIP.
> > > > > 
> > > > > Perhaps an intermediate approach would be to disable the use of Kryo
> > > > > for state serialization by default, and force a user to disregard 
> > > > > warnings
> > > > > and explicitly enable it if they want to go down that path.
> > > > > 
> > > > > I’d assumed that there wasn’t a good way to migrate state stored with
> > > > > an older version of Kryo to a newer version - if you’ve solved that, 
> > > > > kudos.
> > > > > 
> > > > > — Ken
> > > > > 
> > > > > > On May 29, 2023, at 2:21 PM, Kurt Ostfeld
> > > > > > kurtostf...@proton.me.INVALID wrote:
> > > > > > 
> > > > > > Hi everyone. I would like to start the discussion thread for
> > > > > > FLIP-317: Upgrade Kryo from 2.24.0 to 5.5.0 [1].
> > > > > > 
> > > > > > There is a pull-request associated with this linked in the FLIP.
> > > > > > 
> > > > > > I'd particularly like to hear about:
> > > > > > 
> > > > > > - Chesnay Schepler's request to consider removing Kryo serializers
> > > > > > from the execution config. Is this a reasonable task to add into 
> > > > > > this FLIP?
> > > > > > Is there consensus on how to resolve that? Would that be better 
> > > > > > addressed
> > > > > > in a separate future FLIP after the Kryo upgrade FLIP 

Re: [DISCUSS] FLIP-317: Upgrade Kryo from 2.24.0 to 5.5.0

2023-06-08 Thread Kurt Ostfeld
Thank you very much for the feedback.

- With this pull-request build, Flink runs successfully with a JDK 17 runtime
for applications without saved state, or for applications with saved state
produced by this pull-request build, which uses Kryo 5.x. FYI, the Maven build
is still run with JDK 8 or 11, but the Flink jobmanager and taskmanager can be
run with a JDK 17 runtime.
- Kryo 2.x is still on the classpath for backwards compatibility purposes, and 
if you try to load a savepoint from Flink 1.17 or older which uses the Kryo 2.x 
serialization library with JDK 17+, that will fail with exceptions.
- A stateful upgrade pathway looks like this: Applications run a Flink cluster 
with this pull-request under JDK 8 or 11, load an existing savepoint with Kryo 
2.x data, write out a new savepoint which automatically uses Kryo 5.x, restart 
the Flink cluster with a JDK 17 runtime, and resume from the new savepoint 
successfully.
- This pull-request build supports Java records (which obviously requires 
JDK17+ at runtime) with the Flink DataStream API. Kryo 5.x supports records so 
this works without any extra configuration. A simple demo is here: 
https://github.com/kurtostfeld/flink-kryo-upgrade-demo/blob/main/flink-record-demo/src/main/java/demo/app/Main.java.
 The app is built with JDK 17, Flink's Maven build still runs with JDK 8/11, 
but the Flink cluster uses JDK 17 at runtime.
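
The record demo linked above boils down to roughly the following (illustrative
only, not the demo's actual code):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecordDemo {
    // A plain Java record flowing through the DataStream API; with Kryo 5.x
    // it serializes without any extra configuration.
    public record Order(String id, long amountCents) {}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(new Order("a", 100L), new Order("b", 250L))
                .keyBy(Order::id)
                .print();
        env.execute("record-demo");
    }
}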


I need to investigate the scenario you describe. If I understand correctly, the 
scenario is resuming from multiple checkpoint files or from a savepoint and 
checkpoint files which may be generated by different versions of Flink and 
therefore may be using different Kryo library versions. Is that accurate? We 
need to accommodate that scenario and I will investigate.
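
To make the migration mechanics concrete, the version dispatch amounts to
something like this toy model (illustrative only, not the pull-request's
actual code; the real version numbers and Kryo APIs differ):

import java.util.function.Function;

final class DualKryoReader {
    private final Function<byte[], Object> kryo2Reader; // legacy path, read-only
    private final Function<byte[], Object> kryo5Reader; // used for all new state

    DualKryoReader(Function<byte[], Object> kryo2Reader,
                   Function<byte[], Object> kryo5Reader) {
        this.kryo2Reader = kryo2Reader;
        this.kryo5Reader = kryo5Reader;
    }

    // The serializer version recorded with the state decides which Kryo
    // generation decodes the bytes; writes always go through Kryo 5.x.
    Object read(int stateSerializerVersion, byte[] bytes) {
        return stateSerializerVersion >= 5
                ? kryo5Reader.apply(bytes)
                : kryo2Reader.apply(bytes);
    }
}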




--- Original Message ---
On Thursday, June 1st, 2023 at 10:25 AM, Chesnay Schepler  
wrote:


>
>
> The version in the state is the serializer version, and applies to the
> entire state, independent of what it contains.
> If you use Kryo2 for reading and Kryo5 for writing (which also implies
> writing the new serializer version into state), then I'd assume that a
> migration is an all-or-nothing kind of deal.
> IOW, you'd have to load a savepoint and write out an entirely new
> savepoint with the new state.
> Otherwise you may have only re-written part of the checkpoint, and now
> contains a mix of Kryo2/Kryo5 serialized classes, which should then fail
> hard on any recovery attempt because we wouldn't use Kryo2 to read
> anything.
>
> If I'm right, then as is this sounds like quite a trap for users to fall
> into because from what I gathered this is the default behavior in the PR
> (I could be wrong though since I haven't fully dug through the 8k lines
> PR yet...)
>
> What we kind of want is this:
> 1) Kryo5 is used as the default for new jobs. (maybe not even that,
> making it an explicit opt-in)
> 2) Kryo2 is used for reading AND writing for existing* jobs by default.
> 3) Users can explicitly (and easily!) do a full migration of their jobs,
> after which 2) should no longer apply.
>
>
>
> In the PR you mentioned running into issues on Java 17; do you still
> have some error stacktraces and example data/serializers around?
>
> On 30/05/2023 00:38, Kurt Ostfeld wrote:
>
> > > I’d assumed that there wasn’t a good way to migrate state stored with an 
> > > older version of Kryo to a newer version - if you’ve solved that, kudos.
> > > I hope I've solved this. The pull request is supposed to do exactly this. 
> > > Please let me know if you can propose a scenario that would break this.
> >
> > The pull-request has both Kryo 2.x and 5.x dependencies. It looks at the 
> > state version number written to the state to determine which version of 
> > Kryo to use for deserialization. Kryo 2.x is not used to write new state.
> >
> > --- Original Message ---
> > On Monday, May 29th, 2023 at 5:17 PM, Ken Krugler 
> > kkrugler_li...@transpac.com wrote:
> >
> > > Hi Kurt,
> > >
> > > I personally think it’s a very nice improvement, and that the longer-term 
> > > goal of removing built-in Kryo support for state serialization (while a 
> > > good one) warrants a separate FLIP.
> > >
> > > Perhaps an intermediate approach would be to disable the use of Kryo for 
> > > state serialization by default, and force a user to disregard warnings 
> > > and explicitly enable it if they want to go down that path.
> > >
> > > I’d assumed that there wasn’t a good way to migrate state stored with an 
> > > older version of Kryo to a newer version - if you’ve solved that, kudos.
> > >
> > > — Ken
> > >
> > > > On May 29, 2023, at 2:21 PM, Kurt Ostfeld kurtostf...@proton.me.INVALID 
> > > > wrote:
> > > >
> > > > Hi everyone. I would like to start the discussion thread for FLIP-317: 
> > > > Upgrade Kryo from 2.24.0 to 5.5.0 [1].
> > > >
> > > > There is a pull-request associated with this linked in the FLIP.
> > > >
> > > > I'd particularly like to hear about:
> > > >
> > > 

[jira] [Created] (FLINK-32290) Enable -XX:+IgnoreUnrecognizedVMOptions

2023-06-08 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32290:


 Summary: Enable -XX:+IgnoreUnrecognizedVMOptions
 Key: FLINK-32290
 URL: https://issues.apache.org/jira/browse/FLINK-32290
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Build System, Deployment / YARN
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


We can make our lives a lot easier by enabling {{IgnoreUnrecognizedVMOptions}} 
for all processes. With this we can set add-opens/add-exports independently of 
which JDK is actually being used, removing a major source of complexity.
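
For example (illustrative only): with the flag enabled, a JDK simply ignores
an -XX option it no longer recognizes instead of refusing to start, e.g. on
JDK 17:

{code}
java -XX:+IgnoreUnrecognizedVMOptions -XX:+UseConcMarkSweepGC -version
{code}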



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Status of Statefun Project

2023-06-08 Thread Martijn Visser
Hi all,

Apologies for the late reply.

I'm willing to help out with merging requests in Statefun to keep them
compatible with new Flink releases and create new releases. I do think that
validation of the functionality of these releases depends a lot on those
who do these compatibility updates, with PMC members helping out with the
formal process.

> Why can't the Apache Software Foundation allow community members to bring
it up to date?

There's nothing preventing anyone from reviewing any of the current PRs or
opening new ones. However, none of them are approved [1], so there's also
nothing to merge.

> I believe that there are people and companies on this mailing list
interested in supporting Apache Flink Stateful Functions.

If so, then now is the time to show.

Would there be a preference to create a release with Galen's merged
compatibility update to Flink 1.15.2, or do we want to skip that and go
straight to a newer version?

Best regards,

Martijn

[1]
https://github.com/apache/flink-statefun/pulls?q=is%3Apr+is%3Aopen+review%3Aapproved

On Tue, Jun 6, 2023 at 3:55 PM Marco Villalobos 
wrote:

> Why can't the Apache Software Foundation allow community members to bring
> it up to date?
>
> What's the process for that?
>
> I believe that there are people and companies on this mailing list
> interested in supporting Apache Flink Stateful Functions.
>
> You already had two people on this thread express interest.
>
> At the very least, we could keep the library versions up to date.
>
> There is only a small list of new features that might be worthwhile:
>
> 1. event time processing
> 2. state rest api
>
>
> On Jun 6, 2023, at 3:06 AM, Chesnay Schepler  wrote:
>
> If you were to fork it *and want to redistribute it* then the short
> version is that
>
>1. you have to adhere to the Apache licensing requirements
>2. you have to make it clear that your fork does not belong to the
>Apache Flink project. (Trademarks and all that)
>
> Neither should be significant hurdles (there should also be plenty of
> online resources regarding 1), and if you do this then you can freely share
> your fork with others.
>
> I've also pinged Martijn to take a look at this thread.
> To my knowledge the project hasn't decided anything yet.
>
> On 27/05/2023 04:05, Galen Warren wrote:
>
> Ok, I get it. No interest.
>
> If this project is being abandoned, I guess I'll work with my own fork. Is
> there anything I should consider here? Can I share it with other people who
> use this project?
>
> On Tue, May 16, 2023 at 10:50 AM Galen Warren  
> 
> wrote:
>
>
> Hi Martijn, since you opened this discussion thread, I'm curious what your
> thoughts are in light of the responses? Thanks.
>
> On Wed, Apr 19, 2023 at 1:21 PM Galen Warren  
> 
> wrote:
>
>
> I use Apache Flink for stream processing, and StateFun as a hand-off
>
> point for the rest of the application.
> It serves well as a bridge between a Flink Streaming job and
> micro-services.
>
> This is essentially how I use it as well, and I would also be sad to see
> it sunsetted. It works well; I don't know that there is a lot of new
> development required, but if there are no new Statefun releases, then
> Statefun can only be used with older Flink versions.
>
> On Tue, Apr 18, 2023 at 10:04 PM Marco Villalobos  
> wrote:
>
>
> I am currently using Stateful Functions in my application.
>
> I use Apache Flink for stream processing, and StateFun as a hand-off
> point for the rest of the application.
> It serves well as a bridge between a Flink Streaming job and
> micro-services.
>
> I would be disappointed if StateFun was sunsetted. It's a good idea.
>
> If there is anything I can do to help, as a contributor perhaps, please
> let me know.
>
>
> On Apr 3, 2023, at 2:02 AM, Martijn Visser  
> 
>
> wrote:
>
> Hi everyone,
>
> I want to open a discussion on the status of the Statefun Project [1]
>
> in Apache Flink. As you might have noticed, there hasn't been much
> development over the past months in the Statefun repository [2]. There is
> currently a lack of active contributors and committers who are able to help
> with the maintenance of the project.
>
> In order to improve the situation, we need to solve the lack of
>
> committers and the lack of contributors.
>
> On the lack of committers:
>
> 1. Ideally, there are some of the current Flink committers who have
>
> the bandwidth and can help with reviewing PRs and merging them.
>
> 2. If that's not an option, it could be a consideration that current
>
> committers only approve and review PRs that are approved by those who are
> willing to contribute to Statefun, and if the CI passes
>
> On the lack of contributors:
>
> 3. Next to having this discussion on the Dev and User mailing list, we
>
> can also create a blog with a call for new contributors on the Flink
> project website, send out some tweets on the Flink / Statefun twitter
> accounts, post messages on Slack etc. In that message, we would inform how
> those that 

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-08 Thread Paul Lam
Hi Weihua,

Thanks a lot for your input!

I see the difference here is whether to implement the file distribution
mechanism in the generic CLI or in the SQL Driver. The CLI approach could
benefit non-pure-SQL applications (which are not covered by the SQL Driver)
as well.

Not sure if you’re proposing the CliFrontend + init containers approach
or suggesting a native file mechanism for the flink-kubernetes module?

Considering your 2nd question, I’m assuming the latter. I think it’s totally
worthwhile to discuss, and we may split it into another FLIP, because
it’s a significant change. 

Personally, I’m big +1 for a built-in file distribution mechanism on K8s.

Best,
Paul Lam

> On Jun 7, 2023, at 10:51, Weihua Hu  wrote:
> 
> Hi,
> 
> Thanks for updating the FLIP.
> 
> I have two cents on the distribution of SQLs and resources.
> 1. Should we support a common file distribution mechanism for k8s
> application mode?
>  I have seen some issues and requirements on the mailing list.
>  In our production environment, we implement the download command in the
> CliFrontend, and automatically add an init container to the POD for file
> downloading. The advantage of this is that we can use all Flink-supported
> file systems to store files.
> 
>  This needs more discussion. I would appreciate hearing more opinions.
> 
> 2. In this FLIP, we distribute files in two different ways in YARN and
> Kubernetes. Can we combine them into one way?
>  If we don't want to implement a common file distribution for k8s
> application mode, could we use the SQLDriver to download the files in both
> YARN and K8s? IMO, this can reduce the cost of code maintenance.
> 
> Best,
> Weihua
> 
> 
> On Wed, Jun 7, 2023 at 10:18 AM Paul Lam  wrote:
> 
>> Hi Mason,
>> 
>> Thanks for your input!
>> 
>>> +1 for init containers or a more generalized way of obtaining arbitrary
>>> files. File fetching isn't specific to just SQL--it also matters for Java
>>> applications if the user doesn't want to rebuild a Flink image and just
>>> wants to modify the user application fat jar.
>> 
>> I agree that utilizing SQL Drivers in Java applications is equally
>> important
>> as employing them in SQL Gateway. WRT init containers, I think most
>> users use them just as a workaround. For example, wget a jar from the
>> maven repo.
>> 
>> We could implement the functionality in SQL Driver in a more graceful
>> way and the flink-supported filesystem approach seems to be a
>> good choice.
>> 
>>> Also, what do you think about prefixing the config options with
>>> `sql-driver` instead of just `sql` to be more specific?
>> 
>> LGTM, since SQL Driver is a public interface and the options are
>> specific to it.
>> 
>> Best,
>> Paul Lam
>> 
>>> On Jun 6, 2023, at 06:30, Mason Chen  wrote:
>>> 
>>> Hi Paul,
>>> 
>>> +1 for this feature and supporting SQL file + JSON plans. We get a lot of
>>> requests to just be able to submit a SQL file, but the JSON plan
>>> optimizations make sense.
>>> 
>>> +1 for init containers or a more generalized way of obtaining arbitrary
>>> files. File fetching isn't specific to just SQL--it also matters for Java
>>> applications if the user doesn't want to rebuild a Flink image and just
>>> wants to modify the user application fat jar.
>>> 
>>>> Please note that we could reuse the checkpoint storage like S3/HDFS,
>>>> which should be required to run Flink in production, so I guess that
>>>> would be acceptable for most users. WDYT?
>>> 
>>> 
>>> If you do go this route, it would be nice to support writing these files
>> to
>>> S3/HDFS via Flink. This makes access control and policy management
>> simpler.
>>> 
>>> Also, what do you think about prefixing the config options with
>>> `sql-driver` instead of just `sql` to be more specific?
>>> 
>>> Best,
>>> Mason
>>> 
>>> On Mon, Jun 5, 2023 at 2:28 AM Paul Lam paullin3...@gmail.com wrote:
>>> 
 Hi Jark,
 
 Thanks for your input! Please see my comments inline.
 
> Isn't Table API the same way as DataSream jobs to submit Flink SQL?
> DataStream API also doesn't provide a default main class for users,
> why do we need to provide such one for SQL?
 
 Sorry for the confusion I caused. By DataStream jobs, I mean jobs
>> submitted
 via Flink CLI which actually could be DataStream/Table jobs.
 
 I think a default main class would be user-friendly, as it eliminates the
 need for users to write a main class like SQLRunner in the Flink K8s
 operator [1].
 
> I thought the proposed SqlDriver was a dedicated main class accepting
 SQL files, is
> that correct?
 
 Both JSON plans and SQL files are accepted. SQL Gateway should use JSON
 plans,
 while CLI users may use either JSON plans or SQL files.
 
 Please see the updated FLIP[2] for more details.
 
> Personally, I prefer the way of init containers which doesn't depend on

[jira] [Created] (FLINK-32289) The metadata column type is incorrect in Kafka table connector example

2023-06-08 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-32289:
--

 Summary: The metadata column type is incorrect in Kafka table 
connector example
 Key: FLINK-32289
 URL: https://issues.apache.org/jira/browse/FLINK-32289
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1, 1.16.2, 1.15.4
Reporter: Leonard Xu


The example[1] defines the ts column with the TIMESTAMP type

 
{code:java}
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
{code}
The correct column type should be TIMESTAMP_LTZ.

 
{code:java}
 `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  {code}
 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/kafka/#how-to-create-a-kafka-table



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32288) Improve the scheduling performance of AdaptiveBatchScheduler

2023-06-08 Thread xingbe (Jira)
xingbe created FLINK-32288:
--

 Summary: Improve the scheduling performance of 
AdaptiveBatchScheduler
 Key: FLINK-32288
 URL: https://issues.apache.org/jira/browse/FLINK-32288
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: xingbe
 Fix For: 1.18.0


After adding the benchmark of AdaptiveBatchScheduler in FLINK-30480, we noticed 
a regression in the performance of 
SchedulingDownstreamTasksInBatchJobBenchmark#SchedulingDownstreamTasks. When 
scheduling a batch job with a parallelism of 4000*4000, the time spent 
increased from 32ms to 1336ms on my local PC.

To improve the performance, we can optimize the traversal by checking whether 
the consumedPartitionGroups have finished all their partitions.
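
A rough sketch of that short-circuit (a toy model, not the actual scheduler 
classes):

{code:java}
// Toy model: keep a per-group finished counter so that scheduling can check
// a consumed group in O(1) instead of walking all of its partitions.
public class ShortCircuitSketch {
    static final class Group {
        final int numPartitions;
        int numFinishedPartitions;

        Group(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        boolean allPartitionsFinished() {
            return numFinishedPartitions == numPartitions;
        }
    }

    static boolean allInputsFinished(java.util.List<Group> consumedGroups) {
        for (Group group : consumedGroups) {
            if (!group.allPartitionsFinished()) {
                return false; // bail out without any per-partition traversal
            }
        }
        return true;
    }
}
{code}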



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

2023-06-08 Thread Jark Wu
Thank you for the proposal, yuxia! The FLIP looks good to me. 

Best,
Jark

> On Jun 8, 2023, at 11:39, yuxia  wrote:
> 
> Hi, all.
> Thanks everyone for the valuable input. If there are no further concerns 
> about this FLIP[1], I would like to start voting next Monday (6/12).
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> 
> 
> Best regards,
> Yuxia
> 
> ----- Original Message -----
> From: "Martijn Visser" 
> To: "dev" 
> Sent: Tuesday, June 6, 2023, 3:57:56 PM
> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
> 
> Hi Yuxia,
> 
> Thanks for the clarification. I would be +0 overall, because I think
> without actually allowing creation/customization of stored procedures, the
> value for the majority of Flink users will be minimal.
> 
> Best regards,
> 
> Martijn
> 
> On Tue, Jun 6, 2023 at 3:52 AM yuxia  wrote:
> 
>> Hi, Martijn.
>> Thanks for your feedback.
>> 1: In this FLIP we don't intend to allow users to customize their own
>> stored procedures, for we don't want to expose too much to users too
>> early, as the FLIP says.
>> The procedures are supposed to be provided only by the Catalog. Catalog
>> devs can write their built-in procedures, and return the procedure in the
>> method Catalog.getProcedure(ObjectPath procedurePath);
>> So, there won't be SQL syntax to create/save a stored procedure in this
>> FLIP. If we find we do need it, we can propose the SQL syntax to create a
>> stored procedure in another dedicated FLIP.
>> 
>> 2: The syntax `Call procedure_name(xx)` proposed in this FLIP is the
>> default syntax in Calcite for calling stored procedures. Actually, we
>> don't need to make any modification in the flink-sql-parser module for
>> the syntax of calling a procedure. MySQL[1], Postgres[2], and Oracle[3]
>> also use this syntax to call a stored procedure.
>> 
>> 
>> [1] https://dev.mysql.com/doc/refman/8.0/en/call.html
>> [2] https://www.postgresql.org/docs/15/sql-call.html
>> [3] https://docs.oracle.com/javadb/10.8.3.0/ref/rrefcallprocedure.html
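>> 
>> As a rough illustration of point 1, a catalog could expose a built-in
>> procedure like this (a sketch; only Catalog.getProcedure comes from the
>> FLIP, and CompactProcedure is hypothetical):
>> 
>> public class MyCatalog extends GenericInMemoryCatalog {
>>     public MyCatalog(String name) {
>>         super(name);
>>     }
>> 
>>     @Override
>>     public Procedure getProcedure(ObjectPath procedurePath)
>>             throws ProcedureNotExistException {
>>         if ("compact".equals(procedurePath.getObjectName())) {
>>             return new CompactProcedure(); // hypothetical built-in procedure
>>         }
>>         throw new ProcedureNotExistException(getName(), procedurePath);
>>     }
>> }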
>> 
>> Best regards,
>> Yuxia
>> 
>> ----- Original Message -----
>> From: "Martijn Visser" 
>> To: "dev" 
>> Sent: Monday, June 5, 2023, 8:35:44 PM
>> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>> 
>> Hi Yuxia,
>> 
>> Thanks for the FLIP. I have a couple of questions:
>> 
>> 1. The syntax talks about how to CALL or SHOW the available stored
>> procedures, but not on how to create one. Will there not be a SQL syntax to
>> create/save a stored procedure?
>> 2. Is there a default syntax in Calcite for stored procedures? What do
>> other databases do, do they use CALL/SHOW or something like EXEC, USE?
>> 
>> Best regards,
>> 
>> Martijn
>> 
>> On Mon, Jun 5, 2023 at 3:23 AM yuxia  wrote:
>> 
>>> Hi, Jane.
>>> Thanks for your input. I think we can add the auxiliary command show
>>> procedures in this FLIP.
>>> Following the syntax for show functions proposed in FLIP-297.
>>> The syntax will be
>>> SHOW PROCEDURES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT]
>>> (LIKE | ILIKE)  ].
>>> I have updated to this FLIP.
>>> 
>>> The other auxiliary commands may not be suitable currently or need a
>>> further/dedicated discussion. Let's keep this FLIP focused.
>>> 
>>> [1]
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-297%3A+Improve+Auxiliary+Sql+Statements
>>> 
>>> Best regards,
>>> Yuxia
>>> 
>>> ----- Original Message -----
>>> From: "Jane Chan" 
>>> To: "dev" 
>>> Sent: Saturday, June 3, 2023, 7:04:39 PM
>>> Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
>>> 
>>> Hi Yuxia,
>>> 
>>> Thanks for bringing this to the discussion. The call procedure is a
>> widely
>>> used feature and will be very useful for users.
>>> 
>>> I just have one question regarding the usage. The FLIP mentioned that
>>> 
>>>> Flink will allow connector developers to develop their own built-in
>>>> stored procedures, and then enable users to call these predefined
>>>> stored procedures.
 
>>>> In this FLIP, we don't intend to allow users to customize their own
>>>> stored procedures, for we don't want to expose too much to users too
>>>> early.
>>> 
>>> 
>>> If I understand correctly, we might need to provide some auxiliary
>> commands
>>> to inform users what built-in procedures are provided and how to use
>> them.
>>> For example, Snowflake provides commands like [1] [2], and MySQL provides
>>> commands like [3] [4].
>>> 
>>> [1] SHOW PROCEDURES,
>>> https://docs.snowflake.com/en/sql-reference/sql/show-procedures
>>> [2] DESCRIBE PROCEDURE ,
>>> https://docs.snowflake.com/en/sql-reference/sql/desc-procedure
>>> [3] SHOW PROCEDURE CODE,
>>> https://dev.mysql.com/doc/refman/5.7/en/show-procedure-code.html
>>> [4] SHOW PROCEDURE STATUS,
>>> https://dev.mysql.com/doc/refman/5.7/en/show-procedure-status.html
>>> 
>>> Best,
>>> Jane
>>> 
>>> On Sat, Jun 3, 2023 at 3:20 PM Benchao Li  wrote:
>>> 
 Thanks Yuxia for the explanation, it makes sense to me. It would be
>> great
 if you also add this to the FLIP doc.
 
 yuxia  wrote on Thu, Jun 1, 2023 at 17:11:

Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-08 Thread Jark Wu
Thank you for the great work, Mang! The updated proposal looks good to me. 

Best,
Jark

> On Jun 8, 2023, at 11:49, Jingsong Li  wrote:
> 
> Thanks Mang for updating!
> 
> Looks good to me!
> 
> Best,
> Jingsong
> 
> On Wed, Jun 7, 2023 at 2:31 PM Mang Zhang  wrote:
>> 
>> Hi Jingsong,
>> 
>>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>>> Flink design places execution in the TableFactory or directly in the
>>> Catalog, so introducing an executable table makes me feel a bit
>>> strange. (Spark is this style, but Flink may not be)
>> On this issue, I agree that introducing the executable commit/abort logic
>> on CatalogTable is a bit strange.
>> After an offline discussion with yuxia, I adjusted the design in
>> FLIP-305[1].
>> The new solution is similar to the implementation of SupportsOverwrite:
>> it introduces the SupportsStaging interface and infers whether a
>> DynamicTableSink supports atomic CTAS based on whether it implements the
>> SupportsStaging interface; if so, it will get the StagedTable object from
>> the DynamicTableSink.
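>> 
>> Roughly, the shape would be something like this (a sketch based on the
>> description above; the exact signatures in the FLIP may differ):
>> 
>> public interface StagingContext {} // placeholder for the staging context
>> 
>> public interface SupportsStaging {
>>     StagedTable applyStaging(StagingContext context);
>> }
>> 
>> public interface StagedTable {
>>     void begin();
>>     void commit();
>>     void abort();
>> }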
>> 
>> For more implementation details, please see the FLIP-305 document.
>> 
>> This is my poc commits 
>> https://github.com/Tartarus0zm/flink/commit/025b30ad8f1a03e7738e9bb534e6e491c31990fa
>> 
>> 
>> [1] 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
>> 
>> 
>> --
>> 
>> Best regards,
>> 
>> Mang Zhang
>> 
>> 
>> 
>> At 2023-05-12 13:02:14, "Jingsong Li"  wrote:
>>> Hi Mang,
>>> 
>>> Thanks for starting this FLIP.
>>> 
>>> I have some doubts about the `TwoPhaseCatalogTable`. Generally, our
>>> Flink design places execution in the TableFactory or directly in the
>>> Catalog, so introducing an executable table makes me feel a bit
>>> strange. (Spark is this style, but Flink may not be)
>>> 
>>> And for `TwoPhase`, maybe `StagedXXX` like Spark is better?
>>> 
>>> Best,
>>> Jingsong
>>> 
>>> On Wed, May 10, 2023 at 9:29 PM Mang Zhang  wrote:
 
 Hi Ron,
 
 
 First of all, thank you for your reply!
 After our offline communication, I see that what you said mainly concerns
 the compilePlan scenario, but currently compilePlanSql does not support
 non-INSERT statements, otherwise it will throw an exception.
> Unsupported SQL query! compilePlanSql() only accepts a single SQL 
> statement of type INSERT
 But it's a good point that I will seriously consider.
 Non-atomic CTAS can be supported relatively easily,
 but atomic CTAS needs more adaptation work, so I'm going to leave it as is
 and follow up with a separate issue to implement CTAS support for
 compilePlanSql.
 
 
 
 
 
 
 --
 
 Best regards,
 Mang Zhang
 
 
 
 
 
 At 2023-04-23 17:52:07, "liu ron"  wrote:
> Hi, Mang
> 
> I have a question about the implementation details. For the atomicity
> case, the target table is not created before the JobGraph is generated,
> but the target table is required to exist when optimizing the plan to
> generate the JobGraph. So how do you solve this problem?
> 
> Best,
> Ron
> 
> yuxia  wrote on Thu, Apr 20, 2023 at 09:35:
> 
>> Share some insights about the new TwoPhaseCatalogTable proposed after
>> offline discussion with Mang.
>> The main reason is that the TwoPhaseCatalogTable enables
>> external connectors to implement their own logic for commit / abort.
>> In FLIP-218, for atomic CTAS, the Catalog will just drop the table
>> when the job fails. That's not ideal, for it's too generic to work well.
>> For example, some connectors will need to clean some temporary files in
>> abort method. And the actual connector can know the specific logic for
>> aborting.
>> 
>> Best regards,
>> Yuxia
>> 
>> 
>> From: "zhangmang1" 
>> To: "dev" , "Jing Ge" 
>> Cc: "ron9 liu" , "lincoln 86xy" <
>> lincoln.8...@gmail.com>, luoyu...@alumni.sjtu.edu.cn
>> Sent: Wednesday, April 19, 2023, 3:13:36 PM
>> Subject: Re:Re: [DISCUSS] FLIP-305: Support atomic for CREATE TABLE AS
>> SELECT(CTAS) statement
>> 
>> hi, Jing
>> Thank you for your reply.
>>> 1. It looks like you found another way to design the atomic CTAS with 
>>> new
>>> serializable TwoPhaseCatalogTable instead of making Catalog serializable
>> as
>>> described in FLIP-218. Did I understand correctly?
>> Yes, when I was implementing the FLIP-218 solution, I encountered
>> problems with Catalog/CatalogTable serialization/deserialization; for
>> example, after deserialization a CatalogTable could not be converted to a
>> Hive Table. Also, Catalog serialization is still a heavy operation, but
>> it may not actually be necessary, since we just need Create Table.
>> Therefore, the TwoPhaseCatalogTable approach is proposed, which also
>> facilitates the 

[jira] [Created] (FLINK-32287) Add doc for truncate table statement

2023-06-08 Thread luoyuxia (Jira)
luoyuxia created FLINK-32287:


 Summary: Add doc for truncate table statement
 Key: FLINK-32287
 URL: https://issues.apache.org/jira/browse/FLINK-32287
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32286) Align the shade pattern that the Hive connector uses for Calcite-related classes with flink-table-planner

2023-06-08 Thread luoyuxia (Jira)
luoyuxia created FLINK-32286:


 Summary: Align the shade pattern that the Hive connector uses for
Calcite-related classes with flink-table-planner
 Key: FLINK-32286
 URL: https://issues.apache.org/jira/browse/FLINK-32286
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[NOTICE] 1 week left for collecting release 2.0 proposals

2023-06-08 Thread Xintong Song
Hi everyone,

Just a kind reminder that, as mentioned in [1], we are trying to
collect work items for release 2.0 by *June 15*, which is only one week
from now. If there's anything you'd like to propose for the major release,
please fill in the chart on the wiki page [2]. If more time is needed,
please let us know.

Best,

Xintong


[1] https://lists.apache.org/thread/2r1g95bx7h6s96mlmcp8h5lxyj1mh3yy

[2] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release