Re: [DISCUSS] FLINK-34440 Support Debezium Protobuf Confluent Format

2024-03-13 Thread Anupam Aggarwal
Hi Kevin,

Thanks, these are some great points.
Just to clarify, I do agree that the subject should be an option (like in
the case of RegistryAvroFormatFactory).
We could fallback to subject and auto-register schemas, if schema-Id not
provided explicitly.
In general, I think it would be good to be more explicit about the schemas
used (
https://docs.confluent.io/platform/curren/schema-registry/schema_registry_onprem_tutorial.html#auto-schema-registration

).
This would also help prevent us from overriding the ids in incompatible
ways.

Under the current implementation of FlinkToProtoSchemaConverter we might
end up overwriting the field-Ids.
If we are able to locate a prior schema, the approach you outlined makes a
lot of sense.
Let me explore this a bit further and get back(in terms of feasibility).

Thanks again!
- Anupam

On Wed, Mar 13, 2024 at 2:28 AM Kevin Lam 
wrote:

> Hi Anupam,
>
> Thanks again for your work on contributing this feature back.
>
> Sounds good re: the refactoring/re-organizing.
>
> Regarding the schema-id, in my opinion this should NOT be a configuration
> option on the format. We should be able to deterministically map the Flink
> type to the ProtoSchema and register that with the Schema Registry.
>
> I think it can make sense to provide the `subject` as a parameter, so that
> the serialization format can look up existing schemas.
>
> This would be used in something I mentioned something related above
>
> > Another topic I had is Protobuf's field ids. Ideally in Flink it would be
> > nice if we are idiomatic about not renumbering them in incompatible ways,
> > similar to what's discussed on the Schema Registry issue here:
> > https://github.com/confluentinc/schema-registry/issues/2551
>
>
> When we construct the ProtobufSchema from the Flink LogicalType, we
> shouldn't renumber the field ids in an incompatible way, so one approach
> would be to use the subject to look up the most recent version, and use
> that to evolve the field ids correctly.
>
>
> On Tue, Mar 12, 2024 at 2:33 AM Anupam Aggarwal  >
> wrote:
>
> > Hi Kevin,
> >
> > Thanks for starting the discussion on this.
> > I will be working on contributing this feature back (This was developed
> by
> > Dawid Wysakowicz and others at Confluent).
> >
> > I have opened a (very initial) draft PR here
> > https://github.com/apache/flink/pull/24482 with our current
> > implementation.
> > Thanks for the feedback on the PR, I haven’t gotten around to
> > re-organizing/refactoring the classes yet, but it would be inline with
> some
> > of your comments.
> >
> > On the overall approach there are some slight variations from the initial
> > proposal.
> > Our implementation relies on an explicit schema-id being passed through
> the
> > config. This could help in cases where one Flink type could potentially
> map
> > to multiple proto types.
> > We could make the schema-Id optional and fall back to deriving it from
> the
> > rowType (during serialization) if not present?
> >
> > The message index handling is still TBD. I am thinking of replicating
> logic
> > in AbstractKafkaProtobufSerializer.java
> > <
> >
> https://github.com/confluentinc/schema-registry/blob/342c8a9d3854d4253d785214f5dcfb1b6cc59a06/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufSerializer.java#L157
> > >
> >  (|Deserializer).
> > Please let me know if this makes sense / or in case you have any other
> > feedback.
> >
> > Thanks
> > Anupam
> >
> > On Thu, Feb 29, 2024 at 8:54 PM Kevin Lam  >
> > wrote:
> >
> > > Hey Robert,
> > >
> > > Awesome thanks, that timeline works for me. Sounds good re: deciding on
> > > FLIP once we have the PR, and thanks for looking into the field ids.
> > >
> > > Looking forward to it!
> > >
> > > On Thu, Feb 29, 2024 at 5:09 AM Robert Metzger 
> > > wrote:
> > >
> > > > Hey Kevin,
> > > >
> > > > Thanks a lot. Then let's contribute the Confluent implementation to
> > > > apache/flink. We can't start working on this immediately because of a
> > > team
> > > > event next week, but within the next two weeks, we will start working
> > on
> > > > this.
> > > > It probably makes sense for us to open a pull request of what we have
> > > > already, so that you can start reviewing and maybe also contributing
> to
> > > the
> > > > PR.
> > > > I hope this timeline works for you!
> > > >
> > > > Let's also decide if we need a FLIP once the code is public.
> > > > We will look into the field ids.
> > > >
> > > >
> > > > On Tue, Feb 27, 2024 at 8:56 PM Kevin Lam
> >  > > >
> > > > wrote:
> > > >
> > > > > Hey Robert,
> > > > >
> > > > > Thanks for your response. I have a partial implementation, just for
> > the
> > > > > decoding portion.
> > > > >
> > > > > The code I have is pretty rough and doesn't do any of the
> refactors I
> > > > > mentioned, but the decoder logic does pull the 

Re: [VOTE] Release 1.19.0, release candidate #2

2024-03-13 Thread Yun Tang
+1 (non-binding)


  *
Verified the signature and checksum.
  *
Reviewed the release note PR
  *
Reviewed the web announcement PR
  *
Start a standalone cluster to submit the state machine example, which works 
well.
  *
Checked the pre-built jars are generated via JDK8
  *
Verified the process profiler works well after setting rest.profiling.enabled: 
true

Best
Yun Tang


From: Qingsheng Ren 
Sent: Wednesday, March 13, 2024 12:45
To: dev@flink.apache.org 
Subject: Re: [VOTE] Release 1.19.0, release candidate #2

+1 (binding)

- Verified signature and checksum
- Verified no binary in source
- Built from source
- Tested reading and writing Kafka with SQL client and Kafka connector 3.1.0
- Verified source code tag
- Reviewed release note
- Reviewed web PR

Thanks to all release managers and contributors for the awesome work!

Best,
Qingsheng

On Wed, Mar 13, 2024 at 1:23 AM Matthias Pohl
 wrote:

> I want to share an update on FLINK-34227 [1]: It's still not clear what's
> causing the test instability. So far, we agreed in today's release sync [2]
> that it's not considered a blocker because it is observed in 1.18 nightly
> builds and it only appears in the GitHub Actions workflow. But I still have
> a bit of a concern that this is something that was introduced in 1.19 and
> backported to 1.18 after the 1.18.1 release (because the test instability
> started to appear more regularly in March; with one occurrence in January).
> Additionally, I have no reason to believe, yet, that the instability is
> caused by some GHA-related infrastructure issue.
>
> So, if someone else has some capacity to help looking into it; that would
> be appreciated. I will continue my investigation tomorrow.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-34227
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/1.19+Release#id-1.19Release-03/12/2024
>
> On Tue, Mar 12, 2024 at 12:50 PM Benchao Li  wrote:
>
> > +1 (non-binding)
> >
> > - checked signature and checksum: OK
> > - checkout copyright year in notice file: OK
> > - diffed source distribution with tag, make sure there is no
> > unexpected files: OK
> > - build from source : OK
> > - start a local cluster, played with jdbc connector: OK
> >
> > weijie guo  于2024年3月12日周二 16:55写道:
> > >
> > > +1 (non-binding)
> > >
> > > - Verified signature and checksum
> > > - Verified source distribution does not contains binaries
> > > - Build from source code and submit a word-count job successfully
> > >
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Jane Chan  于2024年3月12日周二 16:38写道:
> > >
> > > > +1 (non-binding)
> > > >
> > > > - Verify that the source distributions do not contain any binaries;
> > > > - Build the source distribution to ensure all source files have
> Apache
> > > > headers;
> > > > - Verify checksum and GPG signatures;
> > > >
> > > > Best,
> > > > Jane
> > > >
> > > > On Tue, Mar 12, 2024 at 4:08 PM Xuannan Su 
> > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > - Verified signature and checksum
> > > > > - Verified that source distribution does not contain binaries
> > > > > - Built from source code successfully
> > > > > - Reviewed the release announcement PR
> > > > >
> > > > > Best regards,
> > > > > Xuannan
> > > > >
> > > > > On Tue, Mar 12, 2024 at 2:18 PM Hang Ruan 
> > > > wrote:
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > - Verified signatures and checksums
> > > > > > - Verified that source does not contain binaries
> > > > > > - Build source code successfully
> > > > > > - Reviewed the release note and left a comment
> > > > > >
> > > > > > Best,
> > > > > > Hang
> > > > > >
> > > > > > Feng Jin  于2024年3月12日周二 11:23写道:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > - Verified signatures and checksums
> > > > > > > - Verified that source does not contain binaries
> > > > > > > - Build source code successfully
> > > > > > > - Run a simple sql query successfully
> > > > > > >
> > > > > > > Best,
> > > > > > > Feng Jin
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Mar 12, 2024 at 11:09 AM Ron liu 
> > wrote:
> > > > > > >
> > > > > > > > +1 (non binding)
> > > > > > > >
> > > > > > > > quickly verified:
> > > > > > > > - verified that source distribution does not contain binaries
> > > > > > > > - verified checksums
> > > > > > > > - built source code successfully
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Ron
> > > > > > > >
> > > > > > > > Jeyhun Karimov  于2024年3月12日周二 01:00写道:
> > > > > > > >
> > > > > > > > > +1 (non binding)
> > > > > > > > >
> > > > > > > > > - verified that source distribution does not contain
> binaries
> > > > > > > > > - verified signatures and checksums
> > > > > > > > > - built source code successfully
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Jeyhun
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Mar 11, 2024 at 3:08 PM Samrat 

Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-13 Thread Jark Wu
Hi Mingge, Hao,

Thanks for your replies.

> PTF is actually the ideal approach for model functions, and we do have
the plans to use PTF for
all model functions (including prediction, evaluation etc..) once the PTF
is supported in FlinkSQL
confluent extension.

It sounds that PTF is the ideal way and table function is a temporary
solution which will be dropped in the future.
I'm not sure whether we can implement it using PTF in Flink SQL. But we
have implemented window
functions using PTF[1]. And introduced a new window function (called
CUMULATE[2]) in Flink SQL based
on this. I think it might work to use PTF and implement model function
syntax like this:

SELECT * FROM TABLE(ML_PREDICT(
  TABLE my_table,
  my_model,
  col1,
  col2
));

Besides, did you consider following the way of AWS Redshift which defines
model function with the model itself together?
IIUC, a model is a black-box which defines input parameters and output
parameters which can be modeled into functions.


Best,
Jark

[1]:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#session
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-CumulatingWindows
[3]:
https://github.com/aws-samples/amazon-redshift-ml-getting-started/blob/main/use-cases/bring-your-own-model-remote-inference/README.md#create-model




