Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-22 Thread Ron liu
Hi, Feng

Thanks for your feedback.

> Although currently we restrict users from modifying the query, I wonder if
we can provide a better way to help users rebuild it without affecting
downstream OLAP queries.

Considering the problem of data consistency, in the first step we deliberately
restrict the semantics and do not support modifying the query. This is really a
good question; one of my ideas is to introduce a syntax similar to SWAP [1],
which supports exchanging two Dynamic Tables.
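To make that idea concrete, here is a purely hypothetical sketch (the SWAP
keyword and every name below are illustrative only, nothing is decided yet):
the user rebuilds a new dynamic table with the changed query and then
atomically exchanges it with the old one, so downstream OLAP queries keep
reading the same name.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SwapSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical statements, for illustration only.
        tEnv.executeSql(
                "CREATE DYNAMIC TABLE my_table_v2 "
                        + "AS SELECT user_id, SUM(amount) AS total FROM orders GROUP BY user_id");
        tEnv.executeSql("ALTER DYNAMIC TABLE my_table_v2 SWAP WITH my_table");
    }
}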

> From the documentation, the definitions SQL and job information are
stored in the Catalog. Does this mean that if a system needs to adapt to
Dynamic Tables, it also needs to store Flink's job information in the
corresponding system?
For example, does MySQL's Catalog need to store flink job information as
well?

Yes, currently we need to rely on Catalog to store refresh job information.

> Users still need to consider how much memory is being used, how large
the concurrency is, which type of state backend is being used, and may need
to set TTL expiration.

Similar to the current practice, job parameters can be set via the Flink
configuration or via SET commands.
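For illustration, a minimal sketch of what that looks like today with the Table
API (the option keys are just examples of existing Flink settings; how exactly
they are picked up by the background refresh job is up to this FLIP):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RefreshJobSettingsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Job-level settings are configured the same way as for any other Flink SQL job.
        tEnv.getConfig().set("parallelism.default", "4");
        tEnv.getConfig().set("table.exec.state.ttl", "1 h");
        // ... the dynamic table / refresh job would then be defined as proposed in the FLIP.
    }
}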

> When we submit a refresh command, can we help users detect if there are
any
running jobs and automatically stop them before executing the refresh
command? Then wait for it to complete before restarting the background
streaming job?

Purely from a technical implementation point of view, your proposal is
doable, but it would be more costly. Also, I think data consistency itself
is the responsibility of the user, just as it is for a Regular Table today,
so the behavior stays consistent and no additional guarantees are made at
the engine level.

Best,
Ron


Ahmed Hamdy wrote on Fri, Mar 22, 2024 at 23:50:

> Hi Ron,
> Sorry for joining the discussion late, thanks for the effort.
>
> I think the base idea is great, however I have a couple of comments:
> - I want to iterate on Timo's comments regarding the confusion between
> "Dynamic Table" and current Flink "Table". Should the refactoring of the
> system happen in 2.0, should we rename it in this Flip ( as the suggestions
> in the thread ) and address the holistic changes in a separate Flip for
> 2.0?
> - I am confused about how this integrates further with other components; the
> examples provided feel like a standalone ETL job. Could you provide in the
> FLIP an example where the table is further used in subsequent queries
> (especially in batch mode)?
> - I really like the commitment to keeping the unified batch and streaming
> approach.
> Best Regards
> Ahmed Hamdy
>
>
> On Fri, 22 Mar 2024 at 12:07, Lincoln Lee  wrote:
>
> > Hi Timo,
> >
> > Thanks for your thoughtful inputs!
> >
> > Yes, expanding the MATERIALIZED VIEW(MV) could achieve the same function,
> > but our primary concern is that by using a view, we might limit future
> > opportunities
> > to optimize queries through automatic materialization rewriting [1],
> > leveraging
> > the support for MV by physical storage. This is because we would be
> > breaking
> > the intuitive semantics of a materialized view (a materialized view
> > represents
> > the result of a query) by allowing data modifications, thus losing the
> > potential
> > for such optimizations.
> >
> > With these considerations in mind, we were inspired by Google Looker's
> > Persistent
> > Derived Table [2]. PDT is designed for building Looker's automated
> > modeling,
> > aligning with our purpose for the stream-batch automatic pipeline.
> > Therefore,
> > we are considering another candidate, Derived Table, the term 'derive'
> > suggests a
> > query, and 'table' retains modifiability. This approach would not disrupt
> > our current
> > concept of a dynamic table, preserving the future utility of MVs.
> >
> > Conceptually, a Derived Table is a Dynamic Table + Continuous Query. By
> > introducing
> >  a new concept Derived Table for this FLIP, this makes all concepts to
> play
> > together nicely.
> >
> > What do you think about this?
> >
> > [1] https://calcite.apache.org/docs/materialized_views.html
> > [2]
> >
> >
> https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables
> >
> >
> > Best,
> > Lincoln Lee
> >
> >
Timo Walther wrote on Fri, Mar 22, 2024 at 17:54:
> >
> > > Hi Ron,
> > >
> > > thanks for the detailed answer. Sorry, for my late reply, we had a
> > > conference that kept me busy.
> > >
> > >  > In the current concept[1], it actually includes: Dynamic Tables &
> > >  > & Continuous Query. Dynamic Table is just an abstract logical
> concept
> > >
> > > This explanation makes sense to me. But the docs also say "A continuous
> > > query is evaluated on the dynamic table yielding a new dynamic table.".
> > > So even our regular CREATE TABLEs are considered dynamic tables. This
> > > can also be seen in the diagram "Dynamic Table -> Continuous Query ->
> > > Dynamic Table". Currently, Flink queries can only be executed on
> Dynamic
> > > Tables.
> > >
> > >  > In 

[jira] [Created] (FLINK-34923) Behavioral discrepancy between `TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`

2024-03-22 Thread Chloe He (Jira)
Chloe He created FLINK-34923:


 Summary: Behavioral discrepancy between 
`TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`
 Key: FLINK-34923
 URL: https://issues.apache.org/jira/browse/FLINK-34923
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.19.0
Reporter: Chloe He


I found that there is some behavioral discrepancy between 
`TableEnvironment.execute_sql()` and `TableEnvironment.sql_query()`.

A minimal reproducible example:
{code:java}
SELECT `value` FROM (VALUES (CAST(ARRAY[ROW(1, 2), ROW(2, 2)] AS ARRAY>))) AS `t`(`value`) {code}
This throws
{code:java}
File 
~/anaconda3/envs/ibis-dev-flink/lib/python3.10/site-packages/pyflink/table/table.py:943,
 in Table.to_pandas(self)
939 import pytz
940 timezone = pytz.timezone(
941 
self._j_table.getTableEnvironment().getConfig().getLocalTimeZone().getId())
942 serializer = ArrowSerializer(
--> 943 create_arrow_schema(self.get_schema().get_field_names(),
944 self.get_schema().get_field_data_types()),
945 self.get_schema().to_row_data_type(),
946 timezone)
947 import pyarrow as pa
948 table = 
pa.Table.from_batches(serializer.load_from_iterator(batches_iterator))

File 
~/anaconda3/envs/ibis-dev-flink/lib/python3.10/site-packages/pyflink/table/types.py:2194,
 in create_arrow_schema(field_names, field_types)
   2190 """
   2191 Create an Arrow schema with the specified filed names and types.
   2192 """
   2193 import pyarrow as pa
-> 2194 fields = [pa.field(field_name, to_arrow_type(field_type), 
field_type._nullable)
   2195   for field_name, field_type in zip(field_names, field_types)]
   2196 return pa.schema(fields)

File 
~/anaconda3/envs/ibis-dev-flink/lib/python3.10/site-packages/pyflink/table/types.py:2194,
 in <listcomp>(.0)
   2190 """
   2191 Create an Arrow schema with the specified filed names and types.
   2192 """
   2193 import pyarrow as pa
-> 2194 fields = [pa.field(field_name, to_arrow_type(field_type), 
field_type._nullable)
   2195   for field_name, field_type in zip(field_names, field_types)]
   2196 return pa.schema(fields)

File 
~/anaconda3/envs/ibis-dev-flink/lib/python3.10/site-packages/pyflink/table/types.py:2316,
 in to_arrow_type(data_type)
   2314 elif isinstance(data_type, ArrayType):
   2315 if type(data_type.element_type) in [LocalZonedTimestampType, 
RowType]:
-> 2316 raise ValueError("%s is not supported to be used as the element 
type of ArrayType." %
   2317  data_type.element_type)
   2318 return pa.list_(to_arrow_type(data_type.element_type))
   2319 elif isinstance(data_type, RowType):

ValueError: ROW is not supported to be used as the element type of ArrayType. 
{code}
when I tried to execute it with `TableEnvironment.sql_query()`, but it works when 
I tried it with `TableEnvironment.execute_sql()`:
{code:java}
+++
| op |  value |
+++
| +I |   [(1, 2), (2, 2)] |
+++ {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-22 Thread Hao Li
Hi Timo and Jark,

I agree Oracle's syntax seems concise and more descriptive. For the
built-in `ML_PREDICT` and `ML_EVALUATE` functions, I agree with Jark that we can
support them as built-in PTFs using `SqlTableFunction` for this FLIP (see the
sketch after the syntax examples below). We can have a separate FLIP discussing
user-defined PTFs and adopt that for model functions later. To summarize, the
current proposed syntax is

SELECT f1, f2, label FROM TABLE(ML_PREDICT(TABLE `my_data`,
`classifier_model`, f1, f2))

SELECT * FROM TABLE(ML_EVALUATE(TABLE `eval_data`, `classifier_model`, f1,
f2))

Is `DESCRIPTOR` a must in the syntax? If so, it becomes

SELECT f1, f2, label FROM TABLE(ML_PREDICT(TABLE `my_data`,
`classifier_model`, DESCRIPTOR(f1), DESCRIPTOR(f2)))

SELECT * FROM TABLE(ML_EVALUATE(TABLE `eval_data`, `classifier_model`,
DESCRIPTOR(f1), DESCRIPTOR(f2)))

If Calcite supports dropping outer table keyword, it becomes

SELECT f1, f2, label FROM ML_PREDICT(TABLE `my_data`, `classifier_model`,
DESCRIPTOR(f1), DESCRIPTOR(f2))

SELECT * FROM ML_EVALUATE(TABLE `eval_data`, `classifier_model`, DESCRIPTOR(
f1), DESCRIPTOR(f2))
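Regarding exposing these as built-in PTFs via `SqlTableFunction`, here is a
minimal, hypothetical sketch modeled loosely after Flink's window TVFs (the
class name, the skipped operand checking, and the output row type are
illustrative assumptions, not the FLIP's actual design):

import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlTableFunction;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;

public class SqlMLPredictTableFunction extends SqlFunction implements SqlTableFunction {

    public SqlMLPredictTableFunction() {
        super(
                "ML_PREDICT",
                SqlKind.OTHER_FUNCTION,
                ReturnTypes.CURSOR,      // table functions return a cursor type to the planner
                null,
                null,                    // operand checking (TABLE arg, model, DESCRIPTOR) omitted in this sketch
                SqlFunctionCategory.SYSTEM);
    }

    @Override
    public SqlReturnTypeInference getRowTypeInference() {
        // Illustrative only: emit the row type of the input TABLE argument unchanged;
        // a real implementation would append the model's prediction column(s).
        return opBinding -> opBinding.getOperandType(0);
    }
}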

Thanks,
Hao



On Fri, Mar 22, 2024 at 9:16 AM Jark Wu  wrote:

> Sorry, I mean we can bump the Calcite version if needed in Flink 1.20.
>
> On Fri, 22 Mar 2024 at 22:19, Jark Wu  wrote:
>
> > Hi Timo,
> >
> > Introducing user-defined PTF is very useful in Flink, I'm +1 for this.
> > But I think the ML model FLIP is not blocked by this, because we
> > can introduce ML_PREDICT and ML_EVALUATE as built-in PTFs
> > just like TUMBLE/HOP. And support user-defined ML functions as
> > a future FLIP.
> >
> > Regarding the simplified PTF syntax which reduces the outer TABLE()
> > keyword,
> > it seems it was just supported[1] by the Calcite community last month and
> > will be
> > released in the next version (v1.37). The Calcite community is preparing
> > the
> > 1.37 release, so we can bump the version if needed in Flink 1.19.
> >
> > Best,
> > Jark
> >
> > [1]: https://issues.apache.org/jira/browse/CALCITE-6254
> >
> > On Fri, 22 Mar 2024 at 21:46, Timo Walther  wrote:
> >
> >> Hi everyone,
> >>
> >> this is a very important change to the Flink SQL syntax but we can't
> >> wait until the SQL standard is ready for this. So I'm +1 on introducing
> >> the MODEL concept as a first class citizen in Flink.
> >>
> >> For your information: Over the past months I have already spent a
> >> significant amount of time thinking about how we can introduce PTFs in
> >> Flink. I reserved FLIP-440[1] for this purpose and I will share a
> >> version of this in the next 1-2 weeks.
> >>
> >> For a good implementation of FLIP-440 and also FLIP-437, we should
> >> evolve the PTF syntax in collaboration with Apache Calcite.
> >>
> >> There are different syntax versions out there:
> >>
> >> 1) Flink
> >>
> >> SELECT * FROM
> >>TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
> >>
> >> 2) SQL standard
> >>
> >> SELECT * FROM
> >>TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10'
> MINUTES));
> >>
> >> 3) Oracle
> >>
> >> SELECT * FROM
> >> TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES));
> >>
> >> As you can see above, Flink does not follow the standard correctly as it
> >> would need to use `TABLE()` but this is not provided by Calcite yet.
> >>
> >> I really like the Oracle syntax[2][3] a lot. It reduces necessary
> >> keywords to a minimum. Personally, I would like to discuss this syntax
> >> in a separate FLIP and hope I will find supporters for:
> >>
> >>
> >> SELECT * FROM
> >>TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);
> >>
> >> If we go entirely with the Oracle syntax, as you can see in the example,
> >> Oracle allows for passing identifiers directly. This would solve our
> >> problems for the MODEL as well:
> >>
> >> SELECT f1, f2, label FROM ML_PREDICT(
> >>data => `my_data`,
> >>model => `classifier_model`,
> >>input => DESCRIPTOR(f1, f2));
> >>
> >> Or we completely adopt the Oracle syntax:
> >>
> >> SELECT f1, f2, label FROM ML_PREDICT(
> >>data => `my_data`,
> >>model => `classifier_model`,
> >>input => COLUMNS(f1, f2));
> >>
> >>
> >> What do you think?
> >>
> >> Happy to create a FLIP for just this syntax question and collaborate
> >> with the Calcite community on this. Supporting the syntax of Oracle
> >> shouldn't be too hard to convince at least as parser parameter.
> >>
> >> Regards,
> >> Timo
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
> >> [2]
> >>
> >>
> https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072
> >> [3]
> https://oracle-base.com/articles/18c/polymorphic-table-functions-18c
> >>
> >>
> >>
> >> On 20.03.24 17:22, Mingge Deng wrote:
> >> > Thanks Jark for all the insightful comments.
> >> >
> >> > We have updated the proposal per our offline discussions:
> >> > 1. Model 

[jira] [Created] (FLINK-34922) Exception History should support multiple Global failures

2024-03-22 Thread Panagiotis Garefalakis (Jira)
Panagiotis Garefalakis created FLINK-34922:
--

 Summary: Exception History should support multiple Global failures
 Key: FLINK-34922
 URL: https://issues.apache.org/jira/browse/FLINK-34922
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: Panagiotis Garefalakis


Before source coordinators were introduced, global failures were rare and only 
triggered by the JM, ensuring they happened only once per failure. Since this 
has changed, we should adjust accordingly and support multiple global failures 
as part of the exception history.

Relevant discussion under: 
https://github.com/apache/flink/pull/23440#pullrequestreview-1701775436



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-22 Thread Jark Wu
Sorry, I mean we can bump the Calcite version if needed in Flink 1.20.

On Fri, 22 Mar 2024 at 22:19, Jark Wu  wrote:

> Hi Timo,
>
> Introducing user-defined PTF is very useful in Flink, I'm +1 for this.
> But I think the ML model FLIP is not blocked by this, because we
> can introduce ML_PREDICT and ML_EVALUATE as built-in PTFs
> just like TUMBLE/HOP. And support user-defined ML functions as
> a future FLIP.
>
> Regarding the simplified PTF syntax which reduces the outer TABLE()
> keyword,
> it seems it was just supported[1] by the Calcite community last month and
> will be
> released in the next version (v1.37). The Calcite community is preparing
> the
> 1.37 release, so we can bump the version if needed in Flink 1.19.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/CALCITE-6254
>
> On Fri, 22 Mar 2024 at 21:46, Timo Walther  wrote:
>
>> Hi everyone,
>>
>> this is a very important change to the Flink SQL syntax but we can't
>> wait until the SQL standard is ready for this. So I'm +1 on introducing
>> the MODEL concept as a first class citizen in Flink.
>>
>> For your information: Over the past months I have already spent a
>> significant amount of time thinking about how we can introduce PTFs in
>> Flink. I reserved FLIP-440[1] for this purpose and I will share a
>> version of this in the next 1-2 weeks.
>>
>> For a good implementation of FLIP-440 and also FLIP-437, we should
>> evolve the PTF syntax in collaboration with Apache Calcite.
>>
>> There are different syntax versions out there:
>>
>> 1) Flink
>>
>> SELECT * FROM
>>TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
>>
>> 2) SQL standard
>>
>> SELECT * FROM
>>TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
>>
>> 3) Oracle
>>
>> SELECT * FROM
>> TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES));
>>
>> As you can see above, Flink does not follow the standard correctly as it
>> would need to use `TABLE()` but this is not provided by Calcite yet.
>>
>> I really like the Oracle syntax[2][3] a lot. It reduces necessary
>> keywords to a minimum. Personally, I would like to discuss this syntax
>> in a separate FLIP and hope I will find supporters for:
>>
>>
>> SELECT * FROM
>>TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);
>>
>> If we go entirely with the Oracle syntax, as you can see in the example,
>> Oracle allows for passing identifiers directly. This would solve our
>> problems for the MODEL as well:
>>
>> SELECT f1, f2, label FROM ML_PREDICT(
>>data => `my_data`,
>>model => `classifier_model`,
>>input => DESCRIPTOR(f1, f2));
>>
>> Or we completely adopt the Oracle syntax:
>>
>> SELECT f1, f2, label FROM ML_PREDICT(
>>data => `my_data`,
>>model => `classifier_model`,
>>input => COLUMNS(f1, f2));
>>
>>
>> What do you think?
>>
>> Happy to create a FLIP for just this syntax question and collaborate
>> with the Calcite community on this. Supporting the syntax of Oracle
>> shouldn't be too hard to convince at least as parser parameter.
>>
>> Regards,
>> Timo
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
>> [2]
>>
>> https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072
>> [3] https://oracle-base.com/articles/18c/polymorphic-table-functions-18c
>>
>>
>>
>> On 20.03.24 17:22, Mingge Deng wrote:
>> > Thanks Jark for all the insightful comments.
>> >
>> > We have updated the proposal per our offline discussions:
>> > 1. Model will be treated as a new relation in FlinkSQL.
>> > 2. Include the common ML predict and evaluate functions into the open
>> > source flink to complete the user journey.
>> >  And we should be able to extend the calcite SqlTableFunction to
>> support
>> > these two ML functions.
>> >
>> > Best,
>> > Mingge
>> >
>> > On Mon, Mar 18, 2024 at 7:05 PM Jark Wu  wrote:
>> >
>> >> Hi Hao,
>> >>
>> >>> I meant how the table name
>> >> in window TVF gets translated to `SqlCallingBinding`. Probably we need
>> to
>> >> fetch the table definition from the catalog somewhere. Do we treat
>> those
>> >> window TVF specially in parser/planner so that catalog is looked up
>> when
>> >> they are seen?
>> >>
>> >> The table names are resolved and validated by Calcite SqlValidator.  We
>> >> don' need to fetch from catalog manually.
>> >> The specific checking logic of cumulate window happens in
>> >> SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes.
>> >> The return type of SqlCumulateTableFunction is defined in
>> >> #getRowTypeInference() method.
>> >> Both are public interfaces provided by Calcite and it seems it's not
>> >> specially handled in parser/planner.
>> >>
>> >> I didn't try that, but my gut feeling is that the framework is ready to
>> >> extend a customized TVF.
>> >>
>> >>> For what model is, I'm wondering if it has to be datatype or relation.
>> >> Can

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-22 Thread Ahmed Hamdy
Hi Ron,
Sorry for joining the discussion late, thanks for the effort.

I think the base idea is great, however I have a couple of comments:
- I want to iterate on Timo's comments regarding the confusion between
"Dynamic Table" and current Flink "Table". Should the refactoring of the
system happen in 2.0, should we rename it in this Flip ( as the suggestions
in the thread ) and address the holistic changes in a separate Flip for 2.0?
- I am confused about how this integrates further with other components; the
examples provided feel like a standalone ETL job. Could you provide in the
FLIP an example where the table is further used in subsequent queries
(especially in batch mode)?
- I really like the commitment to keeping the unified batch and streaming
approach.
Best Regards
Ahmed Hamdy


On Fri, 22 Mar 2024 at 12:07, Lincoln Lee  wrote:

> Hi Timo,
>
> Thanks for your thoughtful inputs!
>
> Yes, expanding the MATERIALIZED VIEW(MV) could achieve the same function,
> but our primary concern is that by using a view, we might limit future
> opportunities
> to optimize queries through automatic materialization rewriting [1],
> leveraging
> the support for MV by physical storage. This is because we would be
> breaking
> the intuitive semantics of a materialized view (a materialized view
> represents
> the result of a query) by allowing data modifications, thus losing the
> potential
> for such optimizations.
>
> With these considerations in mind, we were inspired by Google Looker's
> Persistent
> Derived Table [2]. PDT is designed for building Looker's automated
> modeling,
> aligning with our purpose for the stream-batch automatic pipeline.
> Therefore,
> we are considering another candidate, Derived Table, the term 'derive'
> suggests a
> query, and 'table' retains modifiability. This approach would not disrupt
> our current
> concept of a dynamic table, preserving the future utility of MVs.
>
> Conceptually, a Derived Table is a Dynamic Table + Continuous Query. By
> introducing
>  a new concept Derived Table for this FLIP, this makes all concepts to play
> together nicely.
>
> What do you think about this?
>
> [1] https://calcite.apache.org/docs/materialized_views.html
> [2]
>
> https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables
>
>
> Best,
> Lincoln Lee
>
>
> Timo Walther wrote on Fri, Mar 22, 2024 at 17:54:
>
> > Hi Ron,
> >
> > thanks for the detailed answer. Sorry, for my late reply, we had a
> > conference that kept me busy.
> >
> >  > In the current concept[1], it actually includes: Dynamic Tables &
> >  > & Continuous Query. Dynamic Table is just an abstract logical concept
> >
> > This explanation makes sense to me. But the docs also say "A continuous
> > query is evaluated on the dynamic table yielding a new dynamic table.".
> > So even our regular CREATE TABLEs are considered dynamic tables. This
> > can also be seen in the diagram "Dynamic Table -> Continuous Query ->
> > Dynamic Table". Currently, Flink queries can only be executed on Dynamic
> > Tables.
> >
> >  > In essence, a materialized view represents the result of a query.
> >
> > Isn't that what your proposal does as well?
> >
> >  > the object of the suspend operation is the refresh task of the
> > dynamic table
> >
> > I understand that Snowflake uses the term [1] to merge their concepts of
> > STREAM, TASK, and TABLE into one piece of concept. But Flink has no
> > concept of a "refresh task". Also, they already introduced MATERIALIZED
> > VIEW. Flink is in the convenient position that the concept of
> > materialized views is not taken (reserved maybe for exactly this use
> > case?). And SQL standard concept could be "slightly adapted" to our
> > needs. Looking at other vendors like Postgres[2], they also use
> > `REFRESH` commands so why not adding additional commands such as DELETE
> > or UPDATE. Oracle supports  "ON PREBUILT TABLE clause tells the database
> > to use an existing table segment"[3] which comes closer to what we want
> > as well.
> >
> >  > it is not intended to support data modification
> >
> > This is an argument that I understand. But we as Flink could allow data
> > modifications. This way we are only extending the standard and don't
> > introduce new concepts.
> >
> > If we can't agree on using MATERIALIZED VIEW concept. We should fix our
> > syntax in a Flink 2.0 effort. Making regular tables bounded and dynamic
> > tables unbounded. We would be closer to the SQL standard with this and
> > pave the way for the future. I would actually support this if all
> > concepts play together nicely.
> >
> >  > In the future, we can consider extending the statement set syntax to
> > support the creation of multiple dynamic tables.
> >
> > It's good that we called the concept STATEMENT SET. This allows us to
> > defined CREATE TABLE within. Even if it might look a bit confusing.
> >
> > Regards,
> > Timo
> >
> > [1] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
> > [2]
> > 

Re: Flink Kubernetes Operator Failing Over FlinkDeployments to a New Cluster

2024-03-22 Thread Kevin Lam
Thanks for sharing this work Gyula! That's great to see the FLIP covers
some of the limitations already. I will follow the FLIP and associated JIRA
ticket.

Hi Matthias Pohl. I'd be interested to learn if there has been any progress
on the FLIP-360 or associated JIRA issue FLINK-31709.

On Fri, Mar 22, 2024 at 3:47 AM Gyula Fóra  wrote:

> I agree, we would need some FLIPs to cover this. Actually there is already
> some work on this topic initiated by Matthias Pohl (ccd).
> Please see this:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-360%3A+Merging+the+ExecutionGraphInfoStore+and+the+JobResultStore+into+a+single+component+CompletedJobStore
>
> This FLIP actually covers some of these limitations already and other
> outstanding issues in the operator.
>
> Cheers,
> Gyula
>


Re: [DISCUSS] FLIP-428: Fault Tolerance/Rescale Integration for Disaggregated State

2024-03-22 Thread Zakelly Lan
Hi Yue,

Thanks for bringing this up!

The CURRENT file is the special one, which should be snapshotted during the
sync phase (temporarily loaded into memory). That way we can solve this.
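To illustrate the idea (the path is a placeholder and this is only a sketch of
the approach, not the actual implementation): the CURRENT file is just a few
bytes naming the active MANIFEST, so its content can be captured in memory
during the synchronous phase and uploaded later, which avoids the race where
CURRENT is repointed to a newer MANIFEST before the asynchronous phase runs.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CurrentFileSnapshotSketch {
    public static void main(String[] args) throws IOException {
        String dbPath = "/path/to/forst/db";                               // placeholder
        // Synchronous phase: capture the tiny CURRENT file into memory.
        byte[] currentFileContent = Files.readAllBytes(Paths.get(dbPath, "CURRENT"));
        // Asynchronous phase: upload currentFileContent together with the other
        // live files; the on-disk CURRENT may already point elsewhere by then.
        System.out.println(new String(currentFileContent).trim());
    }
}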


Best,
Zakelly

On Fri, Mar 22, 2024 at 4:55 PM yue ma  wrote:

> Hi jinzhong,
> Thanks for your reply. I still have some doubts about the first question. Is
> there such a case:
> when you take a snapshot during the synchronization phase, you record the
> CURRENT file and manifest 8, but before the asynchronous phase the manifest
> reaches the size threshold, the CURRENT file is repointed to the new manifest 9,
> and the incorrect CURRENT file is then uploaded?
>
> Jinzhong Li wrote on Wed, Mar 20, 2024 at 20:13:
>
> > Hi Yue,
> >
> > Thanks for your feedback!
> >
> > > 1. If we choose Option-3 for ForSt , how would we handle Manifest File
> > > ? Should we take a snapshot of the Manifest during the synchronization
> > phase?
> >
> > IIUC, the GetLiveFiles() API in Option-3 can also catch the fileInfo of
> > Manifest files, and this api also return the manifest file size, which
> > means this api could take snapshot for Manifest FileInfo (filename +
> > fileSize) during the synchronization phase.
> > You could refer to the rocksdb source code[1] to verify this.
> >
> >
> >  > However, many distributed storage systems do not support the
> > > ability of Fast Duplicate (such as HDFS). But ForSt has the ability to
> > > directly read and write remote files. Can we not copy or Fast duplicate
> > > these files, but instead directly reuse and reference these remote
> > > files? I think this can reduce file download time and may be more
> useful
> > > for most users who use HDFS (do not support Fast Duplicate)?
> >
> > Firstly, as far as I know, most remote file systems support the
> > FastDuplicate, eg. S3 copyObject/Azure Blob Storage copyBlob/OSS
> > copyObject, and the HDFS indeed does not support FastDuplicate.
> >
> > Actually,we have considered the design which reuses remote files. And
> that
> > is what we want to implement in the coming future, where both checkpoints
> > and restores can reuse existing files residing on the remote state
> storage.
> > However, this design conflicts with the current file management system in
> > Flink.  At present, remote state files are managed by the ForStDB
> > (TaskManager side), while checkpoint files are managed by the JobManager,
> > which is a major hindrance to file reuse. For example, issues could arise
> > if a TM reuses a checkpoint file that is subsequently deleted by the JM.
> > Therefore, as mentioned in FLIP-423[2], our roadmap is to first integrate
> > checkpoint/restore mechanisms with existing framework  at milestone-1.
> > Then, at milestone-2, we plan to introduce TM State Ownership and Faster
> > Checkpointing mechanisms, which will allow both checkpointing and
> restoring
> > to directly reuse remote files, thus achieving faster checkpointing and
> > restoring.
> >
> > [1]
> >
> >
> https://github.com/facebook/rocksdb/blob/6ddfa5f06140c8d0726b561e16dc6894138bcfa0/db/db_filesnapshot.cc#L77
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-RoadMap+LaunchingPlan
> >
> > Best,
> > Jinzhong
> >
> >
> >
> >
> >
> >
> >
> > On Wed, Mar 20, 2024 at 4:01 PM yue ma  wrote:
> >
> > > Hi Jinzhong
> > >
> > > Thank you for initiating this FLIP.
> > >
> > > I have just some minor question:
> > >
> > > 1. If we choice Option-3 for ForSt , how would we handle Manifest File
> > > ? Should we take snapshot of the Manifest during the synchronization
> > phase?
> > > Otherwise, may the Manifest and MetaInfo information be inconsistent
> > during
> > > recovery?
> > > 2. For the Restore Operation , we need Fast Duplicate  Checkpoint Files
> > to
> > > Working Dir . However, many distributed storage systems do not support
> > the
> > > ability of Fast Duplicate (such as HDFS). But ForSt has the ability to
> > > directly read and write remote files. Can we not copy or Fast duplicate
> > > these files, but instead directly reuse and reference these remote
> > > files? I think this can reduce file download time and may be more
> useful
> > > for most users who use HDFS (do not support Fast Duplicate)?
> > >
> > > --
> > > Best,
> > > Yue
> > >
> >
>
>
> --
> Best,
> Yue
>


Re: [DISCUSS] FLIP-425: Asynchronous Execution Model

2024-03-22 Thread lorenzo . affetti
Thank you Yanfei for addressing all the questions!

> I'm not sure if I understand your question. In Java, this
case(modifying the local local variable) is not allowed[1], but there
are ways to get around the limitation of lambda.
In this case, the modification to x may be concurrent, which needs to
be handled carefully.

I think you got my question, and I did not realize that it is not even allowed
to modify an externally scoped local variable from inside a lambda.
I guess the point is that it is still possible with workarounds, but the user
would really need to be willing to do it and "shoot him/herself in the foot".
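To spell that out with plain Java (using CompletableFuture as a stand-in for
the FLIP's async state API, so everything here is illustrative): a captured
local must be effectively final, so the direct mutation is rejected by the
compiler, and the usual workaround compiles but then has to cope with the
callback running concurrently with, or after, the surrounding code.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class LambdaCaptureSketch {
    public static void main(String[] args) {
        CompletableFuture<Integer> asyncValue = CompletableFuture.completedFuture(42);

        int x = 0;
        // asyncValue.thenAccept(v -> x = v);   // does not compile: x must be effectively final

        // Workaround: wrap the value in a mutable holder. This compiles, but in the
        // out-of-order execution model the callback may run after (or interleaved with)
        // the code that follows, so such mutations need to be handled carefully.
        AtomicInteger holder = new AtomicInteger(x);
        asyncValue.thenAccept(holder::set);
        System.out.println(holder.get());
    }
}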

> an implicit fact in sync
API is that "event timer fire" would execute before "the subsequent
records of watermark", but in out-of-order mode(async API), the
execution order between them is not guaranteed

Got it. What I don't get exactly is what type of inconsistency/issue the user
could face.
For example, if the user can no longer be sure that elements end up in the
correct windows, I am afraid this would undermine the watermark concept as a
whole. What do you think?

Thank you.

On Mar 21, 2024 at 14:27 +0100, Yanfei Lei wrote:
> Thanks for your reading and valuable comments!
>
> > 1) About locking VS reference counting: I would like to clear out which 
> > mechanism prevents what:
> The `KeyAccountingUnit` implements locking behavior on keys and
> ensures 2 state requests on the same key happen in order.
> Double-locking the same key does not result in deadlocks (thanks to
> the `previous == record` condition in your pseudo-code), so, the same
> callback chain can update/read multiple times the same piece of state.
> On the other side we have the reference counting mechanism that is
> used to understand whether a record has been fully processed, i.e.,
> all state invocations have been carried out.
> Here is the question: am I correct if we say that key accounting is
> needed for out-of-order while reference counting is needed for
> checkpointing and watermarking?
>
>
> Regarding the "deadlock" of `KeyAccountingUnit`: good catch, we will
> emphasize this in FLIP, the KeyAccountingUnitis reentrant, so the
> state requests of the same record can update/read multiple times
> without deadlock.
>
> Regarding the question: records, checkpoint barriers and watermarks
> can be regarded as inputs, this FLIP discusses the *order* between all
> inputs, in simple terms, the inputs of the same key that arrive first
> need to be executed first.
>
> And the `KeyAccountingUnit` and reference counting work together to
> preserve the order, when the reference counting mechanism recognizes a
> record has been fully processed, the record will be removed from the
> `KeyAccountingUnit`. The checkpoint or watermark would only start once all
> the reference counts of arrived inputs reach zero.
>
>
> > 2) Number of mails:
> Do you end up having two mails?
>
>
> Yes, there are two mails in this case.
>
> > 3) Would this change something on the consistency guarantees provided?
> I guess not, as, the lock is held in any case until the value on the
> state hasn't been updated.
> Could lead to any inconsistency (most probably the state would be updated to 
> 0).
>
>
> Yes, the results of the two cases you mentioned are as you analyzed.
> The result of the first case is 1, and the result of the second case
> is 0.
> No matter which case it is, the next `processElement` with the same
> key will be executed after the code in this `processElement` is
> completely executed.
>
> Therefore it wouldn't lead to inconsistency.
>
> > 4) On the local variables/attributes:
>
> I'm not sure if I understand your question. In Java, this
> case(modifying the local local variable) is not allowed[1], but there
> are ways to get around the limitation of lambda.
> In this case, the modification to x may be concurrent, which needs to
> be handled carefully.
>
> 5) On watermarks:
> It seems that, in order to achieve a good throughput, out-of-order
> mode should be used.
> In the FLIP I could not understand well how many things could go wrong
> if that one is used.
> Could you please clarify that?
>
> A typical example is the order between "event timer fire" and "the
> subsequent records of watermark".
> Although the semantics of watermarks do not define the sequence
> between a watermark and subsequent records, an implicit fact in sync
> API is that "event timer fire" would execute before "the subsequent
> records of watermark", but in out-of-order mode(async API), the
> execution order between them is not guaranteed.
> There also are some related discussions in FLIP-423[2,3] proposed by
> Yunfeng Zhou and Xintong Song.
>
> [1] 
> https://stackoverflow.com/questions/30026824/modifying-local-variable-from-inside-lambda
> [2] https://lists.apache.org/thread/djsnybs9whzrt137z3qmxdwn031o93gn
> [3] https://lists.apache.org/thread/986zxq1k9rv3vkbk39yw16g24o6h83mz
>
>  wrote on Thu, Mar 21, 2024 at 19:29:
> >
> > Thank you everybody for the questions and answers (especially 

[jira] [Created] (FLINK-34921) SystemProcessingTimeServiceTest fails due to missing output

2024-03-22 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34921:
-

 Summary: SystemProcessingTimeServiceTest fails due to missing 
output
 Key: FLINK-34921
 URL: https://issues.apache.org/jira/browse/FLINK-34921
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.20.0
Reporter: Matthias Pohl


This PR CI build with {{AdaptiveScheduler}} enabled failed:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58476=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=11224

{code}
"ForkJoinPool-61-worker-25" #863 daemon prio=5 os_prio=0 tid=0x7f8c19eba000 
nid=0x60a5 waiting on condition [0x7f8bc2cf9000]
Mar 21 17:19:42java.lang.Thread.State: WAITING (parking)
Mar 21 17:19:42 at sun.misc.Unsafe.park(Native Method)
Mar 21 17:19:42 - parking to wait for  <0xd81959b8> (a 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask)
Mar 21 17:19:42 at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
Mar 21 17:19:42 at 
java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
Mar 21 17:19:42 at 
java.util.concurrent.FutureTask.get(FutureTask.java:191)
Mar 21 17:19:42 at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest$$Lambda$1443/1477662666.call(Unknown
 Source)
Mar 21 17:19:42 at 
org.assertj.core.api.ThrowableAssert.catchThrowable(ThrowableAssert.java:63)
Mar 21 17:19:42 at 
org.assertj.core.api.AssertionsForClassTypes.catchThrowable(AssertionsForClassTypes.java:892)
Mar 21 17:19:42 at 
org.assertj.core.api.Assertions.catchThrowable(Assertions.java:1366)
Mar 21 17:19:42 at 
org.assertj.core.api.Assertions.assertThatThrownBy(Assertions.java:1210)
Mar 21 17:19:42 at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture(SystemProcessingTimeServiceTest.java:92)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-22 Thread wudi
Hi, Feng,

1. Are you suggesting that when a commit gets stuck, we could interrupt the 
commit request using a timeout parameter? Currently, there is no such 
parameter. In my understanding, for two-phase commit, checkpointing must be 
enabled, so the commit timeout is essentially the checkpoint timeout. 
Therefore, it seems unnecessary to add an additional parameter here. What do 
you think?
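For reference, a minimal sketch of the checkpoint settings that effectively
bound the commit in a two-phase-commit sink (the values are illustrative only):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);                                // checkpoint every 60 s
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L); // fail a checkpoint after 10 min
    }
}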

2. In addition to deleting checkpoints to re-consume the data, the connector 
also provides an option to ignore commit errors [1]. However, this option is 
only used for error-recovery scenarios, such as when a transaction has been 
cleared by the server but you want to reuse the upstream offset from the checkpoint.

3. Also, thank you for pointing out the issue with the parameters. They have 
already been addressed [2], but the FLIP changes were overlooked; the FLIP has 
now been updated.

[1] 
https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java#L150-L160
[2] 
https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java#L89-L98
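On the Duration-typed options Feng asked about, for illustration this is
roughly what such an option looks like with Flink's `ConfigOptions` builder
(the key and default below are placeholders, not the connector's actual
definition):

import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class DurationOptionSketch {
    // Placeholder key/default, shown only to illustrate the Duration-typed builder.
    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
            ConfigOptions.key("doris.request.connect.timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30))
                    .withDescription("Connect timeout for requests to Doris.");
}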

Brs
di.wu



> On Mar 22, 2024 at 18:28, Feng Jin wrote:
> 
> Hi Di,
> 
> Thank you for the update, as well as quickly implementing corresponding
> capabilities including filter push down and project push down.
> 
> Regarding the transaction timeout, I still have some doubts. I would like
> to confirm if we can control this timeout parameter in the connector, such
> as setting it to 10 minutes or 1 hour.
> Also, when a transaction is cleared by the server, the commit operation of
> the connector will fail, leading to job failure. In this case, can users
> only choose to delete the checkpoint and re-consume historical data?
> 
> There is also a small question regarding the parameters
> `doris.request.connect.timeout.ms`
> and `doris.request.read.timeout.ms`:
> can we change them to Duration type and remove the "ms" suffix?
> This way, all time parameters can be kept uniform in type as Duration.
> 
> 
> Best,
> Feng
> 
> On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:
> 
>> Hi, Feng,
>> Thank you, that's a great suggestion !
>> 
>> I have already implemented FilterPushDown and removed that parameter on
>> DorisDynamicTableSource[1], and also updated FLIP.
>> 
>> Regarding the mention of [Doris also aborts transactions], it may not have
>> been described accurately. It mainly refers to the automatic expiration of
>> long-running transactions in Doris that have not been committed for a
>> prolonged period.
>> 
>> As for two-phase commit, when a commit fails, the checkpoint will also
>> fail, and the job will be continuously retried.
>> 
>> [1]
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
>> 
>> Brs
>> di.wu
>> 
>> 
>>> On Mar 15, 2024 at 14:53, Feng Jin wrote:
>>> 
>>> Hi Di
>>> 
>>> Thank you for initiating this FLIP, +1 for this.
>>> 
>>> Regarding the option `doris.filter.query` of doris source table
>>> 
>>> Can we directly implement the FilterPushDown capability of Flink Source
>>> like Jdbc Source [1] instead of introducing an option?
>>> 
>>> 
>>> Regarding two-phase commit,
>>> 
 At the same time, Doris will also abort transactions that have not been
>>> committed for a long time
>>> 
>>> Can we control the transaction timeout in the connector?
>>> And control the behavior when timeout occurs, whether to discard by
>> default
>>> or trigger job failure?
>>> 
>>> 
>>> [1]. https://issues.apache.org/jira/browse/FLINK-16024
>>> 
>>> Best,
>>> Feng
>>> 
>>> 
>>> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky >> 
>>> wrote:
>>> 
 Hi,
 
 Thanks for driving this, +1 for the FLIP.
 
 Best,
 Ferenc
 
 
 
 
 On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy >> 
 wrote:
 
> 
> 
> Hello,
> Thanks for the proposal, +1 for the FLIP.
> 
> Best Regards
> Ahmed Hamdy
> 
> 
> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
> 
>> Hi, Leonard
>> Thank you for your suggestion.
>> I referred to other Connectors[1], modified the naming and types of
>> relevant parameters[2], and also updated FLIP.
>> 
>> [1]
>> 
 
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
>> [1]
>> 
 
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
>> 
>> Brs,
>> di.wu
>> 
>>> On Mar 7, 2024 at 14:33, Leonard Xu xbjt...@gmail.com wrote:
>>> 
>>> Thanks wudi for the updating, the FLIP generally looks good to me, I
>>> only left two minor suggestions:

Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-22 Thread Jark Wu
Hi Timo,

Introducing user-defined PTF is very useful in Flink, I'm +1 for this.
But I think the ML model FLIP is not blocked by this, because we
can introduce ML_PREDICT and ML_EVALUATE as built-in PTFs
just like TUMBLE/HOP. And support user-defined ML functions as
a future FLIP.
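For reference, this is how the existing built-in window PTFs are invoked today
(shown via the Java Table API; the table and column names are illustrative),
and the ML functions would follow the same pattern:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowTvfSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Assumes a table `Bid` with a `bidtime` time attribute and a `price` column.
        tEnv.executeSql(
                "SELECT window_start, window_end, SUM(price) AS total_price "
                        + "FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) "
                        + "GROUP BY window_start, window_end").print();
    }
}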

Regarding the simplified PTF syntax which reduces the outer TABLE()
keyword,
it seems it was just supported[1] by the Calcite community last month and
will be
released in the next version (v1.37). The Calcite community is preparing
the
1.37 release, so we can bump the version if needed in Flink 1.19.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/CALCITE-6254

On Fri, 22 Mar 2024 at 21:46, Timo Walther  wrote:

> Hi everyone,
>
> this is a very important change to the Flink SQL syntax but we can't
> wait until the SQL standard is ready for this. So I'm +1 on introducing
> the MODEL concept as a first class citizen in Flink.
>
> For your information: Over the past months I have already spent a
> significant amount of time thinking about how we can introduce PTFs in
> Flink. I reserved FLIP-440[1] for this purpose and I will share a
> version of this in the next 1-2 weeks.
>
> For a good implementation of FLIP-440 and also FLIP-437, we should
> evolve the PTF syntax in collaboration with Apache Calcite.
>
> There are different syntax versions out there:
>
> 1) Flink
>
> SELECT * FROM
>TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
>
> 2) SQL standard
>
> SELECT * FROM
>TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
>
> 3) Oracle
>
> SELECT * FROM
> TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES));
>
> As you can see above, Flink does not follow the standard correctly as it
> would need to use `TABLE()` but this is not provided by Calcite yet.
>
> I really like the Oracle syntax[2][3] a lot. It reduces necessary
> keywords to a minimum. Personally, I would like to discuss this syntax
> in a separate FLIP and hope I will find supporters for:
>
>
> SELECT * FROM
>TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);
>
> If we go entirely with the Oracle syntax, as you can see in the example,
> Oracle allows for passing identifiers directly. This would solve our
> problems for the MODEL as well:
>
> SELECT f1, f2, label FROM ML_PREDICT(
>data => `my_data`,
>model => `classifier_model`,
>input => DESCRIPTOR(f1, f2));
>
> Or we completely adopt the Oracle syntax:
>
> SELECT f1, f2, label FROM ML_PREDICT(
>data => `my_data`,
>model => `classifier_model`,
>input => COLUMNS(f1, f2));
>
>
> What do you think?
>
> Happy to create a FLIP for just this syntax question and collaborate
> with the Calcite community on this. Supporting the syntax of Oracle
> shouldn't be too hard to convince at least as parser parameter.
>
> Regards,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
> [2]
>
> https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072
> [3] https://oracle-base.com/articles/18c/polymorphic-table-functions-18c
>
>
>
> On 20.03.24 17:22, Mingge Deng wrote:
> > Thanks Jark for all the insightful comments.
> >
> > We have updated the proposal per our offline discussions:
> > 1. Model will be treated as a new relation in FlinkSQL.
> > 2. Include the common ML predict and evaluate functions into the open
> > source flink to complete the user journey.
> >  And we should be able to extend the calcite SqlTableFunction to
> support
> > these two ML functions.
> >
> > Best,
> > Mingge
> >
> > On Mon, Mar 18, 2024 at 7:05 PM Jark Wu  wrote:
> >
> >> Hi Hao,
> >>
> >>> I meant how the table name
> >> in window TVF gets translated to `SqlCallingBinding`. Probably we need
> to
> >> fetch the table definition from the catalog somewhere. Do we treat those
> >> window TVF specially in parser/planner so that catalog is looked up when
> >> they are seen?
> >>
> >> The table names are resolved and validated by Calcite SqlValidator.  We
> >> don' need to fetch from catalog manually.
> >> The specific checking logic of cumulate window happens in
> >> SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes.
> >> The return type of SqlCumulateTableFunction is defined in
> >> #getRowTypeInference() method.
> >> Both are public interfaces provided by Calcite and it seems it's not
> >> specially handled in parser/planner.
> >>
> >> I didn't try that, but my gut feeling is that the framework is ready to
> >> extend a customized TVF.
> >>
> >>> For what model is, I'm wondering if it has to be datatype or relation.
> >> Can
> >> it be another kind of citizen parallel to datatype/relation/function/db?
> >> Redshift also supports `show models` operation, so it seems it's treated
> >> specially as well?
> >>
> >> If it is an entity only used in catalog scope (e.g., show xxx, create
> xxx,
> >> drop 

Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-22 Thread Timo Walther

Hi everyone,

this is a very important change to the Flink SQL syntax but we can't 
wait until the SQL standard is ready for this. So I'm +1 on introducing 
the MODEL concept as a first class citizen in Flink.


For your information: Over the past months I have already spent a 
significant amount of time thinking about how we can introduce PTFs in 
Flink. I reserved FLIP-440[1] for this purpose and I will share a 
version of this in the next 1-2 weeks.


For a good implementation of FLIP-440 and also FLIP-437, we should 
evolve the PTF syntax in collaboration with Apache Calcite.


There are different syntax versions out there:

1) Flink

SELECT * FROM
  TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

2) SQL standard

SELECT * FROM
  TABLE(TUMBLE(TABLE(Bid), DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));

3) Oracle

SELECT * FROM
   TUMBLE(Bid, COLUMNS(bidtime), INTERVAL '10' MINUTES);

As you can see above, Flink does not follow the standard correctly as it 
would need to use `TABLE()` but this is not provided by Calcite yet.


I really like the Oracle syntax[2][3] a lot. It reduces necessary 
keywords to a minimum. Personally, I would like to discuss this syntax 
in a separate FLIP and hope I will find supporters for:



SELECT * FROM
  TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);

If we go entirely with the Oracle syntax, as you can see in the example, 
Oracle allows for passing identifiers directly. This would solve our 
problems for the MODEL as well:


SELECT f1, f2, label FROM ML_PREDICT(
  data => `my_data`,
  model => `classifier_model`,
  input => DESCRIPTOR(f1, f2));

Or we completely adopt the Oracle syntax:

SELECT f1, f2, label FROM ML_PREDICT(
  data => `my_data`,
  model => `classifier_model`,
  input => COLUMNS(f1, f2));


What do you think?

Happy to create a FLIP for just this syntax question and collaborate 
with the Calcite community on this. Supporting the Oracle syntax shouldn't be 
too hard to get accepted, at least as a parser parameter.


Regards,
Timo

[1] 
https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5D+FLIP-440%3A+User-defined+Polymorphic+Table+Functions
[2] 
https://docs.oracle.com/en/database/oracle/oracle-database/19/arpls/DBMS_TF.html#GUID-0F66E239-DE77-4C0E-AC76-D5B632AB8072

[3] https://oracle-base.com/articles/18c/polymorphic-table-functions-18c



On 20.03.24 17:22, Mingge Deng wrote:

Thanks Jark for all the insightful comments.

We have updated the proposal per our offline discussions:
1. Model will be treated as a new relation in FlinkSQL.
2. Include the common ML predict and evaluate functions into the open
source flink to complete the user journey.
 And we should be able to extend the calcite SqlTableFunction to support
these two ML functions.

Best,
Mingge

On Mon, Mar 18, 2024 at 7:05 PM Jark Wu  wrote:


Hi Hao,


I meant how the table name

in window TVF gets translated to `SqlCallingBinding`. Probably we need to
fetch the table definition from the catalog somewhere. Do we treat those
window TVF specially in parser/planner so that catalog is looked up when
they are seen?

The table names are resolved and validated by Calcite SqlValidator.  We
don' need to fetch from catalog manually.
The specific checking logic of cumulate window happens in
SqlCumulateTableFunction.OperandMetadataImpl#checkOperandTypes.
The return type of SqlCumulateTableFunction is defined in
#getRowTypeInference() method.
Both are public interfaces provided by Calcite and it seems it's not
specially handled in parser/planner.

I didn't try that, but my gut feeling is that the framework is ready to
extend a customized TVF.


For what model is, I'm wondering if it has to be datatype or relation.

Can
it be another kind of citizen parallel to datatype/relation/function/db?
Redshift also supports `show models` operation, so it seems it's treated
specially as well?

If it is an entity only used in catalog scope (e.g., show xxx, create xxx,
drop xxx), it is fine to introduce it.
We have introduced such one before, called Module: "load module", "show
modules" [1].
But if we want to use Model in TVF parameters, it means it has to be a
relation or datatype, because
that is what it only accepts now.

Thanks for sharing the reason of preferring TVF instead of Redshift way. It
sounds reasonable to me.

Best,
Jark

  [1]:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/modules/

On Fri, 15 Mar 2024 at 13:41, Hao Li  wrote:


Hi Jark,

Thanks for the pointer. Sorry for the confusion: I meant how the table

name

in window TVF gets translated to `SqlCallingBinding`. Probably we need to
fetch the table definition from the catalog somewhere. Do we treat those
window TVF specially in parser/planner so that catalog is looked up when
they are seen?

For what model is, I'm wondering if it has to be datatype or relation.

Can

it be another kind of citizen parallel to datatype/relation/function/db?
Redshift also supports `show models` 

[jira] [Created] (FLINK-34920) ZooKeeperLeaderRetrievalConnectionHandlingTest

2024-03-22 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-34920:
---

 Summary: ZooKeeperLeaderRetrievalConnectionHandlingTest 
 Key: FLINK-34920
 URL: https://issues.apache.org/jira/browse/FLINK-34920
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.19.1
Reporter: Ryan Skraba


[https://github.com/apache/flink/actions/runs/8384423618/job/22961979482#step:10:8939]
{code:java}
[ERROR] Process Exit Code: 2
[ERROR] Crashed tests:
[ERROR] 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalConnectionHandlingTest
[ERROR] at 
org.apache.maven.plugin.surefire.booterclient.ForkStarter.awaitResultsDone(ForkStarter.java:456)
 {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-XXX Apicurio-avro format

2024-03-22 Thread Jeyhun Karimov
Hi David,

Thanks a lot for clarification.
Sounds good to me.

Regards,
Jeyhun

On Fri, Mar 22, 2024 at 10:54 AM David Radley 
wrote:

> Hi Jeyhun,
> Thanks for your feedback.
>
> So for outbound messages, the message includes the global ID. We register
> the schema and match on the artifact id. So if the schema then evolved,
> adding a new  version, the global ID would still be unique and the same
> version would be targeted. If you wanted to change the Flink table
> definition in line with a higher version, then you could do this – the
> artifact id would need to match for it to use the same schema and a higher
> artifact version would need to be provided. I notice that Apicurio has
> rules around compatibility that you can configure, I suppose if we attempt
> to create an artifact that breaks these rules, then the schema registration
> will fail and the associated operation should fail (e.g. an insert). I have
> not tried this.
>
>
> For inbound messages, using the global id in the header – this targets one
> version of the schema. I can create different messages on the topic built
> with different schema versions, and I can create different tables in Flink,
> as long as the reader and writer schemas are compatible as per the
> https://github.com/apache/flink/blob/779459168c46b7b4c600ef52f99a5435f81b9048/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java#L109
> Then this should work.
>
> Does this address your question?
> Kind regards, David.
>
>
> From: Jeyhun Karimov 
> Date: Thursday, 21 March 2024 at 21:06
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
> Hi David,
>
> Thanks for the FLIP. +1 for it.
> I have a minor comment.
>
> Can you please elaborate more on mechanisms in place to ensure data
> consistency and integrity, particularly in the event of schema conflicts?
> Since each message includes a schema ID for inbound and outbound messages,
> can you elaborate more on message consistency in the context of schema
> evolution?
>
> Regards,
> Jeyhun
>
>
>
>
>
> On Wed, Mar 20, 2024 at 4:34 PM David Radley  wrote:
>
> > Thank you very much for your feedback Mark. I have made the changes in
> the
> > latest google document. On reflection I agree with you that the
> > globalIdPlacement format configuration should apply to the
> deserialization
> > as well, so it is declarative. I am also going to have a new
> configuration
> > option to work with content IDs as well as global IDs. In line with the
> > deser Apicurio IdHandler and headerHandlers.
> >
> >  kind regards, David.
> >
> >
> > On 2024/03/20 15:18:37 Mark Nuttall wrote:
> > > +1 to this
> > >
> > > A few small comments:
> > >
> > > Currently, if users have Avro schemas in an Apicurio Registry (an open
> > source Apache 2 licensed schema registry), then the natural way to work
> > with those Avro flows is to use the schemas in the Apicurio Repository.
> > > 'those Avro flows' ... this is the first reference to flows.
> > >
> > > The new format will use the global Id to look up the Avro schema that
> > the message was written during deserialization.
> > > I get the point, phrasing is awkward. Probably you're more interested
> in
> > content than word polish at this point though.
> > >
> > > The Avro Schema Registry (apicurio-avro) format
> > > The Confluent format is called avro-confluent; this should be
> > avro-apicurio
> > >
> > > How to create tables with Apicurio-avro format
> > > s/Apicurio-avro/avro-apicurio/g
> > >
> > > HEADER – globalId is put in the header
> > > LEGACY– global Id is put in the message as a long
> > > CONFLUENT - globalId is put in the message as an int.
> > > Please could we specify 'four-byte int' and 'eight-byte long' ?
> > >
> > > For a Kafka source the globalId will be looked for in this order:
> > > - In the header
> > > - After a magic byte as an int
> > > - After a magic byte as a long.
> > > but apicurio-avro.globalid-placement has a default value of HEADER :
> why
> > do we have a search order as well? Isn't apicurio-avro.globalid-placement
> > enough? Don't the two mechanisms conflict?
> > >
> > > In addition to the types listed there, Flink supports reading/writing
> > nullable types. Flink maps nullable types to Avro union(something, null),
> > where something is the Avro type converted from Flink type.
> > > Is that definitely the right way round? I know we've had multiple
> > conversations about how unions work with Flink
> > >
> > >  This is because the writer schema is expanded, but this could not
> > complete if there are circularities.
> > > I understand your meaning but the sentence is awkward.
> > >
> > > The registered schema will be created or if it exists be updated.
> > > same again
> > >
> > > At some stage the lowest Flink level supported by the Kafka connector
> > will contain the additionalProperties methods in code flink.
> > > wording
> > >
> > > There existing Kafka 

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-22 Thread Lincoln Lee
Hi Timo,

Thanks for your thoughtful inputs!

Yes, expanding the MATERIALIZED VIEW(MV) could achieve the same function,
but our primary concern is that by using a view, we might limit future
opportunities
to optimize queries through automatic materialization rewriting [1],
leveraging
the support for MV by physical storage. This is because we would be breaking
the intuitive semantics of a materialized view (a materialized view
represents
the result of a query) by allowing data modifications, thus losing the
potential
for such optimizations.

With these considerations in mind, we were inspired by Google Looker's
Persistent
Derived Table [2]. PDT is designed for building Looker's automated
modeling,
aligning with our purpose for the stream-batch automatic pipeline.
Therefore,
we are considering another candidate, Derived Table, the term 'derive'
suggests a
query, and 'table' retains modifiability. This approach would not disrupt
our current
concept of a dynamic table, preserving the future utility of MVs.

Conceptually, a Derived Table is a Dynamic Table + Continuous Query. By
introducing the new Derived Table concept in this FLIP, all the concepts play
together nicely.
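
To make the pairing concrete, here is a minimal runnable sketch of the two
halves that a Derived Table would bundle into a single statement: a dynamic
table definition plus the continuous query that keeps it up to date. Table and
connector choices are illustrative only, and the DDL of the derived table
itself is deliberately not shown because its exact syntax is still under
discussion.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DerivedTableConceptSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The "dynamic table" half: source and target table definitions.
        tEnv.executeSql(
                "CREATE TABLE ods_orders (order_id BIGINT, amount DOUBLE) "
                        + "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");
        tEnv.executeSql(
                "CREATE TABLE dwd_orders (order_id BIGINT, amount DOUBLE) "
                        + "WITH ('connector' = 'blackhole')");

        // The "continuous query" half: a streaming INSERT that keeps dwd_orders
        // fresh. A Derived Table would declare the target table, this query, and
        // a freshness target in one statement.
        tEnv.executeSql(
                "INSERT INTO dwd_orders SELECT order_id, amount FROM ods_orders");
    }
}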

What do you think about this?

[1] https://calcite.apache.org/docs/materialized_views.html
[2]
https://cloud.google.com/looker/docs/derived-tables#persistent_derived_tables


Best,
Lincoln Lee


Timo Walther  wrote on Fri, Mar 22, 2024 at 17:54:

> Hi Ron,
>
> thanks for the detailed answer. Sorry, for my late reply, we had a
> conference that kept me busy.
>
>  > In the current concept[1], it actually includes: Dynamic Tables &
>  > & Continuous Query. Dynamic Table is just an abstract logical concept
>
> This explanation makes sense to me. But the docs also say "A continuous
> query is evaluated on the dynamic table yielding a new dynamic table.".
> So even our regular CREATE TABLEs are considered dynamic tables. This
> can also be seen in the diagram "Dynamic Table -> Continuous Query ->
> Dynamic Table". Currently, Flink queries can only be executed on Dynamic
> Tables.
>
>  > In essence, a materialized view represents the result of a query.
>
> Isn't that what your proposal does as well?
>
>  > the object of the suspend operation is the refresh task of the
> dynamic table
>
> I understand that Snowflake uses the term [1] to merge their concepts of
> STREAM, TASK, and TABLE into one piece of concept. But Flink has no
> concept of a "refresh task". Also, they already introduced MATERIALIZED
> VIEW. Flink is in the convenient position that the concept of
> materialized views is not taken (reserved maybe for exactly this use
> case?). And SQL standard concept could be "slightly adapted" to our
> needs. Looking at other vendors like Postgres[2], they also use
> `REFRESH` commands, so why not add additional commands such as DELETE
> or UPDATE. Oracle supports "ON PREBUILT TABLE clause tells the database
> to use an existing table segment"[3] which comes closer to what we want
> as well.
>
>  > it is not intended to support data modification
>
> This is an argument that I understand. But we as Flink could allow data
> modifications. This way we are only extending the standard and don't
> introduce new concepts.
>
> If we can't agree on using MATERIALIZED VIEW concept. We should fix our
> syntax in a Flink 2.0 effort. Making regular tables bounded and dynamic
> tables unbounded. We would be closer to the SQL standard with this and
> pave the way for the future. I would actually support this if all
> concepts play together nicely.
>
>  > In the future, we can consider extending the statement set syntax to
> support the creation of multiple dynamic tables.
>
> It's good that we called the concept STATEMENT SET. This allows us to
> defined CREATE TABLE within. Even if it might look a bit confusing.
>
> Regards,
> Timo
>
> [1] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
> [2]
> https://www.postgresql.org/docs/current/sql-creatematerializedview.html
> [3] https://oracle-base.com/articles/misc/materialized-views
>
> On 21.03.24 04:14, Feng Jin wrote:
> > Hi Ron and Lincoln
> >
> > Thanks for driving this discussion.  I believe it will greatly improve
> the
> > convenience of managing user real-time pipelines.
> >
> > I have some questions.
> >
> > *Regarding Limitations of Dynamic Table:*
> >
> >> Does not support modifying the select statement after the dynamic table
> > is created.
> >
> > Although currently we restrict users from modifying the query, I wonder
> if
> > we can provide a better way to help users rebuild it without affecting
> > downstream OLAP queries.
> >
> >
> > *Regarding the management of background jobs:*
> >
> > 1. From the documentation, the definitions SQL and job information are
> > stored in the Catalog. Does this mean that if a system needs to adapt to
> > Dynamic Tables, it also needs to store Flink's job information in the
> > corresponding system?
> > For example, does MySQL's Catalog need to store 

[jira] [Created] (FLINK-34919) WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs fails starting REST server

2024-03-22 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-34919:
---

 Summary: WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs 
fails starting REST server
 Key: FLINK-34919
 URL: https://issues.apache.org/jira/browse/FLINK-34919
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.20.0
Reporter: Ryan Skraba


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58482=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=8641]
{code:java}
Mar 22 04:12:50 04:12:50.260 [INFO] Running 
org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
Mar 22 04:12:50 04:12:50.609 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
Skipped: 0, Time elapsed: 0.318 s <<< FAILURE! -- in 
org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest
Mar 22 04:12:50 04:12:50.609 [ERROR] 
org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs
 -- Time elapsed: 0.303 s <<< ERROR!
Mar 22 04:12:50 java.net.BindException: Could not start rest endpoint on any 
port in port range 8081
Mar 22 04:12:50 at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:286)
Mar 22 04:12:50 at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpointTest.cleansUpExpiredExecutionGraphs(WebMonitorEndpointTest.java:69)
Mar 22 04:12:50 at java.lang.reflect.Method.invoke(Method.java:498)
Mar 22 04:12:50 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Mar 22 04:12:50 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Mar 22 04:12:50 at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
Mar 22 04:12:50 at 
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
Mar 22 04:12:50 at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Mar 22 04:12:50  {code}
This was noted as a symptom of FLINK-22980, but doesn't have the same failure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34918) Introduce the support of catalog for comments

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34918:


 Summary: Introduce the support of catalog for comments
 Key: FLINK-34918
 URL: https://issues.apache.org/jira/browse/FLINK-34918
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li


We propose to introduce a `getComment()` method in `CatalogDescriptor`, and the 
reasons are as follows.

1. For the sake of design consistency, this follows the design of FLIP-295 [1], 
which introduced the `CatalogStore` component. `CatalogDescriptor` includes the 
name and attributes, both of which are used to describe the catalog, so a 
`comment` can be added smoothly.

2. It extends an existing class rather than adding a new method to an existing 
interface. In particular, the `Catalog` interface, as a core interface, is used by 
a series of important components such as `CatalogFactory`, `CatalogManager`, and 
`FactoryUtil`, and is implemented by a large number of connectors such as JDBC, 
Paimon, and Hive. Adding methods to it would greatly increase the implementation 
complexity and, more importantly, increase the cost of iteration, maintenance, 
and verification.
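
For illustration, a minimal sketch of where the comment would live. 
`CatalogDescriptor.of(name, configuration)` is the existing FLIP-295 API; the 
three-argument factory and `getComment()` shown in the comments are only the 
proposal and do not exist yet.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogDescriptor;

public class CatalogDescriptorCommentSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("type", "generic_in_memory");

        // Existing FLIP-295 factory: the name and attributes describe the catalog.
        CatalogDescriptor descriptor = CatalogDescriptor.of("cat2", conf);
        System.out.println(descriptor);

        // Proposed in this issue (hypothetical, not in Flink yet):
        // CatalogDescriptor described = CatalogDescriptor.of("cat2", conf, "demo catalog");
        // String comment = described.getComment();
    }
}
{code}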



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-22 Thread Feng Jin
Hi Di,

Thank you for the update, as well as quickly implementing corresponding
capabilities including filter push down and project push down.

Regarding the transaction timeout, I still have some doubts. I would like
to confirm if we can control this timeout parameter in the connector, such
as setting it to 10 minutes or 1 hour.
Also, when a transaction is cleared by the server, the commit operation of
the connector will fail, leading to job failure. In this case, can users
only choose to delete the checkpoint and re-consume historical data?

There is also a small question regarding the parameters
`doris.request.connect.timeout.ms` and `doris.request.read.timeout.ms`:
can we change them to Duration type and remove the "ms" suffix?
This way, all time parameters can be kept uniform in type as Duration.
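
For reference, a Duration-typed option built with Flink's ConfigOptions would
look roughly like this; the key name without the "ms" suffix and the default
value are illustrative only, not the connector's actual definition:

import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class DorisTimeoutOptionSketch {

    // Illustrative only: key and default value are assumptions for this sketch.
    public static final ConfigOption<Duration> REQUEST_CONNECT_TIMEOUT =
            ConfigOptions.key("doris.request.connect.timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(30))
                    .withDescription("Connect timeout for requests sent to Doris.");

    public static void main(String[] args) {
        // Duration options accept values such as "30 s", "10 min" or "1 h".
        System.out.println(REQUEST_CONNECT_TIMEOUT.defaultValue());
    }
}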


Best,
Feng

On Fri, Mar 22, 2024 at 4:46 PM wudi <676366...@qq.com.invalid> wrote:

> Hi, Feng,
> Thank you, that's a great suggestion !
>
> I have already implemented FilterPushDown and removed that parameter on
> DorisDynamicTableSource[1], and also updated FLIP.
>
> Regarding the mention of [Doris also aborts transactions], it may not have
> been described accurately. It mainly refers to the automatic expiration of
> long-running transactions in Doris that have not been committed for a
> prolonged period.
>
> As for two-phase commit, when a commit fails, the checkpoint will also
> fail, and the job will be continuously retried.
>
> [1]
> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58
>
> Brs
> di.wu
>
>
> > On Mar 15, 2024, at 14:53, Feng Jin  wrote:
> >
> > Hi Di
> >
> > Thank you for initiating this FLIP, +1 for this.
> >
> > Regarding the option `doris.filter.query` of doris source table
> >
> > Can we directly implement the FilterPushDown capability of Flink Source
> > like Jdbc Source [1] instead of introducing an option?
> >
> >
> > Regarding two-phase commit,
> >
> >> At the same time, Doris will also abort transactions that have not been
> > committed for a long time
> >
> > Can we control the transaction timeout in the connector?
> > And control the behavior when timeout occurs, whether to discard by
> default
> > or trigger job failure?
> >
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-16024
> >
> > Best,
> > Feng
> >
> >
> > On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky  >
> > wrote:
> >
> >> Hi,
> >>
> >> Thanks for driving this, +1 for the FLIP.
> >>
> >> Best,
> >> Ferenc
> >>
> >>
> >>
> >>
> >> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy  >
> >> wrote:
> >>
> >>>
> >>>
> >>> Hello,
> >>> Thanks for the proposal, +1 for the FLIP.
> >>>
> >>> Best Regards
> >>> Ahmed Hamdy
> >>>
> >>>
> >>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
> >>>
>  Hi, Leonard
>  Thank you for your suggestion.
>  I referred to other Connectors[1], modified the naming and types of
>  relevant parameters[2], and also updated FLIP.
> 
>  [1]
> 
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
>  [1]
> 
> >>
> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
> 
>  Brs,
>  di.wu
> 
> > On Mar 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
> >
> > Thanks wudi for the updating, the FLIP generally looks good to me, I
> > only left two minor suggestions:
> >
> > (1) The suffix `.s` in configoption doris.request.query.timeout.s
> >> looks
> > strange to me, could we change all time interval related option
> >> value type
> > to Duration ?
> >
> > (2) Could you check and improve all config options like
> > `doris.exec.mem.limit` to make them to follow flink config option
> >> naming
> > and value type?
> >
> > Best,
> > Leonard
> >
> >>> On Mar 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
> >>>
> >>> Hi Di,
> >>>
> >>> Thanks for your proposal. +1 for the contribution. I'd like to
> >> know
> >>> your
> >>> thoughts about the following questions:
> >>>
> >>> 1. According to your clarification of the exactly-once, thanks
> >> for it
> >>> BTW,
> >>> no PreCommitTopology is required. Does it make sense to let
> >>> DorisSink[1]
> >>> implement SupportsCommitter, since the TwoPhaseCommittingSink is
> >>> deprecated[2] before turning the Doris connector into a Flink
> >>> connector?
> >>> 2. OLAP engines are commonly used as the tail/downstream of a
> >> data
> >>> pipeline
> >>> to support further e.g. ad-hoc query or cube with feasible
> >>> pre-aggregation.
> >>> Just out of curiosity, would you like to share some real use
> >> cases that
> >>> will use OLAP 

[jira] [Created] (FLINK-34917) Support enhanced `CREATE CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34917:


 Summary: Support enhanced `CREATE CATALOG` syntax
 Key: FLINK-34917
 URL: https://issues.apache.org/jira/browse/FLINK-34917
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li
 Attachments: image-2024-03-22-18-31-59-632.png

{{IF NOT EXISTS}}  clause: If the catalog already exists, nothing happens.

{{COMMENT}} clause: An optional string literal. The description for the catalog.

NOTICE: we just need to introduce the '[IF NOT EXISTS]' and '[COMMENT]' clauses 
to the 'CREATE CATALOG' statement.

!image-2024-03-22-18-31-59-632.png|width=795,height=87!
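
Below is a sketch of how the enhanced statement could be used from the Table API. 
The plain CREATE CATALOG statement works today; the IF NOT EXISTS and COMMENT 
clauses shown in the comment are the proposal and will not parse until this issue 
is implemented (clause order is illustrative).

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateCatalogSyntaxSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Supported today: plain CREATE CATALOG with properties.
        tEnv.executeSql(
                "CREATE CATALOG cat2 WITH ('type' = 'generic_in_memory')");

        // Proposed by this issue (illustrative clause order, not yet parseable):
        // CREATE CATALOG IF NOT EXISTS cat2
        //   COMMENT 'catalog for ad-hoc experiments'
        //   WITH ('type' = 'generic_in_memory');
    }
}
{code}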



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34916) Support `ALTER CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34916:


 Summary: Support `ALTER CATALOG` syntax
 Key: FLINK-34916
 URL: https://issues.apache.org/jira/browse/FLINK-34916
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li
 Attachments: image-2024-03-22-18-30-33-182.png

Set one or more properties in the specified catalog. If a particular property 
is already set in the catalog, override the old value with the new one.

!image-2024-03-22-18-30-33-182.png|width=736,height=583!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34915) Introduce `DESCRIBE CATALOG` syntax

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34915:


 Summary: Introduce `DESCRIBE CATALOG` syntax
 Key: FLINK-34915
 URL: https://issues.apache.org/jira/browse/FLINK-34915
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li


Describe the metadata of an existing catalog. The metadata information includes 
the catalog’s name, type, and comment. If the optional {{EXTENDED}} option is 
specified, catalog properties are also returned.

NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is not 
actually usable yet; we can complete the syntax in this FLIP. 
Flink SQL> describe catalog cat2;
+--------------------------+---------------------------+
| catalog_description_item | catalog_description_value |
+--------------------------+---------------------------+
|                     Name |                      cat2 |
|                     Type |         generic_in_memory |
|                  Comment |                           |
+--------------------------+---------------------------+
3 rows in set

Flink SQL> describe catalog extended cat2;
+--------------------------+-------------------------------------------------+
| catalog_description_item |                       catalog_description_value |
+--------------------------+-------------------------------------------------+
|                     Name |                                             cat4 |
|                     Type |                                generic_in_memory |
|                  Comment |                                                  |
|               Properties | (default-database,db), (type,generic_in_memory) |
+--------------------------+-------------------------------------------------+
4 rows in set



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34914) FLIP-436: Introduce Catalog-related Syntax

2024-03-22 Thread Yubin Li (Jira)
Yubin Li created FLINK-34914:


 Summary: FLIP-436: Introduce Catalog-related Syntax
 Key: FLINK-34914
 URL: https://issues.apache.org/jira/browse/FLINK-34914
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: Yubin Li


Umbrella issue for: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-436%3A+Introduce+Catalog-related+Syntax



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-436: Introduce Catalog-related Syntax

2024-03-22 Thread Yubin Li
Hi everyone,

I'm happy to announce that FLIP-436: Introduce Catalog-related Syntax
[1] has been accepted with 10 approving votes (5 binding) [2]:

- Yuepeng Pan (non-binding)
- Jark Wu (binding)
- Ferenc Csaky (non-binding)
- Feng Jin (non-binding)
- Lincoln Lee (binding)
- Leonard Xu (binding)
- Jane Chan (binding)
- Hang Ruan (non-binding)
- gongzhongqiang (non-binding)
- Benchao Li (binding)

There were no disapproving votes. Thanks to everyone who participated
in the discussion and voting!

Best,
Yubin

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-436%3A+Introduce+Catalog-related+Syntax
[2] https://lists.apache.org/thread/tkgg1lv9hg8s3p44256nh5pl48wfwmtf


RE: [DISCUSS] FLIP-XXX Apicurio-avro format

2024-03-22 Thread David Radley
Hi Jeyhun,
Thanks for your feedback.

So for outbound messages, the message includes the global ID. We register the 
schema and match on the artifact id. So if the schema then evolved, adding a 
new  version, the global ID would still be unique and the same version would be 
targeted. If you wanted to change the Flink table definition in line with a 
higher version, then you could do this – the artifact id would need to match 
for it to use the same schema and a higher artifact version would need to be 
provided. I notice that Apicurio has rules around compatibility that you can 
configure, I suppose if we attempt to create an artifact that breaks these 
rules, then the schema registration will fail and the associated operation should 
fail (e.g. an insert). I have not tried this.


For inbound messages, using the global id in the header – this targets one 
version of the schema. I can create different messages on the topic built with 
different schema versions, and I can create different tables in Flink, as long 
as the reader and writer schemas are compatible as per the 
https://github.com/apache/flink/blob/779459168c46b7b4c600ef52f99a5435f81b9048/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java#L109
Then this should work.
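
To make this concrete, a table using the format under discussion might be
declared roughly as follows. The 'avro-apicurio' name and the
globalid-placement option key are taken from the draft FLIP (and Mark's
renaming suggestion), so they may still change and will not work on a current
Flink release; other required options are kept to a minimum.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ApicurioFormatTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Illustrative only: the format and its option do not exist in Flink yet.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id BIGINT,"
                        + "  amount DOUBLE"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'orders',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'format' = 'avro-apicurio',"
                        + "  'avro-apicurio.globalid-placement' = 'HEADER'"
                        + ")");
    }
}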

Does this address your question?
Kind regards, David.


From: Jeyhun Karimov 
Date: Thursday, 21 March 2024 at 21:06
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Apicurio-avro format
Hi David,

Thanks for the FLIP. +1 for it.
I have a minor comment.

Can you please elaborate more on mechanisms in place to ensure data
consistency and integrity, particularly in the event of schema conflicts?
Since each message includes a schema ID for inbound and outbound messages,
can you elaborate more on message consistency in the context of schema
evolution?

Regards,
Jeyhun





On Wed, Mar 20, 2024 at 4:34 PM David Radley  wrote:

> Thank you very much for your feedback Mark. I have made the changes in the
> latest google document. On reflection I agree with you that the
> globalIdPlacement format configuration should apply to the deserialization
> as well, so it is declarative. I am also going to have a new configuration
> option to work with content IDs as well as global IDs. In line with the
> deser Apicurio IdHandler and headerHandlers.
>
>  kind regards, David.
>
>
> On 2024/03/20 15:18:37 Mark Nuttall wrote:
> > +1 to this
> >
> > A few small comments:
> >
> > Currently, if users have Avro schemas in an Apicurio Registry (an open
> source Apache 2 licensed schema registry), then the natural way to work
> with those Avro flows is to use the schemas in the Apicurio Repository.
> > 'those Avro flows' ... this is the first reference to flows.
> >
> > The new format will use the global Id to look up the Avro schema that
> the message was written during deserialization.
> > I get the point, phrasing is awkward. Probably you're more interested in
> content than word polish at this point though.
> >
> > The Avro Schema Registry (apicurio-avro) format
> > The Confluent format is called avro-confluent; this should be
> avro-apicurio
> >
> > How to create tables with Apicurio-avro format
> > s/Apicurio-avro/avro-apicurio/g
> >
> > HEADER – globalId is put in the header
> > LEGACY– global Id is put in the message as a long
> > CONFLUENT - globalId is put in the message as an int.
> > Please could we specify 'four-byte int' and 'eight-byte long' ?
> >
> > For a Kafka source the globalId will be looked for in this order:
> > - In the header
> > - After a magic byte as an int
> > - After a magic byte as a long.
> > but apicurio-avro.globalid-placement has a default value of HEADER : why
> do we have a search order as well? Isn't apicurio-avro.globalid-placement
> enough? Don't the two mechanisms conflict?
> >
> > In addition to the types listed there, Flink supports reading/writing
> nullable types. Flink maps nullable types to Avro union(something, null),
> where something is the Avro type converted from Flink type.
> > Is that definitely the right way round? I know we've had multiple
> conversations about how unions work with Flink
> >
> >  This is because the writer schema is expanded, but this could not
> complete if there are circularities.
> > I understand your meaning but the sentence is awkward.
> >
> > The registered schema will be created or if it exists be updated.
> > same again
> >
> > At some stage the lowest Flink level supported by the Kafka connector
> will contain the additionalProperties methods in code flink.
> > wording
> >
> > There existing Kafka deserialization for the writer schema passes down
> the message body to be deserialised.
> > wording
> >
> > @Override
> > public void deserialize(ConsumerRecord message,
> Collector out)
> >   throws IOException {
> >   Map additionalPropertiesMap =  new HashMap<>();
> >   for (Header header : message.additionalProperties()) {
> >

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-22 Thread Timo Walther

Hi Ron,

thanks for the detailed answer. Sorry, for my late reply, we had a 
conference that kept me busy.


> In the current concept[1], it actually includes: Dynamic Tables &
> & Continuous Query. Dynamic Table is just an abstract logical concept

This explanation makes sense to me. But the docs also say "A continuous 
query is evaluated on the dynamic table yielding a new dynamic table.". 
So even our regular CREATE TABLEs are considered dynamic tables. This 
can also be seen in the diagram "Dynamic Table -> Continuous Query -> 
Dynamic Table". Currently, Flink queries can only be executed on Dynamic 
Tables.


> In essence, a materialized view represents the result of a query.

Isn't that what your proposal does as well?

> the object of the suspend operation is the refresh task of the 
dynamic table


I understand that Snowflake uses the term [1] to merge their concepts of 
STREAM, TASK, and TABLE into one piece of concept. But Flink has no 
concept of a "refresh task". Also, they already introduced MATERIALIZED 
VIEW. Flink is in the convenient position that the concept of 
materialized views is not taken (reserved maybe for exactly this use 
case?). And SQL standard concept could be "slightly adapted" to our 
needs. Looking at other vendors like Postgres[2], they also use 
`REFRESH` commands, so why not add additional commands such as DELETE 
or UPDATE. Oracle supports "ON PREBUILT TABLE clause tells the database 
to use an existing table segment"[3] which comes closer to what we want 
as well.


> it is not intended to support data modification

This is an argument that I understand. But we as Flink could allow data 
modifications. This way we are only extending the standard and don't 
introduce new concepts.


If we can't agree on using MATERIALIZED VIEW concept. We should fix our 
syntax in a Flink 2.0 effort. Making regular tables bounded and dynamic 
tables unbounded. We would be closer to the SQL standard with this and 
pave the way for the future. I would actually support this if all 
concepts play together nicely.


> In the future, we can consider extending the statement set syntax to 
support the creation of multiple dynamic tables.


It's good that we called the concept STATEMENT SET. This allows us to 
defined CREATE TABLE within. Even if it might look a bit confusing.


Regards,
Timo

[1] https://docs.snowflake.com/en/user-guide/dynamic-tables-about
[2] https://www.postgresql.org/docs/current/sql-creatematerializedview.html
[3] https://oracle-base.com/articles/misc/materialized-views

On 21.03.24 04:14, Feng Jin wrote:

Hi Ron and Lincoln

Thanks for driving this discussion.  I believe it will greatly improve the
convenience of managing user real-time pipelines.

I have some questions.

*Regarding Limitations of Dynamic Table:*


Does not support modifying the select statement after the dynamic table

is created.

Although currently we restrict users from modifying the query, I wonder if
we can provide a better way to help users rebuild it without affecting
downstream OLAP queries.


*Regarding the management of background jobs:*

1. From the documentation, the definitions SQL and job information are
stored in the Catalog. Does this mean that if a system needs to adapt to
Dynamic Tables, it also needs to store Flink's job information in the
corresponding system?
For example, does MySQL's Catalog need to store flink job information as
well?


2. Users still need to consider how much memory is being used, how large
the concurrency is, which type of state backend is being used, and may need
to set TTL expiration.


*Regarding the Refresh Part:*


If the refresh mode is continuous and a background job is running,

caution should be taken with the refresh command as it can lead to
inconsistent data.

When we submit a refresh command, can we help users detect if there are any
running jobs and automatically stop them before executing the refresh
command? Then wait for it to complete before restarting the background
streaming job?

Best,
Feng

On Tue, Mar 19, 2024 at 9:40 PM Lincoln Lee  wrote:


Hi Yun,

Thank you very much for your valuable input!

Incremental mode is indeed an attractive idea, we have also discussed
this, but in the current design,

we first provided two refresh modes: CONTINUOUS and
FULL. Incremental mode can be introduced

once the execution layer has the capability.

My answer for the two questions:

1.
Yes, cascading is a good question.  Current proposal provides a
freshness that defines a dynamic
table relative to the base table’s lag. If users need to consider the
end-to-end freshness of multiple
cascaded dynamic tables, he can manually split them for now. Of
course, how to let multiple cascaded
  or dependent dynamic tables complete the freshness definition in a
simpler way, I think it can be
extended in the future.

2.
Cascading refresh is also a part we focus on discussing. In this flip,
we hope to focus as much as
possible on the core features (as it already 

Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-22 Thread Muhammet Orazov

Congratulations! Thanks for the great effort!

Best,
Muhammet Orazov

On 2024-03-20 13:34, Leonard Xu wrote:

Hi devs and users,

We are thrilled to announce that the donation of Flink CDC as a 
sub-project of Apache Flink has completed. We invite you to explore the 
new resources available:


- GitHub Repository: https://github.com/apache/flink-cdc
- Flink CDC Documentation: 
https://nightlies.apache.org/flink/flink-cdc-docs-stable


After Flink community accepted this donation[1], we have completed 
software copyright signing, code repo migration, code cleanup, website 
migration, CI migration and github issues migration etc.
Here I am particularly grateful to Hang Ruan, Zhongqiang Gong, 
Qingsheng Ren, Jiabao Sun, LvYanquan, loserwang1024 and other 
contributors for their contributions and help during this process!



For all previous contributors: The contribution process has slightly 
changed to align with the main Flink project. To report bugs or suggest 
new features, please open tickets in
Apache Jira (https://issues.apache.org/jira). Note that we will no 
longer accept GitHub issues for these purposes.



Welcome to explore the new repository and documentation. Your feedback 
and contributions are invaluable as we continue to improve Flink CDC.


Thanks everyone for your support and happy exploring Flink CDC!

Best,
Leonard
[1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob


Re: [DISCUSS] FLIP-428: Fault Tolerance/Rescale Integration for Disaggregated State

2024-03-22 Thread yue ma
Hi Jinzhong,
Thanks for your reply. I still have some doubts about the first question. Is
there such a case:
when we take a snapshot during the synchronization phase, we record the
CURRENT file and manifest 8, but before the asynchronous phase the manifest
reaches the size threshold, so the CURRENT file is repointed to the new
manifest 9, and the incorrect CURRENT file is then uploaded?

Jinzhong Li  wrote on Wed, Mar 20, 2024 at 20:13:

> Hi Yue,
>
> Thanks for your feedback!
>
> > 1. If we choose Option-3 for ForSt , how would we handle Manifest File
> > ? Should we take a snapshot of the Manifest during the synchronization
> phase?
>
> IIUC, the GetLiveFiles() API in Option-3 can also capture the fileInfo of
> Manifest files, and this API also returns the manifest file size, which
> means this API could take a snapshot of the Manifest FileInfo (filename +
> fileSize) during the synchronization phase.
> You could refer to the rocksdb source code[1] to verify this.
>
>
>  > However, many distributed storage systems do not support the
> > ability of Fast Duplicate (such as HDFS). But ForSt has the ability to
> > directly read and write remote files. Can we not copy or Fast duplicate
> > these files, but instand of directly reuse and. reference these remote
> > files? I think this can reduce file download time and may be more useful
> > for most users who use HDFS (do not support Fast Duplicate)?
>
> Firstly, as far as I know, most remote file systems support the
> FastDuplicate, eg. S3 copyObject/Azure Blob Storage copyBlob/OSS
> copyObject, and the HDFS indeed does not support FastDuplicate.
>
> Actually,we have considered the design which reuses remote files. And that
> is what we want to implement in the coming future, where both checkpoints
> and restores can reuse existing files residing on the remote state storage.
> However, this design conflicts with the current file management system in
> Flink.  At present, remote state files are managed by the ForStDB
> (TaskManager side), while checkpoint files are managed by the JobManager,
> which is a major hindrance to file reuse. For example, issues could arise
> if a TM reuses a checkpoint file that is subsequently deleted by the JM.
> Therefore, as mentioned in FLIP-423[2], our roadmap is to first integrate
> checkpoint/restore mechanisms with existing framework  at milestone-1.
> Then, at milestone-2, we plan to introduce TM State Ownership and Faster
> Checkpointing mechanisms, which will allow both checkpointing and restoring
> to directly reuse remote files, thus achieving faster checkpointing and
> restoring.
>
> [1]
>
> https://github.com/facebook/rocksdb/blob/6ddfa5f06140c8d0726b561e16dc6894138bcfa0/db/db_filesnapshot.cc#L77
> [2]
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-RoadMap+LaunchingPlan
>
> Best,
> Jinzhong
>
>
>
>
>
>
>
> On Wed, Mar 20, 2024 at 4:01 PM yue ma  wrote:
>
> > Hi Jinzhong
> >
> > Thank you for initiating this FLIP.
> >
> > I have just some minor question:
> >
> > 1. If we choice Option-3 for ForSt , how would we handle Manifest File
> > ? Should we take snapshot of the Manifest during the synchronization
> phase?
> > Otherwise, may the Manifest and MetaInfo information be inconsistent
> during
> > recovery?
> > 2. For the Restore Operation , we need Fast Duplicate  Checkpoint Files
> to
> > Working Dir . However, many distributed storage systems do not support
> the
> > ability of Fast Duplicate (such as HDFS). But ForSt has the ability to
> > directly read and write remote files. Can we not copy or Fast duplicate
> > these files, but instand of directly reuse and. reference these remote
> > files? I think this can reduce file download time and may be more useful
> > for most users who use HDFS (do not support Fast Duplicate)?
> >
> > --
> > Best,
> > Yue
> >
>


-- 
Best,
Yue


Re: [DISCUSS] FLIP-399: Flink Connector Doris

2024-03-22 Thread wudi
Hi, Feng,
Thank you, that's a great suggestion ! 

I have already implemented FilterPushDown and removed that parameter on 
DorisDynamicTableSource[1], and also updated FLIP.
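
For readers following along without opening the connector code, the Flink-side 
contract is roughly the following. This is a simplified sketch of the 
SupportsFilterPushDown ability, not the actual DorisDynamicTableSource:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

public class FilterPushDownSketch implements ScanTableSource, SupportsFilterPushDown {

    private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Illustrative policy: accept every predicate. A real connector keeps only
        // the predicates it can translate (e.g. into a Doris filter query) and
        // returns the rest as "remaining" so Flink still evaluates them.
        pushedFilters.addAll(filters);
        return Result.of(new ArrayList<>(filters), new ArrayList<>());
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // Reading logic omitted; a real source would return e.g. a SourceProvider.
        throw new UnsupportedOperationException("omitted in this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        // A real implementation must also carry the pushed-down filters over.
        return new FilterPushDownSketch();
    }

    @Override
    public String asSummaryString() {
        return "filter-push-down-sketch";
    }
}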

Regarding the mention of [Doris also aborts transactions], it may not have been 
described accurately. It mainly refers to the automatic expiration of 
long-running transactions in Doris that have not been committed for a prolonged 
period.

As for two-phase commit, when a commit fails, the checkpoint will also fail, 
and the job will be continuously retried.

[1] 
https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java#L58

Brs
di.wu


> On Mar 15, 2024, at 14:53, Feng Jin  wrote:
> 
> Hi Di
> 
> Thank you for initiating this FLIP, +1 for this.
> 
> Regarding the option `doris.filter.query` of doris source table
> 
> Can we directly implement the FilterPushDown capability of Flink Source
> like Jdbc Source [1] instead of introducing an option?
> 
> 
> Regarding two-phase commit,
> 
>> At the same time, Doris will also abort transactions that have not been
> committed for a long time
> 
> Can we control the transaction timeout in the connector?
> And control the behavior when timeout occurs, whether to discard by default
> or trigger job failure?
> 
> 
> [1]. https://issues.apache.org/jira/browse/FLINK-16024
> 
> Best,
> Feng
> 
> 
> On Tue, Mar 12, 2024 at 12:12 AM Ferenc Csaky 
> wrote:
> 
>> Hi,
>> 
>> Thanks for driving this, +1 for the FLIP.
>> 
>> Best,
>> Ferenc
>> 
>> 
>> 
>> 
>> On Monday, March 11th, 2024 at 15:17, Ahmed Hamdy 
>> wrote:
>> 
>>> 
>>> 
>>> Hello,
>>> Thanks for the proposal, +1 for the FLIP.
>>> 
>>> Best Regards
>>> Ahmed Hamdy
>>> 
>>> 
>>> On Mon, 11 Mar 2024 at 15:12, wudi 676366...@qq.com.invalid wrote:
>>> 
 Hi, Leonard
 Thank you for your suggestion.
 I referred to other Connectors[1], modified the naming and types of
 relevant parameters[2], and also updated FLIP.
 
 [1]
 
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/overview/
 [1]
 
>> https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
 Brs,
 di.wu
 
> On Mar 7, 2024, at 14:33, Leonard Xu xbjt...@gmail.com wrote:
> 
> Thanks wudi for the updating, the FLIP generally looks good to me, I
> only left two minor suggestions:
> 
> (1) The suffix `.s` in configoption doris.request.query.timeout.s
>> looks
> strange to me, could we change all time interval related option
>> value type
> to Duration ?
> 
> (2) Could you check and improve all config options like
> `doris.exec.mem.limit` to make them to follow flink config option
>> naming
> and value type?
> 
> Best,
> Leonard
> 
>>> On Mar 6, 2024, at 06:12, Jing Ge j...@ververica.com.INVALID wrote:
>>> 
>>> Hi Di,
>>> 
>>> Thanks for your proposal. +1 for the contribution. I'd like to
>> know
>>> your
>>> thoughts about the following questions:
>>> 
>>> 1. According to your clarification of the exactly-once, thanks
>> for it
>>> BTW,
>>> no PreCommitTopology is required. Does it make sense to let
>>> DorisSink[1]
>>> implement SupportsCommitter, since the TwoPhaseCommittingSink is
>>> deprecated[2] before turning the Doris connector into a Flink
>>> connector?
>>> 2. OLAP engines are commonly used as the tail/downstream of a
>> data
>>> pipeline
>>> to support further e.g. ad-hoc query or cube with feasible
>>> pre-aggregation.
>>> Just out of curiosity, would you like to share some real use
>> cases that
>>> will use OLAP engines as the source of a streaming data
>> pipeline? Or it
>>> will only be used as the source for the batch?
>>> 3. The E2E test only covered sink[3], if I am not mistaken.
>> Would you
>>> like
>>> to test the source in E2E too?
>>> 
>>> [1]
 
 
>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java#L55
 
>>> [2]
 
 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Enhance+and+synchronize+Sink+API+to+match+the+Source+API
 
>>> [3]
 
 
>> https://github.com/apache/doris-flink-connector/blob/43e0e5cf9b832854ea228fb093077872e3a311b6/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java#L96
 
>>> Best regards,
>>> Jing
>>> 
>>> On Tue, Mar 5, 2024 at 11:18 AM wudi 676366...@qq.com.invalid
>> wrote:
>>> 
 Hi, Jeyhun Karimov.
 Thanks for your question.
 
 - How to ensure Exactly-Once?
 1. When the Checkpoint Barrier arrives, DorisSink will trigger
>> the
 precommit api of StreamLoad to complete 

[jira] [Created] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-22 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-34913:
--

 Summary: ConcurrentModificationException 
SubTaskInitializationMetricsBuilder.addDurationMetric
 Key: FLINK-34913
 URL: https://issues.apache.org/jira/browse/FLINK-34913
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics, Runtime / State Backends
Affects Versions: 1.19.0
Reporter: Piotr Nowojski
 Fix For: 1.19.1


The following failures can occur during job's recovery when using clip & ingest

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[RESULT][VOTE] FLIP-402: Extend ZooKeeper Curator configurations

2024-03-22 Thread Alex Nitavsky
Hi devs,

I'm happy to announce that FLIP-402: Extend ZooKeeper Curator configurations
in Flink [1] has been accepted with 5 approving votes (3 binding) [2]:

 - Matthias Pohl (binding)
 - Yang Wang (binding)
 - Ferenc Csaky (non-binding)
 - Martijn Visser (binding)
 - Zhongqiang Gong (non-binding)

There're no disapproving votes.

Thanks again to everyone who participated in the discussion and voting.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-402%3A+Extend+ZooKeeper+Curator+configurations
[2] https://lists.apache.org/thread/0bobx7y14b17j1rcrccodg990wngoboj


[jira] [Created] (FLINK-34912) change shade pattern from com.ververica.cdc.connectors.shaded to org.apache.flink.cdc.connectors.shaded

2024-03-22 Thread Xiao Huang (Jira)
Xiao Huang created FLINK-34912:
--

 Summary: change shade pattern from 
com.ververica.cdc.connectors.shaded to org.apache.flink.cdc.connectors.shaded
 Key: FLINK-34912
 URL: https://issues.apache.org/jira/browse/FLINK-34912
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: 3.1.0
Reporter: Xiao Huang
 Fix For: 3.1.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-402: Extend ZooKeeper Curator configurations

2024-03-22 Thread Alex Nitavsky
Thank you all!

Closing the vote. The result will be sent in a separate email.

On Thu, Mar 21, 2024 at 11:01 AM gongzhongqiang 
wrote:

> +1 (non-binding)
>
>
> Best,
> Zhongqiang Gong
>
> Alex Nitavsky  wrote on Thu, Mar 7, 2024 at 23:09:
>
> > Hi everyone,
> >
> > I'd like to start a vote on FLIP-402 [1]. It introduces new configuration
> > options for Apache Flink's ZooKeeper integration for high availability by
> > reflecting existing Apache Curator configuration options. It has been
> > discussed in this thread [2].
> >
> > I would like to start a vote.  The vote will be open for at least 72
> hours
> > (until March 10th 18:00 GMT) unless there is an objection or
> > insufficient votes.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-402%3A+Extend+ZooKeeper+Curator+configurations
> > [2] https://lists.apache.org/thread/gqgs2jlq6bmg211gqtgdn8q5hp5v9l1z
> >
> > Thanks
> > Alex
> >
>


Re: Flink Kubernetes Operator Failing Over FlinkDeployments to a New Cluster

2024-03-22 Thread Gyula Fóra
I agree, we would need some FLIPs to cover this. Actually there is already
some work on this topic initiated by Matthias Pohl (cc'd).
Please see this:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-360%3A+Merging+the+ExecutionGraphInfoStore+and+the+JobResultStore+into+a+single+component+CompletedJobStore

This FLIP actually covers some of these limitations already, as well as other
outstanding issues in the operator.

Cheers,
Gyula


Re: [DISCUSS] Planning Flink 1.20

2024-03-22 Thread Xintong Song
+1 for the proposed timeline and Weijie & Rui as the release managers.

I think it would be welcome if another 1-2 volunteers joined as release
managers, but that's not a must. We used to have only 1-2 release managers
for each release.

Best,

Xintong



On Fri, Mar 22, 2024 at 2:55 PM Jark Wu  wrote:

> Thanks for kicking this off.
>
> +1 for the volunteered release managers (Weijie Guo, Rui Fan) and the
> targeting date (feature freeze: June 15).
>
> Best,
> Jark
>
>
>
>
>
> On Fri, 22 Mar 2024 at 14:00, Rui Fan <1996fan...@gmail.com> wrote:
>
> > Thanks Leonard for this feedback and help!
> >
> > Best,
> > Rui
> >
> > On Fri, Mar 22, 2024 at 12:36 PM weijie guo 
> > wrote:
> >
> > > Thanks Leonard!
> > >
> > > > I'd like to help you if you need some help like permissions from PMC
> > > side, please feel free to ping me.
> > >
> > > Nice to know. It'll help a lot!
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Leonard Xu  wrote on Fri, Mar 22, 2024 at 12:09:
> > >
> > >> +1 for the proposed release managers (Weijie Guo, Rui Fan), both the
> two
> > >> candidates are pretty active committers thus I believe they know the
> > >> community development process well. The recent releases have four
> > release
> > >> managers, and I am also looking forward to having other volunteers
> > >>  join the management of Flink 1.20.
> > >>
> > >> +1 for targeting date (feature freeze: June 15, 2024), referring to
> the
> > >> release cycle of recent versions, release cycle of 4 months makes
> sense
> > to
> > >> me.
> > >>
> > >>
> > >> I'd like to help you if you need some help like permissions from PMC
> > >> side, please feel free to ping me.
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >>
> > >> > On Mar 19, 2024, at 5:35 PM, Rui Fan <1996fan...@gmail.com> wrote:
> > >> >
> > >> > Hi Weijie,
> > >> >
> > >> > Thanks for kicking off 1.20! I'd like to join you and participate in
> > the
> > >> > 1.20 release.
> > >> >
> > >> > Best,
> > >> > Rui
> > >> >
> > >> > On Tue, Mar 19, 2024 at 5:30 PM weijie guo <
> guoweijieres...@gmail.com
> > >
> > >> > wrote:
> > >> >
> > >> >> Hi everyone,
> > >> >>
> > >> >> With the release announcement of Flink 1.19, it's a good time to
> kick
> > >> off
> > >> >> discussion of the next release 1.20.
> > >> >>
> > >> >>
> > >> >> - Release managers
> > >> >>
> > >> >>
> > >> >> I'd like to volunteer as one of the release managers this time. It
> > has
> > >> been
> > >> >> good practice to have a team of release managers from different
> > >> >> backgrounds, so please raise you hand if you'd like to volunteer
> and
> > >> get
> > >> >> involved.
> > >> >>
> > >> >>
> > >> >>
> > >> >> - Timeline
> > >> >>
> > >> >>
> > >> >> Flink 1.19 has been released. With a target release cycle of 4
> > months,
> > >> >> we propose a feature freeze date of *June 15, 2024*.
> > >> >>
> > >> >>
> > >> >>
> > >> >> - Collecting features
> > >> >>
> > >> >>
> > >> >> As usual, we've created a wiki page[1] for collecting new features
> in
> > >> 1.20.
> > >> >>
> > >> >>
> > >> >> In addition, we already have a number of FLIPs that have been voted
> > or
> > >> are
> > >> >> in the process, including pre-works for version 2.0.
> > >> >>
> > >> >>
> > >> >> In the meantime, the release management team will be finalized in
> the
> > >> next
> > >> >> few days, and we'll continue to create Jira Boards and Sync
> meetings
> > >> >> to make it easy
> > >> >> for everyone to get an overview and track progress.
> > >> >>
> > >> >>
> > >> >>
> > >> >> Best regards,
> > >> >>
> > >> >> Weijie
> > >> >>
> > >> >>
> > >> >>
> > >> >> [1] https://cwiki.apache.org/confluence/display/FLINK/1.20+Release
> > >> >>
> > >>
> > >>
> >
>


Re: [DISCUSS] Planning Flink 1.20

2024-03-22 Thread Jark Wu
Thanks for kicking this off.

+1 for the volunteered release managers (Weijie Guo, Rui Fan) and the
targeting date (feature freeze: June 15).

Best,
Jark





On Fri, 22 Mar 2024 at 14:00, Rui Fan <1996fan...@gmail.com> wrote:

> Thanks Leonard for this feedback and help!
>
> Best,
> Rui
>
> On Fri, Mar 22, 2024 at 12:36 PM weijie guo 
> wrote:
>
> > Thanks Leonard!
> >
> > > I'd like to help you if you need some help like permissions from PMC
> > side, please feel free to ping me.
> >
> > Nice to know. It'll help a lot!
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Leonard Xu  wrote on Fri, Mar 22, 2024 at 12:09:
> >
> >> +1 for the proposed release managers (Weijie Guo, Rui Fan), both the two
> >> candidates are pretty active committers thus I believe they know the
> >> community development process well. The recent releases have four
> release
> >> managers, and I am also looking forward to having other volunteers
> >>  join the management of Flink 1.20.
> >>
> >> +1 for targeting date (feature freeze: June 15, 2024), referring to the
> >> release cycle of recent versions, release cycle of 4 months makes sense
> to
> >> me.
> >>
> >>
> >> I'd like to help you if you need some help like permissions from PMC
> >> side, please feel free to ping me.
> >>
> >> Best,
> >> Leonard
> >>
> >>
> >> > On Mar 19, 2024, at 5:35 PM, Rui Fan <1996fan...@gmail.com> wrote:
> >> >
> >> > Hi Weijie,
> >> >
> >> > Thanks for kicking off 1.20! I'd like to join you and participate in
> the
> >> > 1.20 release.
> >> >
> >> > Best,
> >> > Rui
> >> >
> >> > On Tue, Mar 19, 2024 at 5:30 PM weijie guo  >
> >> > wrote:
> >> >
> >> >> Hi everyone,
> >> >>
> >> >> With the release announcement of Flink 1.19, it's a good time to kick
> >> off
> >> >> discussion of the next release 1.20.
> >> >>
> >> >>
> >> >> - Release managers
> >> >>
> >> >>
> >> >> I'd like to volunteer as one of the release managers this time. It
> has
> >> been
> >> >> good practice to have a team of release managers from different
> >> >> backgrounds, so please raise you hand if you'd like to volunteer and
> >> get
> >> >> involved.
> >> >>
> >> >>
> >> >>
> >> >> - Timeline
> >> >>
> >> >>
> >> >> Flink 1.19 has been released. With a target release cycle of 4
> months,
> >> >> we propose a feature freeze date of *June 15, 2024*.
> >> >>
> >> >>
> >> >>
> >> >> - Collecting features
> >> >>
> >> >>
> >> >> As usual, we've created a wiki page[1] for collecting new features in
> >> 1.20.
> >> >>
> >> >>
> >> >> In addition, we already have a number of FLIPs that have been voted
> or
> >> are
> >> >> in the process, including pre-works for version 2.0.
> >> >>
> >> >>
> >> >> In the meantime, the release management team will be finalized in the
> >> next
> >> >> few days, and we'll continue to create Jira Boards and Sync meetings
> >> >> to make it easy
> >> >> for everyone to get an overview and track progress.
> >> >>
> >> >>
> >> >>
> >> >> Best regards,
> >> >>
> >> >> Weijie
> >> >>
> >> >>
> >> >>
> >> >> [1] https://cwiki.apache.org/confluence/display/FLINK/1.20+Release
> >> >>
> >>
> >>
>


Re: [DISCUSS] Planning Flink 1.20

2024-03-22 Thread Rui Fan
Thanks Leonard for this feedback and help!

Best,
Rui

On Fri, Mar 22, 2024 at 12:36 PM weijie guo 
wrote:

> Thanks Leonard!
>
> > I'd like to help you if you need some help like permissions from PMC
> side, please feel free to ping me.
>
> Nice to know. It'll help a lot!
>
> Best regards,
>
> Weijie
>
>
> Leonard Xu  wrote on Fri, Mar 22, 2024 at 12:09:
>
>> +1 for the proposed release managers (Weijie Guo, Rui Fan), both the two
>> candidates are pretty active committers thus I believe they know the
>> community development process well. The recent releases have four release
>> managers, and I am also looking forward to having other volunteers
>>  join the management of Flink 1.20.
>>
>> +1 for targeting date (feature freeze: June 15, 2024), referring to the
>> release cycle of recent versions, release cycle of 4 months makes sense to
>> me.
>>
>>
>> I'd like to help you if you need some help like permissions from PMC
>> side, please feel free to ping me.
>>
>> Best,
>> Leonard
>>
>>
>> > On Mar 19, 2024, at 5:35 PM, Rui Fan <1996fan...@gmail.com> wrote:
>> >
>> > Hi Weijie,
>> >
>> > Thanks for kicking off 1.20! I'd like to join you and participate in the
>> > 1.20 release.
>> >
>> > Best,
>> > Rui
>> >
>> > On Tue, Mar 19, 2024 at 5:30 PM weijie guo 
>> > wrote:
>> >
>> >> Hi everyone,
>> >>
>> >> With the release announcement of Flink 1.19, it's a good time to kick
>> off
>> >> discussion of the next release 1.20.
>> >>
>> >>
>> >> - Release managers
>> >>
>> >>
>> >> I'd like to volunteer as one of the release managers this time. It has
>> been
>> >> good practice to have a team of release managers from different
>> >> backgrounds, so please raise you hand if you'd like to volunteer and
>> get
>> >> involved.
>> >>
>> >>
>> >>
>> >> - Timeline
>> >>
>> >>
>> >> Flink 1.19 has been released. With a target release cycle of 4 months,
>> >> we propose a feature freeze date of *June 15, 2024*.
>> >>
>> >>
>> >>
>> >> - Collecting features
>> >>
>> >>
>> >> As usual, we've created a wiki page[1] for collecting new features in
>> 1.20.
>> >>
>> >>
>> >> In addition, we already have a number of FLIPs that have been voted or
>> are
>> >> in the process, including pre-works for version 2.0.
>> >>
>> >>
>> >> In the meantime, the release management team will be finalized in the
>> next
>> >> few days, and we'll continue to create Jira Boards and Sync meetings
>> >> to make it easy
>> >> for everyone to get an overview and track progress.
>> >>
>> >>
>> >>
>> >> Best regards,
>> >>
>> >> Weijie
>> >>
>> >>
>> >>
>> >> [1] https://cwiki.apache.org/confluence/display/FLINK/1.20+Release
>> >>
>>
>>