On Wed, 13 Mar 2024 at 15:00, Hao Li  wrote:

> Hi Jark,
>
> Thanks for your questions. These are good questions!
>
> 1. The polymorphism table function I was referring to takes a table as
> input and outputs a table. So the syntax would be like
> ```
> SELECT * FROM ML_PREDICT('model', (SELECT * FROM my_table))
> ```
> As far as I know, this is not supported yet on Flink. So before it's
> supported, one option for the predict function is using table function
> which can output multiple columns
> ```
> SELECT * FROM my_table, LATERAL VIEW (ML_PREDICT('model', col1, col2))
> ```
>
> 2. Good question. Type inference is hard for the `ML_PREDICT` function
> because it takes a model name string as input. I can think of three ways of
> doing type inference for it.
>1). Treat `ML_PREDICT` function as something special and during sql
> parsing or planning time, if it's encountered, we need to look up the model
> from the first argument which is a model name from catalog. Then we can
> infer the input/output for the function.
>2). We can define a `model` keyword and use that in the predict function
> to indicate the argument refers to a model. So it's like `ML_PREDICT(model
> 'my_model', col1, col2))`
>3). We can create a special type of table function maybe called
> `ModelFunction` which can resolve the model type inference by special
> handling it during parsing or planning time.
> 1) is hacky, 2) isn't supported in Flink for function, 3) might be a
> good option.
>
> 3. I sketched the `ML_PREDICT` function for inference. But there are
> limitations of the function mentioned in 1 and 2. So maybe we don't need to
> introduce them as built-in functions until polymorphism table function and
> we can properly deal with type inference.
> After that, defining a user-defined model function should also be
> straightforward.
>
> 4. For model types, do you mean 'remote', 'import', 'native' models or
> other things?
>
> 5. We could support popular providers such as 'azureml', 'vertexai',
> 'googleai' as long as we support the `ML_PREDICT` function. Users should be
> able to implement 3rd-party providers if they can implement a function
> handling the input/output for the provider.
>
> I think for the model functions, there are still dependencies or hacks we
> need to sort out as a built-in function. Maybe we can separate that as a
> follow up if we want to have it built-in and focus on the model syntax for
> this FLIP?
>
> Thanks,
> Hao
>
> On Tue, Mar 12, 2024 at 10:33 PM Jark Wu  wrote:
>
> > Hi Minge, Chris, Hao,
> >
> > Thanks for proposing this interesting idea. I think this is a nice step
> > towards
> > the AI world for Apache Flink. I don't know much about AI/ML, so I may
> have
> > some stupid questions.
> >
> > 1. Could you tell more about why polymorphism table function (PTF)
> doesn't
> > work and do we have plan to use PTF as model functions?
> >
> > 2. What kind of object does the model map to in SQL? A relation or a data
> > type?
> > It looks like a data type because we use it as a parameter of the table
> > function.
> > If it is a data type, how does it cooperate with type inference[1]?
> >
> > 3. What built-in model functions will we support? How to define a
> > user-defined model function?
> >
> > 4. What built-in model types will we support? How to define a
> user-defined
> > model type?
> >
> > 5. Regarding the remote model, what providers will we support? Can users
> > implement
> > 3rd-party providers except OpenAI?
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> >
> 

Re: [DISCUSS] FLIP-436: Introduce "SHOW CREATE CATALOG" Syntax

2024-03-13 Thread Hang Ruan
Hi, Yubin.

Thanks for the FLIP. +1 for it.

Best,
Hang

Yubin Li  于2024年3月14日周四 10:15写道:

> Hi Jingsong, Feng, and Jeyhun
>
> Thanks for your support and feedback!
>
> > However, could we add a new method `getCatalogDescriptor()` to
> > CatalogManager instead of directly exposing CatalogStore?
>
> Good point, Besides the audit tracking issue, The proposed feature
> only requires `getCatalogDescriptor()` function. Exposing components
> with excessive functionality will bring unnecessary risks, I have made
> modifications in the FLIP doc [1]. Thank Feng :)
>
> > Showing the SQL parser implementation in the FLIP for the SQL syntax
> > might be a bit confusing. Also, the formal definition is missing for
> > this SQL clause.
>
> Thank Jeyhun for pointing it out :) I have updated the doc [1] .
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756
>
> Best,
> Yubin
>
>
> On Thu, Mar 14, 2024 at 2:18 AM Jeyhun Karimov 
> wrote:
> >
> > Hi Yubin,
> >
> > Thanks for the proposal. +1 for it.
> > I have one comment:
> >
> > I would like to see the SQL syntax for the proposed statement.  Showing
> the
> > SQL parser implementation in the FLIP
> > for the SQL syntax might be a bit confusing. Also, the formal definition
> is
> > missing for this SQL clause.
> > Maybe something like [1] might be useful. WDYT?
> >
> > Regards,
> > Jeyhun
> >
> > [1]
> >
> https://github.com/apache/flink/blob/0da60ca1a4754f858cf7c52dd4f0c97ae0e1b0cb/docs/content/docs/dev/table/sql/show.md?plain=1#L620-L632
> >
> > On Wed, Mar 13, 2024 at 3:28 PM Feng Jin  wrote:
> >
> > > Hi Yubin
> > >
> > > Thank you for initiating this FLIP.
> > >
> > > I have just one minor question:
> > >
> > > I noticed that we added a new function `getCatalogStore` to expose
> > > CatalogStore, and it seems fine.
> > > However, could we add a new method `getCatalogDescriptor()` to
> > > CatalogManager instead of directly exposing CatalogStore?
> > > By only providing the `getCatalogDescriptor()` interface, it may be
> easier
> > > for us to implement audit tracking in CatalogManager in the future.
> WDYT ?
> > > Although we have only collected some modified events at the moment.[1]
> > >
> > >
> > > [1].
> > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
> > >
> > > Best,
> > > Feng
> > >
> > > On Wed, Mar 13, 2024 at 5:31 PM Jingsong Li 
> > > wrote:
> > >
> > > > +1 for this.
> > > >
> > > > We are missing a series of catalog related syntaxes.
> > > > Especially after the introduction of catalog store. [1]
> > > >
> > > > [1]
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Wed, Mar 13, 2024 at 5:09 PM Yubin Li  wrote:
> > > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion about FLIP-436: Introduce "SHOW
> CREATE
> > > > > CATALOG" Syntax [1].
> > > > >
> > > > > At present, the `SHOW CREATE TABLE` statement provides strong
> support
> > > for
> > > > > users to easily
> > > > > reuse created tables. However, despite the increasing importance
> of the
> > > > > `Catalog` in user's
> > > > > business, there is no similar statement for users to use.
> > > > >
> > > > > According to the online discussion in FLINK-24939 [2] with Jark Wu
> and
> > > > Feng
> > > > > Jin, since `CatalogStore`
> > > > > has been introduced in FLIP-295 [3], we could use this component to
> > > > > implement such a long-awaited
> > > > > feature, Please refer to the document [1] for implementation
> details.
> > > > >
> > > > > examples as follows:
> > > > >
> > > > > Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory',
> > > > > > 'default-database'='db');
> > > > > > [INFO] Execute statement succeeded.
> > > > > > Flink SQL> show create catalog cat2;
> > > > > >
> > > > > >
> > > >
> > >
> ++
> > > > > > | result |
> > > > > >
> > > > > >
> > > >
> > >
> ++
> > > > > > | CREATE CATALOG `cat2` WITH (
> > > > > >   'default-database' = 'db',
> > > > > >   'type' = 'generic_in_memory'
> > > > > > )
> > > > > >  |
> > > > > >
> > > > > >
> > > >
> > >
> ++
> > > > > > 1 row in set
> > > > >
> > > > >
> > > > >
> > > > > Looking forward to hearing from you, thanks!
> > > > >
> > > > > Best regards,
> > > > > Yubin
> > > > >
> > > > > [1]
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756
> > > > > [2] https://issues.apache.org/jira/browse/FLINK-24939
> > > > > [3]
> > > > >
> > > >
> > >
> 

Re: [DISCUSS] FLIP-436: Introduce "SHOW CREATE CATALOG" Syntax

2024-03-13 Thread Jark Wu
Thank you Yubin,

+1 for the proposal. We have been lacking catalog related syntax to operate
catalogs.
It's a good chance to complete the syntax as we have introduced
CatalogStore.

>From what I can see, some useful commands are still missing for catalogs,
such as, alter catalog, describe catalog. What do you think about including
these
syntaxes in the FLIP as well?

Best,
Jark



On Thu, 14 Mar 2024 at 10:16, Yubin Li  wrote:

> Hi Jingsong, Feng, and Jeyhun
>
> Thanks for your support and feedback!
>
> > However, could we add a new method `getCatalogDescriptor()` to
> > CatalogManager instead of directly exposing CatalogStore?
>
> Good point, Besides the audit tracking issue, The proposed feature
> only requires `getCatalogDescriptor()` function. Exposing components
> with excessive functionality will bring unnecessary risks, I have made
> modifications in the FLIP doc [1]. Thank Feng :)
>
> > Showing the SQL parser implementation in the FLIP for the SQL syntax
> > might be a bit confusing. Also, the formal definition is missing for
> > this SQL clause.
>
> Thank Jeyhun for pointing it out :) I have updated the doc [1] .
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756
>
> Best,
> Yubin
>
>
> On Thu, Mar 14, 2024 at 2:18 AM Jeyhun Karimov 
> wrote:
> >
> > Hi Yubin,
> >
> > Thanks for the proposal. +1 for it.
> > I have one comment:
> >
> > I would like to see the SQL syntax for the proposed statement.  Showing
> the
> > SQL parser implementation in the FLIP
> > for the SQL syntax might be a bit confusing. Also, the formal definition
> is
> > missing for this SQL clause.
> > Maybe something like [1] might be useful. WDYT?
> >
> > Regards,
> > Jeyhun
> >
> > [1]
> >
> https://github.com/apache/flink/blob/0da60ca1a4754f858cf7c52dd4f0c97ae0e1b0cb/docs/content/docs/dev/table/sql/show.md?plain=1#L620-L632
> >
> > On Wed, Mar 13, 2024 at 3:28 PM Feng Jin  wrote:
> >
> > > Hi Yubin
> > >
> > > Thank you for initiating this FLIP.
> > >
> > > I have just one minor question:
> > >
> > > I noticed that we added a new function `getCatalogStore` to expose
> > > CatalogStore, and it seems fine.
> > > However, could we add a new method `getCatalogDescriptor()` to
> > > CatalogManager instead of directly exposing CatalogStore?
> > > By only providing the `getCatalogDescriptor()` interface, it may be
> easier
> > > for us to implement audit tracking in CatalogManager in the future.
> WDYT ?
> > > Although we have only collected some modified events at the moment.[1]
> > >
> > >
> > > [1].
> > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
> > >
> > > Best,
> > > Feng
> > >
> > > On Wed, Mar 13, 2024 at 5:31 PM Jingsong Li 
> > > wrote:
> > >
> > > > +1 for this.
> > > >
> > > > We are missing a series of catalog related syntaxes.
> > > > Especially after the introduction of catalog store. [1]
> > > >
> > > > [1]
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Wed, Mar 13, 2024 at 5:09 PM Yubin Li  wrote:
> > > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion about FLIP-436: Introduce "SHOW
> CREATE
> > > > > CATALOG" Syntax [1].
> > > > >
> > > > > At present, the `SHOW CREATE TABLE` statement provides strong
> support
> > > for
> > > > > users to easily
> > > > > reuse created tables. However, despite the increasing importance
> of the
> > > > > `Catalog` in user's
> > > > > business, there is no similar statement for users to use.
> > > > >
> > > > > According to the online discussion in FLINK-24939 [2] with Jark Wu
> and
> > > > Feng
> > > > > Jin, since `CatalogStore`
> > > > > has been introduced in FLIP-295 [3], we could use this component to
> > > > > implement such a long-awaited
> > > > > feature, Please refer to the document [1] for implementation
> details.
> > > > >
> > > > > examples as follows:
> > > > >
> > > > > Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory',
> > > > > > 'default-database'='db');
> > > > > > [INFO] Execute statement succeeded.
> > > > > > Flink SQL> show create catalog cat2;
> > > > > >
> > > > > >
> > > >
> > >
> ++
> > > > > > | result |
> > > > > >
> > > > > >
> > > >
> > >
> ++
> > > > > > | CREATE CATALOG `cat2` WITH (
> > > > > >   'default-database' = 'db',
> > > > > >   'type' = 'generic_in_memory'
> > > > > > )
> > > > > >  |
> > > > > >
> > > > > >
> > > >
> > >
> ++
> > > > > > 1 row in set
> > > > >
> > > > >
> > > > >
> > > > > Looking forward to hearing from you, thanks!
> > > > >
> > > > > Best 

[jira] [Created] (FLINK-34664) Add .asf.yaml for Flink CDC

2024-03-13 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-34664:
-

 Summary: Add .asf.yaml for Flink CDC
 Key: FLINK-34664
 URL: https://issues.apache.org/jira/browse/FLINK-34664
 Project: Flink
  Issue Type: Sub-task
  Components: Flink CDC
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


We need to add .asf.yaml file to Flink CDC repo to get auto-links to Apache 
Jira and update project description



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-436: Introduce "SHOW CREATE CATALOG" Syntax

2024-03-13 Thread Yubin Li
Hi Jingsong, Feng, and Jeyhun

Thanks for your support and feedback!

> However, could we add a new method `getCatalogDescriptor()` to
> CatalogManager instead of directly exposing CatalogStore?

Good point, Besides the audit tracking issue, The proposed feature
only requires `getCatalogDescriptor()` function. Exposing components
with excessive functionality will bring unnecessary risks, I have made
modifications in the FLIP doc [1]. Thank Feng :)

> Showing the SQL parser implementation in the FLIP for the SQL syntax
> might be a bit confusing. Also, the formal definition is missing for
> this SQL clause.

Thank Jeyhun for pointing it out :) I have updated the doc [1] .

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756

Best,
Yubin


On Thu, Mar 14, 2024 at 2:18 AM Jeyhun Karimov  wrote:
>
> Hi Yubin,
>
> Thanks for the proposal. +1 for it.
> I have one comment:
>
> I would like to see the SQL syntax for the proposed statement.  Showing the
> SQL parser implementation in the FLIP
> for the SQL syntax might be a bit confusing. Also, the formal definition is
> missing for this SQL clause.
> Maybe something like [1] might be useful. WDYT?
>
> Regards,
> Jeyhun
>
> [1]
> https://github.com/apache/flink/blob/0da60ca1a4754f858cf7c52dd4f0c97ae0e1b0cb/docs/content/docs/dev/table/sql/show.md?plain=1#L620-L632
>
> On Wed, Mar 13, 2024 at 3:28 PM Feng Jin  wrote:
>
> > Hi Yubin
> >
> > Thank you for initiating this FLIP.
> >
> > I have just one minor question:
> >
> > I noticed that we added a new function `getCatalogStore` to expose
> > CatalogStore, and it seems fine.
> > However, could we add a new method `getCatalogDescriptor()` to
> > CatalogManager instead of directly exposing CatalogStore?
> > By only providing the `getCatalogDescriptor()` interface, it may be easier
> > for us to implement audit tracking in CatalogManager in the future.  WDYT ?
> > Although we have only collected some modified events at the moment.[1]
> >
> >
> > [1].
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
> >
> > Best,
> > Feng
> >
> > On Wed, Mar 13, 2024 at 5:31 PM Jingsong Li 
> > wrote:
> >
> > > +1 for this.
> > >
> > > We are missing a series of catalog related syntaxes.
> > > Especially after the introduction of catalog store. [1]
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Wed, Mar 13, 2024 at 5:09 PM Yubin Li  wrote:
> > > >
> > > > Hi devs,
> > > >
> > > > I'd like to start a discussion about FLIP-436: Introduce "SHOW CREATE
> > > > CATALOG" Syntax [1].
> > > >
> > > > At present, the `SHOW CREATE TABLE` statement provides strong support
> > for
> > > > users to easily
> > > > reuse created tables. However, despite the increasing importance of the
> > > > `Catalog` in user's
> > > > business, there is no similar statement for users to use.
> > > >
> > > > According to the online discussion in FLINK-24939 [2] with Jark Wu and
> > > Feng
> > > > Jin, since `CatalogStore`
> > > > has been introduced in FLIP-295 [3], we could use this component to
> > > > implement such a long-awaited
> > > > feature, Please refer to the document [1] for implementation details.
> > > >
> > > > examples as follows:
> > > >
> > > > Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory',
> > > > > 'default-database'='db');
> > > > > [INFO] Execute statement succeeded.
> > > > > Flink SQL> show create catalog cat2;
> > > > >
> > > > >
> > >
> > ++
> > > > > | result |
> > > > >
> > > > >
> > >
> > ++
> > > > > | CREATE CATALOG `cat2` WITH (
> > > > >   'default-database' = 'db',
> > > > >   'type' = 'generic_in_memory'
> > > > > )
> > > > >  |
> > > > >
> > > > >
> > >
> > ++
> > > > > 1 row in set
> > > >
> > > >
> > > >
> > > > Looking forward to hearing from you, thanks!
> > > >
> > > > Best regards,
> > > > Yubin
> > > >
> > > > [1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756
> > > > [2] https://issues.apache.org/jira/browse/FLINK-24939
> > > > [3]
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > >
> >


Re: Flink Kubernetes Operator Failing Over FlinkDeployments to a New Cluster

2024-03-13 Thread Gyula Fóra
Hey Kevin!

The general mismatch I see here is that operators and resources are pretty
cluster dependent. The operator itself is running in the same cluster so it
feels out of scope to submit resources to different clusters, this doesn't
really sound like what any Kubernetes Operator should do in general.

To me this sounds more like a typical control plane feature that sits above
different environments and operator instances. There are a lot of features
like this, blue/green deployments also fall into this category in my head,
but there are of course many many others.

There may come a time when the Flink community decides to take on such a
scope but it feels a bit too much at this point to try to standardize this.

Cheers,
Gyula

On Wed, Mar 13, 2024 at 9:18 PM Kevin Lam 
wrote:

> Hi Max,
>
> It feels a bit hacky to need to back-up the resources directly from the
> cluster, as opposed to being able to redeploy our checked-in k8s manifests
> such that they failover correctly, but that makes sense to me and we can
> look into this approach. Thanks for the suggestion!
>
> I'd still be interested in hearing the community's thoughts on if we can
> support this in a more first-class way as part of the Apache Flink
> Kubernetes Operator.
>
> Thanks,
> Kevin
>
> On Wed, Mar 13, 2024 at 9:41 AM Maximilian Michels  wrote:
>
> > Hi Kevin,
> >
> > Theoretically, as long as you move over all k8s resources, failover
> > should work fine on the Flink and Flink Operator side. The tricky part
> > is the handover. You will need to backup all resources from the old
> > cluster, shutdown the old cluster, then re-create them on the new
> > cluster. The operator deployment and the Flink cluster should then
> > recover fine (assuming that high availability has been configured and
> > checkpointing is done to persistent storage available in the new
> > cluster). The operator state / Flink state is actually kept in
> > ConfigMaps which would be part of the resource dump.
> >
> > This method has proven to work in case of Kubernetes cluster upgrades.
> > Moving to an entirely new cluster is a bit more involved but exporting
> > all resource definitions and re-importing them into the new cluster
> > should yield the same result as long as the checkpoint paths do not
> > change.
> >
> > Probably something worth trying :)
> >
> > -Max
> >
> >
> >
> > On Wed, Mar 6, 2024 at 9:09 PM Kevin Lam 
> > wrote:
> > >
> > > Another thought could be modifying the operator to have a behaviour
> where
> > > upon first deploy, it optionally (flag/param enabled) finds the most
> > recent
> > > snapshot and uses that as the initialSavepointPath to restore and run
> the
> > > Flink job.
> > >
> > > On Wed, Mar 6, 2024 at 2:07 PM Kevin Lam 
> wrote:
> > >
> > > > Hi there,
> > > >
> > > > We use the Flink Kubernetes Operator, and I am investigating how we
> can
> > > > easily support failing over a FlinkDeployment from one Kubernetes
> > Cluster
> > > > to another in the case of an outage that requires us to migrate a
> large
> > > > number of FlinkDeployments from one K8s cluster to another.
> > > >
> > > > I understand one way to do this is to set `initialSavepoint` on all
> the
> > > > FlinkDeployments to the most recent/appropriate snapshot so the jobs
> > > > continue from where they left off, but for a large number of jobs,
> this
> > > > would be quite a bit of manual labor.
> > > >
> > > > Do others have an approach they are using? Any advice?
> > > >
> > > > Could this be something addressed in a future FLIP? Perhaps we could
> > store
> > > > some kind of metadata in object storage so that the Flink Kubernetes
> > > > Operator can restore a FlinkDeployment from where it left off, even
> if
> > the
> > > > job is shifted to another Kubernetes Cluster.
> > > >
> > > > Looking forward to hearing folks' thoughts!
> > > >
> >
>


Re: Flink Kubernetes Operator Failing Over FlinkDeployments to a New Cluster

2024-03-13 Thread Kevin Lam
Hi Max,

It feels a bit hacky to need to back-up the resources directly from the
cluster, as opposed to being able to redeploy our checked-in k8s manifests
such that they failover correctly, but that makes sense to me and we can
look into this approach. Thanks for the suggestion!

I'd still be interested in hearing the community's thoughts on if we can
support this in a more first-class way as part of the Apache Flink
Kubernetes Operator.

Thanks,
Kevin

On Wed, Mar 13, 2024 at 9:41 AM Maximilian Michels  wrote:

> Hi Kevin,
>
> Theoretically, as long as you move over all k8s resources, failover
> should work fine on the Flink and Flink Operator side. The tricky part
> is the handover. You will need to backup all resources from the old
> cluster, shutdown the old cluster, then re-create them on the new
> cluster. The operator deployment and the Flink cluster should then
> recover fine (assuming that high availability has been configured and
> checkpointing is done to persistent storage available in the new
> cluster). The operator state / Flink state is actually kept in
> ConfigMaps which would be part of the resource dump.
>
> This method has proven to work in case of Kubernetes cluster upgrades.
> Moving to an entirely new cluster is a bit more involved but exporting
> all resource definitions and re-importing them into the new cluster
> should yield the same result as long as the checkpoint paths do not
> change.
>
> Probably something worth trying :)
>
> -Max
>
>
>
> On Wed, Mar 6, 2024 at 9:09 PM Kevin Lam 
> wrote:
> >
> > Another thought could be modifying the operator to have a behaviour where
> > upon first deploy, it optionally (flag/param enabled) finds the most
> recent
> > snapshot and uses that as the initialSavepointPath to restore and run the
> > Flink job.
> >
> > On Wed, Mar 6, 2024 at 2:07 PM Kevin Lam  wrote:
> >
> > > Hi there,
> > >
> > > We use the Flink Kubernetes Operator, and I am investigating how we can
> > > easily support failing over a FlinkDeployment from one Kubernetes
> Cluster
> > > to another in the case of an outage that requires us to migrate a large
> > > number of FlinkDeployments from one K8s cluster to another.
> > >
> > > I understand one way to do this is to set `initialSavepoint` on all the
> > > FlinkDeployments to the most recent/appropriate snapshot so the jobs
> > > continue from where they left off, but for a large number of jobs, this
> > > would be quite a bit of manual labor.
> > >
> > > Do others have an approach they are using? Any advice?
> > >
> > > Could this be something addressed in a future FLIP? Perhaps we could
> store
> > > some kind of metadata in object storage so that the Flink Kubernetes
> > > Operator can restore a FlinkDeployment from where it left off, even if
> the
> > > job is shifted to another Kubernetes Cluster.
> > >
> > > Looking forward to hearing folks' thoughts!
> > >
>


Unable to override PrometheusReporter using PrometheusGateway 'filterLabelValueCharacters'

2024-03-13 Thread Ryan van Huuksloot
Hello,

I believe there is a bug with the AbstractPrometheusReporter where it
requires using `metrics.reporter.promgateway.filterLabelValueCharacters`
to ignore
filtering label value characters.

This isn't possible if you are just using the PrometheusReporter because
that config isn't passed through the MetricConfig.

I've tried to add configuration:

metrics.reporter.prom.factory.class:
org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.scope.variables.additional:
environment:staging,job:staging-job
metrics.reporter.promgateway.filterLabelValueCharacters: false

The configuration has been picked up but is still replacing the metrics "-"
to an "_".

Example:
flink_jobmanager_job_runningState{job_name="flink",job="staging_job",environment="staging",}
1.0

I'm not sure why this is the default behaviour given those characters are
allowed in the label's value, only metric names need to match the regex
.
However, having a configuration that works for both the PrometheusReporter
and PromGateway would suffice to avoid changing the default behaviour.

Is this something I could open a PR for and someone could help me merge?
I'll also open a JIRA ticket as soon as someone validates that it doesn't
work as intended.

Thanks,

Ryan van Huuksloot
Sr. Production Engineer | Streaming Platform
[image: Shopify]



[jira] [Created] (FLINK-34663) flink-opensearch connector Unable to parse response body for Response

2024-03-13 Thread wael shehata (Jira)
wael shehata created FLINK-34663:


 Summary: flink-opensearch connector Unable to parse response body 
for Response
 Key: FLINK-34663
 URL: https://issues.apache.org/jira/browse/FLINK-34663
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Opensearch
Affects Versions: 1.18.1
 Environment: Docker-Compose:
Flink 1.18.1 - Java11
OpenSearch 2.12.0
Flink-Sql-Opensearch-connector (flink 1.18.1 → Os 1.3)
Reporter: wael shehata


I`m trying to use flink-sql-opensearch connector to sink stream data to 
OpenSearch via Flink …
After submitting the Job to Flink cluster successfully , the job runs normally 
for 30sec and create the index with data … then it fails with the following 
message:
_*org.apache.flink.util.FlinkRuntimeException: Complete bulk has failed… Caused 
by: java.io.IOException: Unable to parse response body for Response*_

_*{requestLine=POST /_bulk?timeout=1m HTTP/1.1, 
host=[http://172.20.0.6:9200|http://172.20.0.6:9200/], response=HTTP/1.1 200 
OK}*_

at 
org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
at 
org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
at org.opensearch.client.RestClient$1.completed(RestClient.java:396)
at org.opensearch.client.RestClient$1.completed(RestClient.java:390)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at 
org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at 
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
at 
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at 
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
… 1 more
*Caused by: java.lang.NullPointerException*
*at java.base/java.util.Objects.requireNonNull(Unknown Source)*
*at org.opensearch.action.DocWriteResponse.(DocWriteResponse.java:140)*
*at org.opensearch.action.index.IndexResponse.(IndexResponse.java:67) …*

It seems that this error is common but without any solution …
the flink connector despite it was built for OpenSearch 1.3 , but it still 
working in sending and creating index to OpenSearch 2.12.0 … but this error 
persists with all OpenSearch versions greater than 1.13 …

*Opensearch support reply was:*
*"this is unexpected, could you please create an issue here [1], the issue is 
caused by _type property that has been removed in 2.x"*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-436: Introduce "SHOW CREATE CATALOG" Syntax

2024-03-13 Thread Jeyhun Karimov
Hi Yubin,

Thanks for the proposal. +1 for it.
I have one comment:

I would like to see the SQL syntax for the proposed statement.  Showing the
SQL parser implementation in the FLIP
for the SQL syntax might be a bit confusing. Also, the formal definition is
missing for this SQL clause.
Maybe something like [1] might be useful. WDYT?

Regards,
Jeyhun

[1]
https://github.com/apache/flink/blob/0da60ca1a4754f858cf7c52dd4f0c97ae0e1b0cb/docs/content/docs/dev/table/sql/show.md?plain=1#L620-L632

On Wed, Mar 13, 2024 at 3:28 PM Feng Jin  wrote:

> Hi Yubin
>
> Thank you for initiating this FLIP.
>
> I have just one minor question:
>
> I noticed that we added a new function `getCatalogStore` to expose
> CatalogStore, and it seems fine.
> However, could we add a new method `getCatalogDescriptor()` to
> CatalogManager instead of directly exposing CatalogStore?
> By only providing the `getCatalogDescriptor()` interface, it may be easier
> for us to implement audit tracking in CatalogManager in the future.  WDYT ?
> Although we have only collected some modified events at the moment.[1]
>
>
> [1].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener
>
> Best,
> Feng
>
> On Wed, Mar 13, 2024 at 5:31 PM Jingsong Li 
> wrote:
>
> > +1 for this.
> >
> > We are missing a series of catalog related syntaxes.
> > Especially after the introduction of catalog store. [1]
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> >
> > Best,
> > Jingsong
> >
> > On Wed, Mar 13, 2024 at 5:09 PM Yubin Li  wrote:
> > >
> > > Hi devs,
> > >
> > > I'd like to start a discussion about FLIP-436: Introduce "SHOW CREATE
> > > CATALOG" Syntax [1].
> > >
> > > At present, the `SHOW CREATE TABLE` statement provides strong support
> for
> > > users to easily
> > > reuse created tables. However, despite the increasing importance of the
> > > `Catalog` in user's
> > > business, there is no similar statement for users to use.
> > >
> > > According to the online discussion in FLINK-24939 [2] with Jark Wu and
> > Feng
> > > Jin, since `CatalogStore`
> > > has been introduced in FLIP-295 [3], we could use this component to
> > > implement such a long-awaited
> > > feature, Please refer to the document [1] for implementation details.
> > >
> > > examples as follows:
> > >
> > > Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory',
> > > > 'default-database'='db');
> > > > [INFO] Execute statement succeeded.
> > > > Flink SQL> show create catalog cat2;
> > > >
> > > >
> >
> ++
> > > > | result |
> > > >
> > > >
> >
> ++
> > > > | CREATE CATALOG `cat2` WITH (
> > > >   'default-database' = 'db',
> > > >   'type' = 'generic_in_memory'
> > > > )
> > > >  |
> > > >
> > > >
> >
> ++
> > > > 1 row in set
> > >
> > >
> > >
> > > Looking forward to hearing from you, thanks!
> > >
> > > Best regards,
> > > Yubin
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756
> > > [2] https://issues.apache.org/jira/browse/FLINK-24939
> > > [3]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> >
>


Re: [DISCUSS] FLIP-436: Introduce "SHOW CREATE CATALOG" Syntax

2024-03-13 Thread Feng Jin
Hi Yubin

Thank you for initiating this FLIP.

I have just one minor question:

I noticed that we added a new function `getCatalogStore` to expose
CatalogStore, and it seems fine.
However, could we add a new method `getCatalogDescriptor()` to
CatalogManager instead of directly exposing CatalogStore?
By only providing the `getCatalogDescriptor()` interface, it may be easier
for us to implement audit tracking in CatalogManager in the future.  WDYT ?
Although we have only collected some modified events at the moment.[1]


[1].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Catalog+Modification+Listener

Best,
Feng

On Wed, Mar 13, 2024 at 5:31 PM Jingsong Li  wrote:

> +1 for this.
>
> We are missing a series of catalog related syntaxes.
> Especially after the introduction of catalog store. [1]
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
>
> Best,
> Jingsong
>
> On Wed, Mar 13, 2024 at 5:09 PM Yubin Li  wrote:
> >
> > Hi devs,
> >
> > I'd like to start a discussion about FLIP-436: Introduce "SHOW CREATE
> > CATALOG" Syntax [1].
> >
> > At present, the `SHOW CREATE TABLE` statement provides strong support for
> > users to easily
> > reuse created tables. However, despite the increasing importance of the
> > `Catalog` in user's
> > business, there is no similar statement for users to use.
> >
> > According to the online discussion in FLINK-24939 [2] with Jark Wu and
> Feng
> > Jin, since `CatalogStore`
> > has been introduced in FLIP-295 [3], we could use this component to
> > implement such a long-awaited
> > feature, Please refer to the document [1] for implementation details.
> >
> > examples as follows:
> >
> > Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory',
> > > 'default-database'='db');
> > > [INFO] Execute statement succeeded.
> > > Flink SQL> show create catalog cat2;
> > >
> > >
> ++
> > > | result |
> > >
> > >
> ++
> > > | CREATE CATALOG `cat2` WITH (
> > >   'default-database' = 'db',
> > >   'type' = 'generic_in_memory'
> > > )
> > >  |
> > >
> > >
> ++
> > > 1 row in set
> >
> >
> >
> > Looking forward to hearing from you, thanks!
> >
> > Best regards,
> > Yubin
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756
> > [2] https://issues.apache.org/jira/browse/FLINK-24939
> > [3]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
>


Re: Flink Kubernetes Operator Failing Over FlinkDeployments to a New Cluster

2024-03-13 Thread Maximilian Michels
Hi Kevin,

Theoretically, as long as you move over all k8s resources, failover
should work fine on the Flink and Flink Operator side. The tricky part
is the handover. You will need to backup all resources from the old
cluster, shutdown the old cluster, then re-create them on the new
cluster. The operator deployment and the Flink cluster should then
recover fine (assuming that high availability has been configured and
checkpointing is done to persistent storage available in the new
cluster). The operator state / Flink state is actually kept in
ConfigMaps which would be part of the resource dump.

This method has proven to work in case of Kubernetes cluster upgrades.
Moving to an entirely new cluster is a bit more involved but exporting
all resource definitions and re-importing them into the new cluster
should yield the same result as long as the checkpoint paths do not
change.

Probably something worth trying :)

-Max



On Wed, Mar 6, 2024 at 9:09 PM Kevin Lam  wrote:
>
> Another thought could be modifying the operator to have a behaviour where
> upon first deploy, it optionally (flag/param enabled) finds the most recent
> snapshot and uses that as the initialSavepointPath to restore and run the
> Flink job.
>
> On Wed, Mar 6, 2024 at 2:07 PM Kevin Lam  wrote:
>
> > Hi there,
> >
> > We use the Flink Kubernetes Operator, and I am investigating how we can
> > easily support failing over a FlinkDeployment from one Kubernetes Cluster
> > to another in the case of an outage that requires us to migrate a large
> > number of FlinkDeployments from one K8s cluster to another.
> >
> > I understand one way to do this is to set `initialSavepoint` on all the
> > FlinkDeployments to the most recent/appropriate snapshot so the jobs
> > continue from where they left off, but for a large number of jobs, this
> > would be quite a bit of manual labor.
> >
> > Do others have an approach they are using? Any advice?
> >
> > Could this be something addressed in a future FLIP? Perhaps we could store
> > some kind of metadata in object storage so that the Flink Kubernetes
> > Operator can restore a FlinkDeployment from where it left off, even if the
> > job is shifted to another Kubernetes Cluster.
> >
> > Looking forward to hearing folks' thoughts!
> >


Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-13 Thread Jeyhun Karimov
Hi Jane,

Thanks for your comments.


1. Concerning the `sourcePartitions()` method, the partition information
> returned during the optimization phase may not be the same as the partition
> information during runtime execution. For long-running jobs, partitions may
> be continuously created. Is this FLIP equipped to handle scenarios?


- Good point. This scenario is definitely supported.
Once a new partition is added, or in general, new splits are
discovered, PartitionAwareSplitAssigner::addSplits(Collection
newSplits)
method will be called. Inside that method, we are able to detect if a split
belongs to existing partitions or there is a new partition.
Once a new partition is detected, we add it to our existing mapping. Our
mapping looks like Map> subtaskToPartitionAssignment,
where
it maps each source subtaskID to zero or more partitions.

2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> understand that it is also necessary to verify whether the hash key within
> the Exchange node is consistent with the partition key defined in the table
> source that implements `SupportsPartitioning`.


- Yes, I overlooked that point, fixed. Actually, the rule is much
complicated. I tried to simplify it in the FLIP. Good point.


3. Could you elaborate on the desired physical plan and integration with
> `CompiledPlan` to enhance the overall functionality?


- For compiled plan, PartitioningSpec will be used, with a json tag
"Partitioning". As a result, in the compiled plan, the source operator will
have
"abilities" : [ { "type" : "Partitioning" } ] as part of the compiled plan.
More about the implementation details below:


PartitioningSpec class

@JsonTypeName("Partitioning")
public final class PartitioningSpec extends SourceAbilitySpecBase {
 // some code here
@Override
public void apply(DynamicTableSource tableSource, SourceAbilityContext
context) {
if (tableSource instanceof SupportsPartitioning) {
((SupportsPartitioning) tableSource).applyPartitionedRead();
} else {
throw new TableException(
String.format(
"%s does not support SupportsPartitioning.",
tableSource.getClass().getName()));
}
}
  // some code here
}


SourceAbilitySpec class

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = FilterPushDownSpec.class),
@JsonSubTypes.Type(value = LimitPushDownSpec.class),
@JsonSubTypes.Type(value = PartitionPushDownSpec.class),
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),
@JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
@JsonSubTypes.Type(value = SourceWatermarkSpec.class),
@JsonSubTypes.Type(value = AggregatePushDownSpec.class),
+  @JsonSubTypes.Type(value = PartitioningSpec.class)   //
new added



Please let me know if that answers your questions or if you have other
comments.

Regards,
Jeyhun


On Tue, Mar 12, 2024 at 8:56 AM Jane Chan  wrote:

> Hi Jeyhun,
>
> Thank you for leading the discussion. I'm generally +1 with this proposal,
> along with some questions. Please see my comments below.
>
> 1. Concerning the `sourcePartitions()` method, the partition information
> returned during the optimization phase may not be the same as the partition
> information during runtime execution. For long-running jobs, partitions may
> be continuously created. Is this FLIP equipped to handle scenarios?
>
> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> understand that it is also necessary to verify whether the hash key within
> the Exchange node is consistent with the partition key defined in the table
> source that implements `SupportsPartitioning`.
>
> 3. Could you elaborate on the desired physical plan and integration with
> `CompiledPlan` to enhance the overall functionality?
>
> Best,
> Jane
>
> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes 
> wrote:
>
> > Hi Jeyhun,
> >
> > I like the idea!  Given FLIP-376[1], I wonder if it'd make sense to
> > generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> > "partitions" (and maybe even situations where a data source is
> partitioned
> > and bucketed).
> >
> > Separate from that, the page mentions TPC-H Q1 as an example.  For a
> join,
> > any two tables joined on the same bucket key should provide a concrete
> > example of a join.  Systems like Kafka Streams/ksqlDB call this
> > "co-partitioning"; for those systems, it is a requirement placed on the
> > input sources.  For Flink, with FLIP-434, the proposed planner rule
> > could remove the shuffle.
> >
> > Definitely a fun idea; I look forward to hearing more!
> >
> > Cheers,
> >
> > Jim
> >
> >
> > 1.
> 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-03-13 Thread Jeyhun Karimov
Hi Jim,


Thanks for your comments.

I wonder if it'd make sense to
> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> "partitions" (and maybe even situations where a data source is partitioned
> and bucketed).


Now that I go through FLIP-376 [1] again, your suggestion definitely makes
sense.
For any source connector that can derive "DISTRIBUTION" metadata (e.g.,
distribution key/columns, bucket names, etc)
from the input source (e.g., given the dfs path), FLIP-434 [2] can and
should support reading pre-bucketed and/or pre-divided data.
I also added the relevant info to the FLIP-434 [2].


Separate from that, the page mentions TPC-H Q1 as an example.  For a join,
> any two tables joined on the same bucket key should provide a concrete
> example of a join.  Systems like Kafka Streams/ksqlDB call this
> "co-partitioning"; for those systems, it is a requirement placed on the
> input sources.  For Flink, with FLIP-434, the proposed planner rule
> could remove the shuffle.


- Yes, with this proposal we might be able to remove the shuffle when
compared to Kafka Streams/ksqlDB.
One thing to note is that for join queries, the parallelism of each join
source might be different. This might result in
inconsistencies while using the pre-partitioned/pre-divided data (e.g.,
different mappings of partitions to source operators).
Therefore, it is the job of planner to detect this and adjust the
parallelism. With that having in mind,
the rest (how the split assigners perform) is consistent among many
sources.

Please let. me know if that answers your questions or if you have any other
comments.

Regards,
Jeyhun

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434
%3A+Support+optimizations+for+pre-partitioned+data+sources


On Tue, Mar 12, 2024 at 4:11 AM Jim Hughes 
wrote:

> Hi Jeyhun,
>
> I like the idea!  Given FLIP-376[1], I wonder if it'd make sense to
> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> "partitions" (and maybe even situations where a data source is partitioned
> and bucketed).
>
> Separate from that, the page mentions TPC-H Q1 as an example.  For a join,
> any two tables joined on the same bucket key should provide a concrete
> example of a join.  Systems like Kafka Streams/ksqlDB call this
> "co-partitioning"; for those systems, it is a requirement placed on the
> input sources.  For Flink, with FLIP-434, the proposed planner rule
> could remove the shuffle.
>
> Definitely a fun idea; I look forward to hearing more!
>
> Cheers,
>
> Jim
>
>
> 1.
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> 2.
>
> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
>
> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov 
> wrote:
>
> > Hi devs,
> >
> > I’d like to start a discussion on FLIP-434: Support optimizations for
> > pre-partitioned data sources [1].
> >
> > The FLIP introduces taking advantage of pre-partitioned data sources for
> > SQL/Table API (it is already supported as experimental feature in
> > DataStream API [2]).
> >
> >
> > Please find more details in the FLIP wiki document [1].
> > Looking forward to your feedback.
> >
> > Regards,
> > Jeyhun
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > [2]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> >
>


Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-13 Thread Jing Ge
indeed! I missed that part. Thanks for the hint!

Best regards,
Jing

On Wed, Mar 13, 2024 at 6:02 AM Zakelly Lan  wrote:

> Hi Jing,
>
> The deprecation and removal of original APIs is beyond the scope of
> current FLIP, but I do add/highlight such information under "Compatibility,
> Deprecation, and Migration Plan" section.
>
>
> Best,
> Zakelly
>
> On Wed, Mar 13, 2024 at 9:18 AM Yunfeng Zhou 
> wrote:
>
>> Hi Zakelly,
>>
>> Thanks for your responses. I agree with it that we can keep the design
>> as it is for now and see if others have any better ideas for these
>> questions.
>>
>> Best,
>> Yunfeng
>>
>> On Tue, Mar 12, 2024 at 5:23 PM Zakelly Lan 
>> wrote:
>> >
>> > Hi Xuannan,
>> >
>> > Thanks for your comments, I modified the FLIP accordingly.
>> >
>> > Hi Yunfeng,
>> >
>> > Thanks for sharing your opinions!
>> >
>> >> Could you provide some hint on use cases where users need to mix sync
>> >> and async state operations in spite of the performance regression?
>> >> This information might help address our concerns on design. If the
>> >> mixed usage is simply something not recommended, I would prefer to
>> >> prohibit such usage from API.
>> >
>> > In fact, there is no scenario where users MUST use the sync APIs, but
>> it is much easier to use for those who are not familiar with asynchronous
>> programming. If they want to migrate their job from Flink 1.x to 2.0
>> leveraging some benefits from asynchronous APIs, they may try the mixed
>> usage. It is not user-friendly to directly throw exceptions at runtime, I
>> think our better approach is to warn users and recommend avoiding this. I
>> added an example in this FLIP.
>> >
>> > Well, I do not insist on allowing mixed usage of APIs if others reach
>> an agreement that we won't support that . I think the most important is to
>> keep the API easy to use and understand, thus I propose a unified state
>> declaration and explicit meaning in method name. WDYT?
>> >
>> >> Sorry I missed the new sink API. I do still think that it would be
>> >> better to make the package name more informative, and ".v2." does not
>> >> contain information for new Flink users who did not know the v1 of
>> >> state API. Unlike internal implementation and performance
>> >> optimization, API will hardly be compromised for now and updated in
>> >> future, so I still suggest we improve the package name now if
>> >> possible. But given the existing practice of sink v2 and
>> >> AbstractStreamOperatorV2, the current package name would be acceptable
>> >> to me if other reviewers of this FLIP agrees on it.
>> >
>> > Actually, I don't like 'v2' either. So if there is another good name,
>> I'd be happy to apply. This is a compromise to the current situation. Maybe
>> we could refine this after the retirement of original state APIs.
>> >
>> >
>> > Thanks & Best,
>> > Zakelly
>> >
>> >
>> > On Tue, Mar 12, 2024 at 1:42 PM Yunfeng Zhou <
>> flink.zhouyunf...@gmail.com> wrote:
>> >>
>> >> Hi Zakelly,
>> >>
>> >> Thanks for the quick response!
>> >>
>> >> > Actually splitting APIs into two sets ... warn them in runtime.
>> >>
>> >> Could you provide some hint on use cases where users need to mix sync
>> >> and async state operations in spite of the performance regression?
>> >> This information might help address our concerns on design. If the
>> >> mixed usage is simply something not recommended, I would prefer to
>> >> prohibit such usage from API.
>> >>
>> >> > In fact ... .sink2`.
>> >>
>> >> Sorry I missed the new sink API. I do still think that it would be
>> >> better to make the package name more informative, and ".v2." does not
>> >> contain information for new Flink users who did not know the v1 of
>> >> state API. Unlike internal implementation and performance
>> >> optimization, API will hardly be compromised for now and updated in
>> >> future, so I still suggest we improve the package name now if
>> >> possible. But given the existing practice of sink v2 and
>> >> AbstractStreamOperatorV2, the current package name would be acceptable
>> >> to me if other reviewers of this FLIP agrees on it.
>> >>
>> >> Best,
>> >> Yunfeng
>> >>
>> >> On Mon, Mar 11, 2024 at 5:27 PM Zakelly Lan 
>> wrote:
>> >> >
>> >> > Hi Yunfeng,
>> >> >
>> >> > Thanks for your comments!
>> >> >
>> >> > +1 for JingGe's suggestion to introduce an AsyncState API, instead of
>> >> > > having both get() and asyncGet() in the same State class. As a
>> >> > > supplement to its benefits, this design could help avoid having
>> users
>> >> > > to use sync and async API in a mixed way (unless they create both a
>> >> > > State and an AsyncState from the same state descriptor), which is
>> >> > > supposed to bring suboptimal performance according to the FLIP's
>> >> > > description.
>> >> >
>> >> >
>> >> > Actually splitting APIs into two sets of classes also brings some
>> >> > difficulties. In this case, users must explicitly define their usage
>> before
>> >> > actually doing state access. It is a little strange 

[jira] [Created] (FLINK-34662) Add new issue template for Flink CDC repo

2024-03-13 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-34662:
-

 Summary: Add new issue template for Flink CDC repo
 Key: FLINK-34662
 URL: https://issues.apache.org/jira/browse/FLINK-34662
 Project: Flink
  Issue Type: Sub-task
  Components: Flink CDC
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


As we migrated to Apache Jira for managing issues, we need to provide a new 
template to remind users and contributors about the new issue reporting way.

The reason we don't close issue functionality is that some historical commits 
and PRs are linked to issues before the donation, and closing the functionality 
will make those not traceable anymore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-420: Add API annotations for RocksDB StateBackend user-facing classes

2024-03-13 Thread Ahmed Hamdy
Hi Jinzhong,
Thanks for driving this,
+1 (non-binding)
Best Regards
Ahmed Hamdy


On Wed, 13 Mar 2024 at 01:33, Jing Ge  wrote:

> +1 (binding) Thanks!
>
> Best regards,
> Jing
>
> On Sun, Mar 10, 2024 at 5:32 PM Jing Ge  wrote:
>
> > Hi Jinzhong,
> >
> > Thanks for driving this topic and sorry for just joining the discussion
> > now. I replied in your discussion thread. Would you like to take a look
> > and let's keep the discussion there? I will come back to this thread and
> > vote once the discussion is done. Thanks!
> >
> > Best regards,
> > Jing
> >
> > On Thu, Mar 7, 2024 at 4:39 AM Zakelly Lan 
> wrote:
> >
> >> +1 non-binding
> >>
> >> Thanks for proposing this.
> >>
> >> Best,
> >> Zakelly
> >>
> >> On Thu, Mar 7, 2024 at 10:13 AM Yanfei Lei  wrote:
> >>
> >> > +1(binding) for this vote.
> >> >
> >> > Hangxiang Yu  于2024年3月7日周四 09:54写道:
> >> > >
> >> > > +1 (binding)
> >> > >
> >> > > On Thu, Mar 7, 2024 at 9:34 AM Yun Tang  wrote:
> >> > >
> >> > > > > +1 for this FLIP.
> >> > > > Sorry for not being clear in my previous reply, it's a binding
> vote.
> >> > > >
> >> > > > Best
> >> > > > Yun Tang
> >> > > > 
> >> > > > From: Jeyhun Karimov 
> >> > > > Sent: Thursday, March 7, 2024 4:40
> >> > > > To: dev@flink.apache.org 
> >> > > > Subject: Re: [VOTE] FLIP-420: Add API annotations for RocksDB
> >> > StateBackend
> >> > > > user-facing classes
> >> > > >
> >> > > > Hi Jinzhong,
> >> > > >
> >> > > > Thanks for the FLIP.
> >> > > >
> >> > > > +1 (non-binding)
> >> > > >
> >> > > > Regards,
> >> > > > Jeyhun
> >> > > >
> >> > > > On Wed, Mar 6, 2024 at 5:09 PM Yun Tang  wrote:
> >> > > >
> >> > > > > +1 for this FLIP.
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > Best
> >> > > > > Yun Tang
> >> > > > > 
> >> > > > > From: Jinzhong Li 
> >> > > > > Sent: Wednesday, March 6, 2024 20:29
> >> > > > > To: dev@flink.apache.org 
> >> > > > > Subject: [VOTE] FLIP-420: Add API annotations for RocksDB
> >> > StateBackend
> >> > > > > user-facing classes
> >> > > > >
> >> > > > > Hi All,
> >> > > > >
> >> > > > > I'd like to start a vote on the FLIP-420: Add API annotations
> for
> >> > RocksDB
> >> > > > > StateBackend user-facing classes[1].
> >> > > > > The discussion thread is here [2].
> >> > > > >
> >> > > > > The vote will be open for at least 72 hours unless there is an
> >> > objection
> >> > > > or
> >> > > > > not enough votes.
> >> > > > >
> >> > > > >
> >> > > > > [1]https://cwiki.apache.org/confluence/x/JQs4EQ
> >> > > > > [2]
> >> https://lists.apache.org/thread/4t71lz2j2ft8hf90ylvtomynhr2qthoo
> >> > > > >
> >> > > > >
> >> > > > > Best,
> >> > > > > Jinzhong Li
> >> > > > >
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > > Best,
> >> > > Hangxiang.
> >> >
> >> >
> >> >
> >> > --
> >> > Best,
> >> > Yanfei
> >> >
> >>
> >
>


Re: [DISCUSS] FLIP-436: Introduce "SHOW CREATE CATALOG" Syntax

2024-03-13 Thread Jingsong Li
+1 for this.

We are missing a series of catalog related syntaxes.
Especially after the introduction of catalog store. [1]

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations

Best,
Jingsong

On Wed, Mar 13, 2024 at 5:09 PM Yubin Li  wrote:
>
> Hi devs,
>
> I'd like to start a discussion about FLIP-436: Introduce "SHOW CREATE
> CATALOG" Syntax [1].
>
> At present, the `SHOW CREATE TABLE` statement provides strong support for
> users to easily
> reuse created tables. However, despite the increasing importance of the
> `Catalog` in user's
> business, there is no similar statement for users to use.
>
> According to the online discussion in FLINK-24939 [2] with Jark Wu and Feng
> Jin, since `CatalogStore`
> has been introduced in FLIP-295 [3], we could use this component to
> implement such a long-awaited
> feature, Please refer to the document [1] for implementation details.
>
> examples as follows:
>
> Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory',
> > 'default-database'='db');
> > [INFO] Execute statement succeeded.
> > Flink SQL> show create catalog cat2;
> >
> > ++
> > | result |
> >
> > ++
> > | CREATE CATALOG `cat2` WITH (
> >   'default-database' = 'db',
> >   'type' = 'generic_in_memory'
> > )
> >  |
> >
> > ++
> > 1 row in set
>
>
>
> Looking forward to hearing from you, thanks!
>
> Best regards,
> Yubin
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756
> [2] https://issues.apache.org/jira/browse/FLINK-24939
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations


[jira] [Created] (FLINK-34661) TaskExecutor supports retain partitions after JM crashed.

2024-03-13 Thread Junrui Li (Jira)
Junrui Li created FLINK-34661:
-

 Summary: TaskExecutor supports retain partitions after JM crashed.
 Key: FLINK-34661
 URL: https://issues.apache.org/jira/browse/FLINK-34661
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Junrui Li






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] FLIP-436: Introduce "SHOW CREATE CATALOG" Syntax

2024-03-13 Thread Yubin Li
Hi devs,

I'd like to start a discussion about FLIP-436: Introduce "SHOW CREATE
CATALOG" Syntax [1].

At present, the `SHOW CREATE TABLE` statement provides strong support for
users to easily
reuse created tables. However, despite the increasing importance of the
`Catalog` in user's
business, there is no similar statement for users to use.

According to the online discussion in FLINK-24939 [2] with Jark Wu and Feng
Jin, since `CatalogStore`
has been introduced in FLIP-295 [3], we could use this component to
implement such a long-awaited
feature, Please refer to the document [1] for implementation details.

examples as follows:

Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory',
> 'default-database'='db');
> [INFO] Execute statement succeeded.
> Flink SQL> show create catalog cat2;
>
> ++
> | result |
>
> ++
> | CREATE CATALOG `cat2` WITH (
>   'default-database' = 'db',
>   'type' = 'generic_in_memory'
> )
>  |
>
> ++
> 1 row in set



Looking forward to hearing from you, thanks!

Best regards,
Yubin

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=296290756
[2] https://issues.apache.org/jira/browse/FLINK-24939
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations


Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-03-13 Thread Jing Zhang
Hi, Lincoln & Ron,

Thanks for the proposal.

I agree with the question raised by Timo.

Besides, I have some other questions.
1. How to define query of dynamic table?
Use flink sql or introducing new syntax?
If use flink sql, how to handle the difference in SQL between streaming and
batch processing?
For example, a query including window aggregate based on processing time?
or a query including global order by?

2. Whether modify the query of dynamic table is allowed?
Or we could only refresh a dynamic table based on initial query?

3. How to use dynamic table?
The dynamic table seems to be similar with materialized view.  Will we do
something like materialized view rewriting during the optimization?

Best,
Jing Zhang


Timo Walther  于2024年3月13日周三 01:24写道:

> Hi Lincoln & Ron,
>
> thanks for proposing this FLIP. I think a design similar to what you
> propose has been in the heads of many people, however, I'm wondering how
> this will fit into the bigger picture.
>
> I haven't deeply reviewed the FLIP yet, but would like to ask some
> initial questions:
>
> Flink has introduced the concept of Dynamic Tables many years ago. How
> does the term "Dynamic Table" fit into Flink's regular tables and also
> how does it relate to Table API?
>
> I fear that adding the DYNAMIC TABLE keyword could cause confusion for
> users, because a term for regular CREATE TABLE (that can be "kind of
> dynamic" as well and is backed by a changelog) is then missing. Also
> given that we call our connectors for those tables, DynamicTableSource
> and DynamicTableSink.
>
> In general, I find it contradicting that a TABLE can be "paused" or
> "resumed". From an English language perspective, this does sound
> incorrect. In my opinion (without much research yet), a continuous
> updating trigger should rather be modelled as a CREATE MATERIALIZED VIEW
> (which users are familiar with?) or a new concept such as a CREATE TASK
> (that can be paused and resumed?).
>
> How do you envision re-adding the functionality of a statement set, that
> fans out to multiple tables? This is a very important use case for data
> pipelines.
>
> Since the early days of Flink SQL, we were discussing `SELECT STREAM *
> FROM T EMIT 5 MINUTES`. Your proposal seems to rephrase STREAM and EMIT,
> into other keywords DYNAMIC TABLE and FRESHNESS. But the core
> functionality is still there. I'm wondering if we should widen the scope
> (maybe not part of this FLIP but a new FLIP) to follow the standard more
> closely. Making `SELECT * FROM t` bounded by default and use new syntax
> for the dynamic behavior. Flink 2.0 would be the perfect time for this,
> however, it would require careful discussions. What do you think?
>
> Regards,
> Timo
>
>
> On 11.03.24 08:23, Ron liu wrote:
> > Hi, Dev
> >
> >
> > Lincoln Lee and I would like to start a discussion about FLIP-435:
> > Introduce a  New Dynamic Table for Simplifying Data Pipelines.
> >
> >
> > This FLIP is designed to simplify the development of data processing
> > pipelines. With Dynamic Tables with uniform SQL statements and
> > freshness, users can define batch and streaming transformations to
> > data in the same way, accelerate ETL pipeline development, and manage
> > task scheduling automatically.
> >
> >
> > For more details, see FLIP-435 [1]. Looking forward to your feedback.
> >
> >
> > [1]
> >
> >
> > Best,
> >
> > Lincoln & Ron
> >
>
>


Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-13 Thread weijie guo
Okay, sorry, I'm not looking at the latest version of the FLIP. You've
answered my question in updated FLIP. :)

Best regards,

Weijie


weijie guo  于2024年3月13日周三 14:56写道:

> Hi Zakelly,
>
> Thanks for the proposal! I like this idea and I can see the performance
> improvements it brings.
>
> In the previous reply you mentioned “these APIs are in some newly
> introduced classes, which are located in a different package name with the
> original one”. I can see the benefits of this. To be honest, there is a lot
> of historical burdens with the old state API, maybe this is a chance to
> break free. If I understand you correctly, the new State(V2) interface will
> still support synchronous API, right? But I didn't see that in the FLIP.
>
>
>
> Best regards,
>
> Weijie
>
>
> Zakelly Lan  于2024年3月13日周三 13:03写道:
>
>> Hi Jing,
>>
>> The deprecation and removal of original APIs is beyond the scope of
>> current
>> FLIP, but I do add/highlight such information under "Compatibility,
>> Deprecation, and Migration Plan" section.
>>
>>
>> Best,
>> Zakelly
>>
>> On Wed, Mar 13, 2024 at 9:18 AM Yunfeng Zhou > >
>> wrote:
>>
>> > Hi Zakelly,
>> >
>> > Thanks for your responses. I agree with it that we can keep the design
>> > as it is for now and see if others have any better ideas for these
>> > questions.
>> >
>> > Best,
>> > Yunfeng
>> >
>> > On Tue, Mar 12, 2024 at 5:23 PM Zakelly Lan 
>> wrote:
>> > >
>> > > Hi Xuannan,
>> > >
>> > > Thanks for your comments, I modified the FLIP accordingly.
>> > >
>> > > Hi Yunfeng,
>> > >
>> > > Thanks for sharing your opinions!
>> > >
>> > >> Could you provide some hint on use cases where users need to mix sync
>> > >> and async state operations in spite of the performance regression?
>> > >> This information might help address our concerns on design. If the
>> > >> mixed usage is simply something not recommended, I would prefer to
>> > >> prohibit such usage from API.
>> > >
>> > > In fact, there is no scenario where users MUST use the sync APIs, but
>> it
>> > is much easier to use for those who are not familiar with asynchronous
>> > programming. If they want to migrate their job from Flink 1.x to 2.0
>> > leveraging some benefits from asynchronous APIs, they may try the mixed
>> > usage. It is not user-friendly to directly throw exceptions at runtime,
>> I
>> > think our better approach is to warn users and recommend avoiding this.
>> I
>> > added an example in this FLIP.
>> > >
>> > > Well, I do not insist on allowing mixed usage of APIs if others reach
>> an
>> > agreement that we won't support that . I think the most important is to
>> > keep the API easy to use and understand, thus I propose a unified state
>> > declaration and explicit meaning in method name. WDYT?
>> > >
>> > >> Sorry I missed the new sink API. I do still think that it would be
>> > >> better to make the package name more informative, and ".v2." does not
>> > >> contain information for new Flink users who did not know the v1 of
>> > >> state API. Unlike internal implementation and performance
>> > >> optimization, API will hardly be compromised for now and updated in
>> > >> future, so I still suggest we improve the package name now if
>> > >> possible. But given the existing practice of sink v2 and
>> > >> AbstractStreamOperatorV2, the current package name would be
>> acceptable
>> > >> to me if other reviewers of this FLIP agrees on it.
>> > >
>> > > Actually, I don't like 'v2' either. So if there is another good name,
>> > I'd be happy to apply. This is a compromise to the current situation.
>> Maybe
>> > we could refine this after the retirement of original state APIs.
>> > >
>> > >
>> > > Thanks & Best,
>> > > Zakelly
>> > >
>> > >
>> > > On Tue, Mar 12, 2024 at 1:42 PM Yunfeng Zhou <
>> > flink.zhouyunf...@gmail.com> wrote:
>> > >>
>> > >> Hi Zakelly,
>> > >>
>> > >> Thanks for the quick response!
>> > >>
>> > >> > Actually splitting APIs into two sets ... warn them in runtime.
>> > >>
>> > >> Could you provide some hint on use cases where users need to mix sync
>> > >> and async state operations in spite of the performance regression?
>> > >> This information might help address our concerns on design. If the
>> > >> mixed usage is simply something not recommended, I would prefer to
>> > >> prohibit such usage from API.
>> > >>
>> > >> > In fact ... .sink2`.
>> > >>
>> > >> Sorry I missed the new sink API. I do still think that it would be
>> > >> better to make the package name more informative, and ".v2." does not
>> > >> contain information for new Flink users who did not know the v1 of
>> > >> state API. Unlike internal implementation and performance
>> > >> optimization, API will hardly be compromised for now and updated in
>> > >> future, so I still suggest we improve the package name now if
>> > >> possible. But given the existing practice of sink v2 and
>> > >> AbstractStreamOperatorV2, the current package name would be
>> acceptable
>> > >> to me if other reviewers of this FLIP agrees on 

Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-03-13 Thread Hao Li
Hi Jark,

Thanks for your questions. These are good questions!

1. The polymorphism table function I was referring to takes a table as
input and outputs a table. So the syntax would be like
```
SELECT * FROM ML_PREDICT('model', (SELECT * FROM my_table))
```
As far as I know, this is not supported yet on Flink. So before it's
supported, one option for the predict function is using table function
which can output multiple columns
```
SELECT * FROM my_table, LATERAL VIEW (ML_PREDICT('model', col1, col2))
```

2. Good question. Type inference is hard for the `ML_PREDICT` function
because it takes a model name string as input. I can think of three ways of
doing type inference for it.
   1). Treat `ML_PREDICT` function as something special and during sql
parsing or planning time, if it's encountered, we need to look up the model
from the first argument which is a model name from catalog. Then we can
infer the input/output for the function.
   2). We can define a `model` keyword and use that in the predict function
to indicate the argument refers to a model. So it's like `ML_PREDICT(model
'my_model', col1, col2))`
   3). We can create a special type of table function maybe called
`ModelFunction` which can resolve the model type inference by special
handling it during parsing or planning time.
1) is hacky, 2) isn't supported in Flink for function, 3) might be a
good option.

3. I sketched the `ML_PREDICT` function for inference. But there are
limitations of the function mentioned in 1 and 2. So maybe we don't need to
introduce them as built-in functions until polymorphism table function and
we can properly deal with type inference.
After that, defining a user-defined model function should also be
straightforward.

4. For model types, do you mean 'remote', 'import', 'native' models or
other things?

5. We could support popular providers such as 'azureml', 'vertexai',
'googleai' as long as we support the `ML_PREDICT` function. Users should be
able to implement 3rd-party providers if they can implement a function
handling the input/output for the provider.

I think for the model functions, there are still dependencies or hacks we
need to sort out as a built-in function. Maybe we can separate that as a
follow up if we want to have it built-in and focus on the model syntax for
this FLIP?

Thanks,
Hao

On Tue, Mar 12, 2024 at 10:33 PM Jark Wu  wrote:

> Hi Minge, Chris, Hao,
>
> Thanks for proposing this interesting idea. I think this is a nice step
> towards
> the AI world for Apache Flink. I don't know much about AI/ML, so I may have
> some stupid questions.
>
> 1. Could you tell more about why polymorphism table function (PTF) doesn't
> work and do we have plan to use PTF as model functions?
>
> 2. What kind of object does the model map to in SQL? A relation or a data
> type?
> It looks like a data type because we use it as a parameter of the table
> function.
> If it is a data type, how does it cooperate with type inference[1]?
>
> 3. What built-in model functions will we support? How to define a
> user-defined model function?
>
> 4. What built-in model types will we support? How to define a user-defined
> model type?
>
> 5. Regarding the remote model, what providers will we support? Can users
> implement
> 3rd-party providers except OpenAI?
>
> Best,
> Jark
>
> [1]:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
>
>
>
>
> On Wed, 13 Mar 2024 at 05:55, Hao Li  wrote:
>
> > Hi, Dev
> >
> >
> > Mingge, Chris and I would like to start a discussion about FLIP-437:
> > Support ML Models in Flink SQL.
> >
> > This FLIP is proposing to support machine learning models in Flink SQL
> > syntax so that users can CRUD models with Flink SQL and use models on
> Flink
> > to do prediction with Flink data. The FLIP also proposes new model
> entities
> > and changes to catalog interface to support model CRUD operations in
> > catalog.
> >
> > For more details, see FLIP-437 [1]. Looking forward to your feedback.
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> >
> > Thanks,
> > Minge, Chris & Hao
> >
>


Re: [DISCUSS] FLIP-424: Asynchronous State APIs

2024-03-13 Thread weijie guo
Hi Zakelly,

Thanks for the proposal! I like this idea and I can see the performance
improvements it brings.

In the previous reply you mentioned “these APIs are in some newly
introduced classes, which are located in a different package name with the
original one”. I can see the benefits of this. To be honest, there is a lot
of historical burdens with the old state API, maybe this is a chance to
break free. If I understand you correctly, the new State(V2) interface will
still support synchronous API, right? But I didn't see that in the FLIP.



Best regards,

Weijie


Zakelly Lan  于2024年3月13日周三 13:03写道:

> Hi Jing,
>
> The deprecation and removal of original APIs is beyond the scope of current
> FLIP, but I do add/highlight such information under "Compatibility,
> Deprecation, and Migration Plan" section.
>
>
> Best,
> Zakelly
>
> On Wed, Mar 13, 2024 at 9:18 AM Yunfeng Zhou 
> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for your responses. I agree with it that we can keep the design
> > as it is for now and see if others have any better ideas for these
> > questions.
> >
> > Best,
> > Yunfeng
> >
> > On Tue, Mar 12, 2024 at 5:23 PM Zakelly Lan 
> wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for your comments, I modified the FLIP accordingly.
> > >
> > > Hi Yunfeng,
> > >
> > > Thanks for sharing your opinions!
> > >
> > >> Could you provide some hint on use cases where users need to mix sync
> > >> and async state operations in spite of the performance regression?
> > >> This information might help address our concerns on design. If the
> > >> mixed usage is simply something not recommended, I would prefer to
> > >> prohibit such usage from API.
> > >
> > > In fact, there is no scenario where users MUST use the sync APIs, but
> it
> > is much easier to use for those who are not familiar with asynchronous
> > programming. If they want to migrate their job from Flink 1.x to 2.0
> > leveraging some benefits from asynchronous APIs, they may try the mixed
> > usage. It is not user-friendly to directly throw exceptions at runtime, I
> > think our better approach is to warn users and recommend avoiding this. I
> > added an example in this FLIP.
> > >
> > > Well, I do not insist on allowing mixed usage of APIs if others reach
> an
> > agreement that we won't support that . I think the most important is to
> > keep the API easy to use and understand, thus I propose a unified state
> > declaration and explicit meaning in method name. WDYT?
> > >
> > >> Sorry I missed the new sink API. I do still think that it would be
> > >> better to make the package name more informative, and ".v2." does not
> > >> contain information for new Flink users who did not know the v1 of
> > >> state API. Unlike internal implementation and performance
> > >> optimization, API will hardly be compromised for now and updated in
> > >> future, so I still suggest we improve the package name now if
> > >> possible. But given the existing practice of sink v2 and
> > >> AbstractStreamOperatorV2, the current package name would be acceptable
> > >> to me if other reviewers of this FLIP agrees on it.
> > >
> > > Actually, I don't like 'v2' either. So if there is another good name,
> > I'd be happy to apply. This is a compromise to the current situation.
> Maybe
> > we could refine this after the retirement of original state APIs.
> > >
> > >
> > > Thanks & Best,
> > > Zakelly
> > >
> > >
> > > On Tue, Mar 12, 2024 at 1:42 PM Yunfeng Zhou <
> > flink.zhouyunf...@gmail.com> wrote:
> > >>
> > >> Hi Zakelly,
> > >>
> > >> Thanks for the quick response!
> > >>
> > >> > Actually splitting APIs into two sets ... warn them in runtime.
> > >>
> > >> Could you provide some hint on use cases where users need to mix sync
> > >> and async state operations in spite of the performance regression?
> > >> This information might help address our concerns on design. If the
> > >> mixed usage is simply something not recommended, I would prefer to
> > >> prohibit such usage from API.
> > >>
> > >> > In fact ... .sink2`.
> > >>
> > >> Sorry I missed the new sink API. I do still think that it would be
> > >> better to make the package name more informative, and ".v2." does not
> > >> contain information for new Flink users who did not know the v1 of
> > >> state API. Unlike internal implementation and performance
> > >> optimization, API will hardly be compromised for now and updated in
> > >> future, so I still suggest we improve the package name now if
> > >> possible. But given the existing practice of sink v2 and
> > >> AbstractStreamOperatorV2, the current package name would be acceptable
> > >> to me if other reviewers of this FLIP agrees on it.
> > >>
> > >> Best,
> > >> Yunfeng
> > >>
> > >> On Mon, Mar 11, 2024 at 5:27 PM Zakelly Lan 
> > wrote:
> > >> >
> > >> > Hi Yunfeng,
> > >> >
> > >> > Thanks for your comments!
> > >> >
> > >> > +1 for JingGe's suggestion to introduce an AsyncState API, instead
> of
> > >> > > having both get() and asyncGet() in the 

[jira] [Created] (FLINK-34660) AutoRescalingITCase#testCheckpointRescalingInKeyedState AssertionError

2024-03-13 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-34660:


 Summary: AutoRescalingITCase#testCheckpointRescalingInKeyedState 
AssertionError
 Key: FLINK-34660
 URL: https://issues.apache.org/jira/browse/FLINK-34660
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Hangxiang Yu


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58249=ms.vss-test-web.build-test-results-tab=4036370=100718=debug]

 
{code:java}
expected:<[(0,8000), (0,32000), (0,48000), (0,72000), (1,78000), (1,3), 
(1,54000), (0,2000), (0,1), (0,5), (0,66000), (0,74000), (0,82000), 
(1,8), (1,0), (1,16000), (1,24000), (1,4), (1,56000), (1,64000), 
(0,12000), (0,28000), (0,52000), (0,6), (0,68000), (0,76000), (1,18000), 
(1,26000), (1,34000), (1,42000), (1,58000), (0,6000), (0,14000), (0,22000), 
(0,38000), (0,46000), (0,62000), (0,7), (1,4000), (1,2), (1,36000), 
(1,44000)]> but was:<[(0,8000), (0,32000), (0,48000), (0,72000), (1,78000), 
(1,3), (1,54000), (0,2000), (0,1), (0,5), (0,66000), (0,74000), 
(0,82000), (0,23000), (0,31000), (1,8), (1,0), (1,16000), (1,24000), 
(1,4), (1,56000), (1,64000), (0,12000), (0,28000), (0,52000), (0,6), 
(0,68000), (0,76000), (1,18000), (1,26000), (1,34000), (1,42000), (1,58000), 
(0,6000), (0,14000), (0,22000), (0,19000), (0,35000), (1,4000), (1,2), 
(1,36000), (1,44000)]> {code}
 

This maybe related to FLINK-34624 as we could see from the log:
{code:java}
03:31:02,073 [ main] INFO 
org.apache.flink.runtime.testutils.PseudoRandomValueSelector [] - Randomly 
selected true for state.changelog.enabled
03:31:02,163 [jobmanager-io-thread-2] INFO 
org.apache.flink.state.changelog.AbstractChangelogStateBackend [] - 
ChangelogStateBackend is used, delegating EmbeddedRocksDBStateBackend. {code}
FLINK-34624 disables changelog since it doesn't support local rescaling 
currently.

Even if disabling changelog for AutoRescalingITCase manually, 
randomization may still be applied to it.
We should apply randomization only when it's not pre-defined.
 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)