Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-06-02 Thread Jark Wu
Hi Jing,

Thank you for the update.

1. Could you move the CatalogStore registration API to the "Public
Interface" section?
"Proposed Changes" is more like a place to describe the implementation
details.

2. We should prefix the CatalogStore configuration keys with "table.". Besides,
the config key names should be hierarchical [1]. Therefore, it may be better to
use (see the sketch after point 3):
"table.catalog-store.kind"
"table.catalog-store.file.path"

3. I think Hang's suggestions make sense.
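For illustration, the hierarchical keys from point 2 could be set roughly like this (the key names are the ones suggested above; the "file" kind and the path value are only placeholders, not part of the FLIP):

```java
import org.apache.flink.configuration.Configuration;

class CatalogStoreConfigExample {
    static Configuration catalogStoreOptions() {
        Configuration conf = new Configuration();
        conf.setString("table.catalog-store.kind", "file");                            // placeholder kind
        conf.setString("table.catalog-store.file.path", "file:///tmp/catalog-store/"); // placeholder path
        return conf;
    }
}
```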

Others look good to me.

Best,
Jark

On Fri, 2 Jun 2023 at 17:28, Hang Ruan  wrote:

> Hi, Feng.
>
> Thanks for the update.
> The current design looks good to me. I have some minor comments.
>
> 1. The `CatalogStore` needs the `open`/`close` methods to init or close the
> resource. For example, when storing the information in MySQL, the store
> needs to open and close the connections.
>
> 2. The `getCatalog` is misspelled as `optionalDescriptor`.
>
> 3. About the usage in the sql gateway.
> Considering the usage in sql gateway, the sql gateway may create a
> CatalogStore for each session.
> If we are using the MySqlCatalogStore, there would be so many connections.
> How can we reuse the connection among these sessions?
> I think the sql gateway needs to maintain a connection pool in
> the CatalogStoreFactory and each session gets its own connection from the
> pool when it is created.
> Then the `CatalogStoreFactory` may need the `open`/`close` methods to init
> or close its resource.
> Or is there a better way?
>
> Best,
> Hang
>
> Feng Jin wrote on Fri, Jun 2, 2023 at 14:45:
>
> > Thanks Jingsong.
> >
> > > Just naming, maybe `createCatalog` in TableEnv
> >
> > +1 For this, I have already updated FLIP.
> >
> >
> > Best,
> > Feng
> >
> >
> > On Fri, Jun 2, 2023 at 11:32 AM Jingsong Li 
> > wrote:
> >
> > > Thanks Feng,
> > >
> > > Just naming, maybe `createCatalog` in TableEnv, I can see many
> > > functions are converted to createxxx from registerxxx.
> > >
> > > On Fri, Jun 2, 2023 at 11:04 AM Feng Jin 
> wrote:
> > > >
> > > > Hi jark, thanks for your suggestion.
> > > >
> > > > > 1. How to register the CatalogStore for Table API? I think the
> > > > CatalogStore should be immutable once TableEnv is created. Otherwise,
> > > there
> > > > might be some data inconsistencies when CatalogStore is changed.
> > > >
> > > > Yes, We should initialize the CatalogStore when creating the
> TableEnv.
> > > > Therefore, my current proposal is to add a method to configure the
> > > > CatalogStore in EnvironmentSettings.
> > > >
> > > > // Initialize a catalog Store
> > > > CatalogStore catalogStore = new FileCatalogStore("");
> > > >
> > > > // set up the Table API
> > > > final EnvironmentSettings settings =
> > > > EnvironmentSettings.newInstance().inBatchMode()
> > > > .withCatalogStore(catalogStore)
> > > > .build();
> > > >
> > > > final TableEnvironment tableEnv =
> > > TableEnvironment.create(settings);
> > > >
> > > >
> > > > > 2. Why does the CatalogStoreFactory interface only have a default
> > > method,
> > > > not an interface method?
> > > >
> > > > Sorry, While I did refer to the Catalog interface, I agree that as a
> > new
> > > > interface, the CatalogStoreFactory should not have a default method
> but
> > > an
> > > > interface method.  I have already modified the interface.
> > > >
> > > > > 3. Please mention the alternative API in Javadoc for the
> > > > deprecated`registerCatalog`.
> > > > > 4. In the "Compatibility" section, would be better to mention the
> > > changed
> > > > behavior of CREATE CATALOG statement if FileCatalogStore (or other
> > > > persisted catalog store) is used.
> > > >
> > > > Thanks for the suggestion, I have updated the FLIP [1].
> > > >
> > > >
> > > > [1].
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > > >
> > > >
> > > > Best,
> > > > Feng
> > > >
> > > > On Thu, Jun 1, 2023 at 9:22 PM Jark Wu  wrote:
> > > >
> > > > > Hi Feng,
> > > > >
> > > > > This is a useful FLIP. Thanks for starting this discussion.
> > > > > The current design looks pretty good to me. I just have some minor
> > > > > comments.
> > > > >
> > > > > 1. How to register the CatalogStore for Table API? I think the
> > > CatalogStore
> > > > > should be immutable once TableEnv is created. Otherwise, there
> might
> > be
> > > > > some data inconsistencies when CatalogStore is changed.
> > > > >
> > > > > 2. Why does the CatalogStoreFactory interface only have a default
> > > method,
> > > > > not an interface method?
> > > > >
> > > > > 3. Please mention the alternative API in Javadoc for the deprecated
> > > > > `registerCatalog`.
> > > > >
> > > > > 4. In the "Compatibility" section, would be better to mention the
> > > changed
> > > > > behavior of CREATE CATALOG statement if FileCatalogStore (or other
> > > > > persisted catalog store) 

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-02 Thread Jark Wu
Hi Paul,

Thanks for your reply. I left my comments inline.

> As the FLIP said, it’s good to have a default main class for Flink SQLs,
> which allows users to submit Flink SQLs in the same way as DataStream
> jobs, or else users need to write their own main class.

Isn't the Table API already the same way to submit Flink SQL as DataStream jobs?
The DataStream API also doesn't provide a default main class for users,
so why do we need to provide one for SQL?

> With the help of ExecNodeGraph, do we still need the serialized
> SessionState? If not, we could make SQL Driver accepts two serialized
> formats:

No, ExecNodeGraph doesn't need to serialize SessionState. I thought the
proposed SqlDriver was a dedicated main class accepting SQL files; is
that correct?
If so, we would have to ship the SessionState for this case, which is a lot
of work.
I think we just need a JsonPlanDriver, which is a main class that accepts a
JsonPlan as its parameter.
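A minimal sketch of what such a main class could look like (the class name comes from the suggestion above; the argument handling is illustrative, and the plan restoration uses the existing CompiledPlan APIs):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: restore a compiled JSON plan shipped with the job and execute it.
public final class JsonPlanDriver {
    public static void main(String[] args) throws Exception {
        // args[0]: path to the compiled plan JSON produced by the SQL Gateway (illustrative)
        String planFile = args[0];
        TableEnvironment tableEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // loadPlan / PlanReference / CompiledPlan#execute are the existing Table API entry points
        tableEnv.loadPlan(PlanReference.fromFile(planFile)).execute();
    }
}
```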


> The common solutions I know is to use distributed file systems or use
> init containers to localize the resources.

Personally, I prefer the init-container approach, which doesn't depend on
additional components.
This reduces the moving parts of a production environment.
Depending on a distributed file system makes testing, demos, and local
setups harder than init containers do.

Best,
Jark




On Fri, 2 Jun 2023 at 18:10, Paul Lam  wrote:

> The FLIP is in the early phase and some details are not included, but
> fortunately, we got lots of valuable ideas from the discussion.
>
> Thanks to everyone who joined the discussion!
> @Weihua @Shammon @Shengkai @Biao @Jark
>
> This weekend I’m gonna revisit and update the FLIP, adding more
> details. Hopefully, we can further align our opinions.
>
> Best,
> Paul Lam
>
> > On Jun 2, 2023, at 18:02, Paul Lam wrote:
> >
> > Hi Jark,
> >
> > Thanks a lot for your input!
> >
> >> If we decide to submit ExecNodeGraph instead of SQL file, is it still
> >> necessary to support SQL Driver?
> >
> > I think so. Apart from usage in SQL Gateway, SQL Driver could simplify
> > Flink SQL execution with Flink CLI.
> >
> > As the FLIP said, it’s good to have a default main class for Flink SQLs,
> > which allows users to submit Flink SQLs in the same way as DataStream
> > jobs, or else users need to write their own main class.
> >
> >>  SQL Driver needs to serialize SessionState which is very challenging
> >> but not detailed covered in the FLIP.
> >
> > With the help of ExecNodeGraph, do we still need the serialized
> > SessionState? If not, we could make SQL Driver accepts two serialized
> > formats:
> >
> > - SQL files for user-facing public usage
> > - ExecNodeGraph for internal usage
> >
> > It’s kind of similar to the relationship between job jars and jobgraphs.
> >
> >> Regarding "K8S doesn't support shipping multiple jars", is that true?
> Is it
> >> possible to support it?
> >
> > Yes, K8s doesn’t distribute any files. It’s the users’ responsibility to
> make
> > sure the resources are accessible in the containers. The common solutions
> > I know is to use distributed file systems or use init containers to
> localize the
> > resources.
> >
> > Now I lean toward introducing a fs to do the distribution job. WDYT?
> >
> > Best,
> > Paul Lam
> >
> >> On Jun 1, 2023, at 20:33, Jark Wu <imj...@gmail.com> wrote:
> >>
> >> Hi Paul,
> >>
> >> Thanks for starting this discussion. I like the proposal! This is a
> >> frequently requested feature!
> >>
> >> I agree with Shengkai that ExecNodeGraph as the submission object is a
> >> better idea than SQL file. To be more specific, it should be
> JsonPlanGraph
> >> or CompiledPlan which is the serializable representation. CompiledPlan
> is a
> >> clear separation between compiling/optimization/validation and
> execution.
> >> This can keep the validation and metadata accessing still on the
> SQLGateway
> >> side. This allows SQLGateway to leverage some metadata caching and UDF
> JAR
> >> caching for better compiling performance.
> >>
> >> If we decide to submit ExecNodeGraph instead of SQL file, is it still
> >> necessary to support SQL Driver? Regarding non-interactive SQL jobs,
> users
> >> can use the Table API program for application mode. SQL Driver needs to
> >> serialize SessionState which is very challenging but not detailed
> covered
> >> in the FLIP.
> >>
> >> Regarding "K8S doesn't support shipping multiple jars", is that true?
> Is it
> >> possible to support it?
> >>
> >> Best,
> >> Jark
> >>
> >>
> >>
> >> On Thu, 1 Jun 2023 at 16:58, Paul Lam <paullin3...@gmail.com> wrote:
> >>
> >>> Hi Weihua,
> >>>
> >>> You’re right. Distributing the SQLs to the TMs is one of the
> challenging
> >>> parts of this FLIP.
> >>>
> >>> Web submission is not enabled in application mode currently as you
> said,
> >>> but it could be changed if we have good reasons.
> >>>
> >>> What do you think about introducing a distributed storage for SQL
> Gateway?
> >>>
> >>> We could make use of Flink file systems [1] to distribute the SQL
> Gateway
> 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Dong Lin
Hi Piotr,

Thanks for the explanations. I have some followup questions below.

On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski  wrote:

> Hi All,
>
> Thanks for chipping in the discussion Ahmed!
>
> Regarding using the REST API. Currently I'm leaning towards implementing
> this feature inside the Flink itself, via some pluggable interface.
> REST API solution would be tempting, but I guess not everyone is using
> Flink Kubernetes Operator.
>
> @Dong
>
> > I am not sure metrics such as isBackPressured are already sent to JM.
>
> Fetching code path on the JM:
>
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
>
> Example code path accessing Task level metrics via JM using the
> `MetricStore`:
>
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
>

Thanks for the code reference. I checked the code that invoked these two
classes and found the following information:

- AggregatingSubtasksMetricsHandler#getStores is currently invoked only
when AggregatingJobsMetricsHandler is invoked.
- AggregatingJobsMetricsHandler is only instantiated and returned by
WebMonitorEndpoint#initializeHandlers.
- WebMonitorEndpoint#initializeHandlers is only used by RestServerEndpoint,
and RestServerEndpoint invokes these handlers in response to external REST
requests.

I understand that the JM will get the backpressure-related metrics every time
the RestServerEndpoint receives a REST request for these metrics. But
I am not sure whether RestServerEndpoint is already receiving such REST
metric requests at a regular interval (suppose no human is manually
opening/clicking the Flink Web UI). And if it is, what is the interval?



> > For example, let's say every source operator subtask reports this metric
> to
> > JM once every 10 seconds. There are 100 source subtasks. And each subtask
> > is backpressured roughly 10% of the total time due to traffic spikes (and
> > limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
> > chance that there is at least one subtask that is backpressured. Then we
> > have to wait for at least 10 seconds to check again.
>
> backPressuredTimeMsPerSecond and other related metrics (like
> busyTimeMsPerSecond) are not subject to that problem.
> They are recalculated once every metric fetching interval, and they report
> accurately on average the given subtask spent busy/idling/backpressured.
> In your example, backPressuredTimeMsPerSecond would report 100ms/s.


Suppose every subtask is already reporting backPressuredTimeMsPerSecond to
the JM once every 100 ms. If a job has 10 operators (that are not chained) and
each operator has 100 subtasks, then the JM would need to handle 10,000 requests
per second to receive metrics from these 1,000 subtasks (1,000 subtasks, each
reporting 10 times per second). That seems like a non-trivial overhead for
medium-to-large sized jobs and can make the JM the performance bottleneck
during job execution.

I would be surprised if Flink is already paying this much overhead just for
metrics monitoring. That is the main reason I still doubt it is true. Can
you show where this 100 ms is currently configured?

Alternatively, maybe you mean that we should add extra code to invoke the
REST API at a 100 ms interval. That would mean considerably
increasing the network/CPU overhead on the JM, where the overhead grows
as the number of TMs/slots increases, which may pose a risk to the scalability
of the proposed design. I am not sure we should do this. What do you think?


>
> > While it will be nice to support additional use-cases
> > with one proposal, it is probably also reasonable to make incremental
> > progress and support the low-hanging-fruit use-case first. The choice
> > really depends on the complexity and the importance of supporting the
> extra
> > use-cases.
>
> That would be true, if that was a private implementation detail or if the
> low-hanging-fruit-solution would be on the direct path to the final
> solution.
> That's unfortunately not the case here. This will add public facing API,
> that we will later need to maintain, no matter what the final solution will
> be,
> and at the moment at least I don't see it being related to a "perfect"
> solution.


Sure. Then let's decide the final solution first.


> > I guess the point is that the suggested approach, which dynamically
> > determines the checkpointing interval based on the backpressure, may
> cause
> > regression when the checkpointing interval is relatively low. This makes
> it
> > hard for users to enable this feature in production. It is like an
> > auto-driving system that is not guaranteed to work
>
> Yes, creating a more generic solution that would require less configuration
> is usually more difficult then static configurations.
> It doesn't mean we shouldn't try to do that. Especially that if my proposed
> algorithm wouldn't work good enough, there is
> an obvious solution, that any source could add a 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Piotr Nowojski
Hi All,

Thanks for chipping in the discussion Ahmed!

Regarding using the REST API: currently I'm leaning towards implementing
this feature inside Flink itself, via some pluggable interface.
A REST API solution would be tempting, but I guess not everyone is using
the Flink Kubernetes Operator.

@Dong

> I am not sure metrics such as isBackPressured are already sent to JM.

Fetching code path on the JM:
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add

Example code path accessing Task level metrics via JM using the
`MetricStore`:
org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler

> For example, let's say every source operator subtask reports this metric
to
> JM once every 10 seconds. There are 100 source subtasks. And each subtask
> is backpressured roughly 10% of the total time due to traffic spikes (and
> limited buffer). Then at any given time, there are 1 - 0.9^100 = 99.997%
> chance that there is at least one subtask that is backpressured. Then we
> have to wait for at least 10 seconds to check again.

backPressuredTimeMsPerSecond and other related metrics (like
busyTimeMsPerSecond) are not subject to that problem.
They are recalculated once every metric fetching interval, and they report
accurately how much time, on average, the given subtask spent busy/idling/backpressured.
In your example, backPressuredTimeMsPerSecond would report 100 ms/s.

> While it will be nice to support additional use-cases
> with one proposal, it is probably also reasonable to make incremental
> progress and support the low-hanging-fruit use-case first. The choice
> really depends on the complexity and the importance of supporting the
extra
> use-cases.

That would be true if this were a private implementation detail, or if the
low-hanging-fruit solution were on the direct path to the final
solution.
That's unfortunately not the case here. This will add a public-facing API
that we will later need to maintain, no matter what the final solution will
be,
and at the moment at least I don't see it being related to a "perfect"
solution.

> I guess the point is that the suggested approach, which dynamically
> determines the checkpointing interval based on the backpressure, may cause
> regression when the checkpointing interval is relatively low. This makes
it
> hard for users to enable this feature in production. It is like an
> auto-driving system that is not guaranteed to work

Yes, creating a more generic solution that requires less configuration
is usually more difficult than a static configuration.
That doesn't mean we shouldn't try to do it. Especially since, if my proposed
algorithm doesn't work well enough, there is
an obvious fallback: any source could add a metric, like say
"processingBacklog: true/false", and the `CheckpointTrigger`
could use this as an override to always switch to the
"slowCheckpointInterval". I don't think we need it, but that's always an
option
that would be basically equivalent to your original proposal. Or a
source could even add "suggestedCheckpointInterval: int", and the
`CheckpointTrigger` could use that value, if present, as a hint in one way or
another.
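To make that override idea concrete, a rough sketch is below. Note that `CheckpointTrigger` is not an existing Flink interface; it is the pluggable hook being discussed in this thread, so both interfaces and the backlog flag are illustrative only:

```java
import java.time.Duration;

// Hypothetical pluggable hook as discussed in this thread (not an existing Flink API).
interface CheckpointTrigger {
    /** Called periodically with a view of the latest job metrics; returns the interval to use. */
    Duration decideCheckpointInterval(JobMetricsView metrics);
}

/** Hypothetical read-only view of the metrics the trigger may inspect. */
interface JobMetricsView {
    boolean isAnySourceReportingBacklog(); // e.g. a "processingBacklog" flag exposed by sources
}

// Sketch: always switch to the slow interval while any source reports a backlog.
class BacklogOverrideTrigger implements CheckpointTrigger {
    private final Duration fastCheckpointInterval;
    private final Duration slowCheckpointInterval;

    BacklogOverrideTrigger(Duration fastCheckpointInterval, Duration slowCheckpointInterval) {
        this.fastCheckpointInterval = fastCheckpointInterval;
        this.slowCheckpointInterval = slowCheckpointInterval;
    }

    @Override
    public Duration decideCheckpointInterval(JobMetricsView metrics) {
        return metrics.isAnySourceReportingBacklog()
                ? slowCheckpointInterval
                : fastCheckpointInterval;
    }
}
```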

> On the other hand, the approach currently proposed in the FLIP is much
> simpler as it does not depend on backpressure. Users specify the extra
> interval requirement on the specific sources (e.g. HybridSource, MySQL CDC
> Source) and can easily know the checkpointing interval will be used on the
> continuous phase of the corresponding source. This is pretty much same as
> how users use the existing execution.checkpointing.interval config. So
> there is no extra concern of regression caused by this approach.

To an extent, but as I have already mentioned, I really do
not like the idea of:
  - having two places to configure the checkpointing interval (the config file and
the Source builders)
  - giving every source the flexibility to implement a different API for
that purpose
  - creating a solution that is not generic enough, so that we will need a
completely different mechanism in the future anyway

> Sounds good. Looking forward to learning more ideas.

I have thought about this a bit more, and I think we don't need to check
the backpressure status, or how overloaded all of the operators
are.
We could just check three things for source operators:
1. pendingRecords (backlog length)
2. numRecordsInPerSecond
3. backPressuredTimeMsPerSecond

// int metricsUpdateInterval = 10s // obtained from config
// The next line calculates how many records we can consume from the backlog, assuming
// that magically the reason behind the backpressure vanishes. We will use this only as
// a safeguard against scenarios like, for example, backpressure caused by some
// intermittent failure/performance degradation.
maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / (1000 -

Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-06-02 Thread Jing Ge
Hi Samrat,

Excited to see your proposal. Supporting data warehouses is one of the
major tracks for Flink. Thanks for driving it! Happy to see that we reached
consensus to prioritize the Sink over Source in the previous discussion. Do
you already have any prototype? I'd like to join the reviews.

Just out of curiosity, speaking of JDBC mode: according to the FLIP, it
should be doable to directly use the jdbc connector with Redshift, if I am
not mistaken. Will the Redshift connector provide additional features
beyond being a mediator/wrapper around the jdbc connector?

Best regards,
Jing

On Thu, Jun 1, 2023 at 8:22 PM Ahmed Hamdy  wrote:

> Hi Samrat
>
> Thanks for putting up this FLIP. I agree regarding the importance of the
> use case.
> please let me know If you need any collaboration regarding integration with
> AWS connectors credential providers or regarding FLIP-171 I would be more
> than happy to assist.
> I also like Leonard's proposal for starting with DataStreamSink and
> TableSink, It would be great to have some milestones delivered as soon as
> ready.
> best regards
> Ahmed Hamdy
>
>
> On Wed, 31 May 2023 at 11:15, Samrat Deb  wrote:
>
> > Hi Liu Ron,
> >
> > > 1. Regarding the  `read.mode` and `write.mode`, you say here provides
> two
> > modes, respectively, jdbc and `unload or copy`, What is the default value
> > for `read.mode` and `write.mode?
> >
> > I have made an effort to make the configuration options `read.mode` and
> > `write.mode` mandatory for the "flink-connector-redshift" according to
> > FLIP[1]. The rationale behind this decision is to empower users who are
> > familiar with their Redshift setup and have specific expectations for the
> > sink. By making these configurations mandatory, users can have more
> control
> > and flexibility in configuring the connector to meet their requirements.
> >
> > However, I am open to receiving feedback on whether it would be
> beneficial
> > to make the configuration options non-mandatory and set default values
> for
> > them. If you believe there are advantages to having default values or any
> > other suggestions, please share your thoughts. Your feedback is highly
> > appreciated.
> >
> > >  2. For Source, does it both support batch read and streaming read?
> >
> > Redshift currently does not provide native support for streaming reads,
> > although it does support streaming writes[2]. As part of the plan, I
> intend
> > to conduct a proof of concept and benchmarking to explore the
> possibilities
> > of implementing streaming reads using the Flink JDBC connector, as
> Redshift
> > is JDBC compatible.
> > However, it is important to note that, in the initial phase of
> > implementation, the focus will primarily be on supporting batch reads
> > rather than streaming reads. This approach will allow us to deliver a
> > robust and reliable solution for batch processing in phase 2 of the
> > implementation.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
> > [2]
> >
> >
> https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion.html
> >
> > Bests,
> > Samrat
> >
> > On Wed, May 31, 2023 at 8:03 AM liu ron  wrote:
> >
> > > Hi, Samrat
> > >
> > > Thanks for driving this FLIP. It looks like supporting
> > > flink-connector-redshift is very useful to Flink. I have two question:
> > > 1. Regarding the  `read.mode` and `write.mode`, you say here provides
> two
> > > modes, respectively, jdbc and `unload or copy`, What is the default
> value
> > > for `read.mode` and `write.mode?
> > > 2. For Source, does it both support batch read and streaming read?
> > >
> > >
> > > Best,
> > > Ron
> > >
> > > Samrat Deb wrote on Tue, May 30, 2023 at 17:15:
> > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
> > > >
> > > > [note] Missed the trailing link for previous mail
> > > >
> > > >
> > > >
> > > > On Tue, May 30, 2023 at 2:43 PM Samrat Deb 
> > > wrote:
> > > >
> > > > > Hi Leonard,
> > > > >
> > > > > > and I’m glad to help review the design as well as the code
> review.
> > > > > Thank you so much. It would be really great and helpful to bring
> > > > > flink-connector-redshift for flink users :) .
> > > > >
> > > > > I have divided the implementation in 3 phases in the `Scope`
> > > Section[1].
> > > > > 1st phase is to
> > > > >
> > > > >- Integrate with Flink Sink API (*FLIP-171*
> > > > ><
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> > > >
> > > > >)
> > > > >
> > > > >
> > > > > > About the implementation phases, How about prioritizing support
> for
> > > the
> > > > > Datastream Sink API and TableSink API in the first phase?
> > > > > I can completely resonate with you to prioritize support for
> > Datastream
> > > > > Sink API and TableSink API in the first phase.
> > > > > I will update the FLIP[1] as you have suggested.
> > > > >
> > > > > > 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-02 Thread Jing Ge
Hi Aitozi,

Thanks for the update. Just out of curiosity, what is the difference,
in a very general way, between the RPC call or query you mentioned and a lookup?
Since lateral join is used in the FLIP, is there any special
thought behind that? Sorry for asking so many questions. The FLIP contains
limited information for understanding the motivation.
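For reference, the kind of user-defined async table function being discussed would presumably look something like the sketch below (the function body is made up and the non-blocking call is simulated with supplyAsync; only the lateral-table usage comes from the FLIP):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

// Sketch of a user-defined async table function: eval() completes a future instead of
// returning rows synchronously, so it can be run by an AsyncWaitOperator.
public class AsyncRpcFunction extends AsyncTableFunction<Row> {
    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        CompletableFuture
                .supplyAsync(() -> Collections.<Row>singletonList(Row.of(key, key.length())))
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(rows);
                    }
                });
    }
}
```

With the FLIP, such a function could then be invoked via the existing lateral-table syntax, e.g. `SELECT ... FROM T, LATERAL TABLE(asyncRpc(T.key))` (again, only a sketch).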

Best regards,
Jing

On Fri, Jun 2, 2023 at 3:48 AM Aitozi  wrote:

> Hi Jing,
> I have updated the proposed changes to the FLIP. IMO, lookup has a clear
> async-call requirement because it is an IO-heavy operation. In our usage, SQL
> users have logic to do RPC calls or query third-party services, which is also IO
> intensive.
> In these cases, we'd like to leverage the async function to improve the
> throughput.
>
> Thanks,
> Aitozi.
>
> Jing Ge wrote on Thu, Jun 1, 2023 at 22:55:
>
> > Hi Aitozi,
> >
> > Sorry for the late reply. Would you like to update the proposed changes
> > with more details into the FLIP too?
> > I got your point. It looks like a rational idea. However, since lookup
> has
> > its clear async call requirement, are there any real use cases that
> > need this change? This will help us understand the motivation. After all,
> > lateral join and temporal lookup join[1] are quite different.
> >
> > Best regards,
> > Jing
> >
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
> >
> > On Wed, May 31, 2023 at 8:53 AM Aitozi  wrote:
> >
> > > Hi Jing,
> > > What do you think about it? Can we move forward this feature?
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > > Aitozi wrote on Mon, May 29, 2023 at 09:56:
> > >
> > > > Hi Jing,
> > > > > "Do you mean to support the AyncTableFunction beyond the
> > > > LookupTableSource?"
> > > > Yes, I mean to support the AyncTableFunction beyond the
> > > LookupTableSource.
> > > >
> > > > The "AsyncTableFunction" is the function with the ability to be executed
> > > > async (with AsyncWaitOperator).
> > > > The async lookup join is one usage of this. So, we don't have to
> > > > bind the AsyncTableFunction to LookupTableSource.
> > > > If a user-defined AsyncTableFunction is supported, users can directly
> > > > use lateral table syntax to perform async operations.
> > > >
> > > > > "It would be better if you could elaborate the proposed changes wrt
> > the
> > > > CorrelatedCodeGenerator with more details"
> > > >
> > > > In the proposal, we use lateral table syntax to support the async
> table
> > > > function. So the planner will also treat this statement to a
> > > > CommonExecCorrelate node. So the runtime code should be generated in
> > > > CorrelatedCodeGenerator.
> > > > In CorrelatedCodeGenerator, we will know the TableFunction's Kind of
> > > > `FunctionKind.Table` or `FunctionKind.ASYNC_TABLE`
> > > > For  `FunctionKind.ASYNC_TABLE` we can generate a AsyncWaitOperator
> to
> > > > execute the async table function.
> > > >
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > >
> > > > Jing Ge wrote on Mon, May 29, 2023 at 03:22:
> > > >
> > > >> Hi Aitozi,
> > > >>
> > > >> Thanks for the clarification. The naming "Lookup" might suggest
> using
> > it
> > > >> for table look up. But conceptually what the eval() method will do
> is
> > to
> > > >> get a collection of results(Row, RowData) from the given keys. How
> it
> > > will
> > > >> be done depends on the implementation, i.e. you can implement your
> own
> > > >> Source[1][2]. The example in the FLIP should be able to be handled
> in
> > > this
> > > >> way.
> > > >>
> > > >> Do you mean to support the AyncTableFunction beyond the
> > > LookupTableSource?
> > > >> It would be better if you could elaborate the proposed changes wrt
> the
> > > >> CorrelatedCodeGenerator with more details. Thanks!
> > > >>
> > > >> Best regards,
> > > >> Jing
> > > >>
> > > >> [1]
> > > >>
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> > > >> [2]
> > > >>
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
> > > >>
> > > >> On Sat, May 27, 2023 at 9:48 AM Aitozi 
> wrote:
> > > >>
> > > >> > Hi Jing,
> > > >> > Thanks for your response. As stated in the FLIP, the purpose
> of
> > > this
> > > >> > FLIP is meant to support
> > > >> > user-defined async table function. As described in flink document
> > [1]
> > > >> >
> > > >> > Async table functions are special functions for table sources that
> > > >> perform
> > > >> > > a lookup.
> > > >> > >
> > > >> >
> > > >> > So end user can not directly define and use async table function
> > now.
> > > An
> > > >> > user case is reported in [2]
> > > 

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-02 Thread Paul Lam
The FLIP is in the early phase and some details are not included, but
fortunately, we got lots of valuable ideas from the discussion.

Thanks to everyone who joined the discussion!
@Weihua @Shammon @Shengkai @Biao @Jark

This weekend I’m gonna revisit and update the FLIP, adding more 
details. Hopefully, we can further align our opinions.

Best,
Paul Lam

> On Jun 2, 2023, at 18:02, Paul Lam wrote:
> 
> Hi Jark,
> 
> Thanks a lot for your input!
> 
>> If we decide to submit ExecNodeGraph instead of SQL file, is it still
>> necessary to support SQL Driver? 
> 
> I think so. Apart from usage in SQL Gateway, SQL Driver could simplify
> Flink SQL execution with Flink CLI.
> 
> As the FLIP said, it’s good to have a default main class for Flink SQLs,
> which allows users to submit Flink SQLs in the same way as DataStream
> jobs, or else users need to write their own main class.
> 
>>  SQL Driver needs to serialize SessionState which is very challenging
>> but not detailed covered in the FLIP.
> 
> With the help of ExecNodeGraph, do we still need the serialized 
> SessionState? If not, we could make SQL Driver accepts two serialized
> formats:
> 
> - SQL files for user-facing public usage
> - ExecNodeGraph for internal usage
> 
> It’s kind of similar to the relationship between job jars and jobgraphs.
> 
>> Regarding "K8S doesn't support shipping multiple jars", is that true? Is it
>> possible to support it?
> 
> Yes, K8s doesn’t distribute any files. It’s the users’ responsibility to make
> sure the resources are accessible in the containers. The common solutions
> I know is to use distributed file systems or use init containers to localize 
> the
> resources. 
> 
> Now I lean toward introducing a fs to do the distribution job. WDYT?
> 
> Best,
> Paul Lam
> 
>> On Jun 1, 2023, at 20:33, Jark Wu <imj...@gmail.com> wrote:
>> 
>> Hi Paul,
>> 
>> Thanks for starting this discussion. I like the proposal! This is a
>> frequently requested feature!
>> 
>> I agree with Shengkai that ExecNodeGraph as the submission object is a
>> better idea than SQL file. To be more specific, it should be JsonPlanGraph
>> or CompiledPlan which is the serializable representation. CompiledPlan is a
>> clear separation between compiling/optimization/validation and execution.
>> This can keep the validation and metadata accessing still on the SQLGateway
>> side. This allows SQLGateway to leverage some metadata caching and UDF JAR
>> caching for better compiling performance.
>> 
>> If we decide to submit ExecNodeGraph instead of SQL file, is it still
>> necessary to support SQL Driver? Regarding non-interactive SQL jobs, users
>> can use the Table API program for application mode. SQL Driver needs to
>> serialize SessionState which is very challenging but not detailed covered
>> in the FLIP.
>> 
>> Regarding "K8S doesn't support shipping multiple jars", is that true? Is it
>> possible to support it?
>> 
>> Best,
>> Jark
>> 
>> 
>> 
>> On Thu, 1 Jun 2023 at 16:58, Paul Lam > > wrote:
>> 
>>> Hi Weihua,
>>> 
>>> You’re right. Distributing the SQLs to the TMs is one of the challenging
>>> parts of this FLIP.
>>> 
>>> Web submission is not enabled in application mode currently as you said,
>>> but it could be changed if we have good reasons.
>>> 
>>> What do you think about introducing a distributed storage for SQL Gateway?
>>> 
>>> We could make use of Flink file systems [1] to distribute the SQL Gateway
>>> generated resources, that should solve the problem at its root cause.
>>> 
>>> Users could specify Flink-supported file systems to ship files. It’s only
>>> required when using SQL Gateway with K8s application mode.
>>> 
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>>>  
>>> 
>>> 
>>> Best,
>>> Paul Lam
>>> 
On Jun 1, 2023, at 13:55, Weihua Hu wrote:
 
 Thanks Paul for your reply.
 
 SQLDriver looks good to me.
 
 2. Do you mean a pass the SQL string a configuration or a program
>>> argument?
 
 
 I brought this up because we were unable to pass the SQL file to Flink
 using Kubernetes mode.
 For DataStream/Python users, they need to prepare their images for the
>>> jars
 and dependencies.
 But for SQL users, they can use a common image to run different SQL
>>> queries
 if there are no other udf requirements.
 It would be great if the SQL query and image were not bound.
 
 Using strings is a way to decouple these, but just as you mentioned, it's
 not easy to pass complex SQL.
 
> use web submission
 AFAIK, we can not use web submission in the Application mode. Please
 correct me if I'm wrong.
 
 
 Best,
 Weihua
 
 
 On Wed, May 31, 2023 at 9:37 PM Paul Lam  wrote:
 
> Hi Biao,
> 
> Thanks for your comments!
> 
>> 1. Scope: is this FLIP only targeted 

[jira] [Created] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate

2023-06-02 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-32247:
-

 Summary: Normal group by with time attributes after a window group 
by is interpreted as GlobalWindowAggregate
 Key: FLINK-32247
 URL: https://issues.apache.org/jira/browse/FLINK-32247
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.16.2, 1.18.0
Reporter: Qingsheng Ren


Consider the SQL statement below:

 
{code:java}
SELECT `window_start`, `window_end`, `window_time`, COUNT(*)
FROM (
    SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum`
    FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES))
    GROUP BY `window_start`, `window_end`, `window_time`, `item`
)
GROUP BY `window_start`, `window_end`, `window_time`;
{code}
This should be a regular group aggregation on top of a windowed aggregation, but the
planner interprets the outer aggregation as a GlobalWindowAggregate:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime])
                           +- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts])
{code}
 





Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-02 Thread Paul Lam
Hi Jark,

Thanks a lot for your input!

> If we decide to submit ExecNodeGraph instead of SQL file, is it still
> necessary to support SQL Driver? 

I think so. Apart from usage in SQL Gateway, SQL Driver could simplify
Flink SQL execution with Flink CLI.

As the FLIP said, it’s good to have a default main class for Flink SQLs,
which allows users to submit Flink SQLs in the same way as DataStream
jobs, or else users need to write their own main class.

>  SQL Driver needs to serialize SessionState which is very challenging
> but not detailed covered in the FLIP.

With the help of ExecNodeGraph, do we still need the serialized
SessionState? If not, we could make the SQL Driver accept two serialized
formats:

- SQL files for user-facing public usage
- ExecNodeGraph for internal usage

It’s kind of similar to the relationship between job jars and jobgraphs.
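For the SQL-file flavor, a rough sketch of what such a default main class could do (the class name is from the FLIP; the naive statement splitting and argument handling here are purely illustrative):

```java
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class SqlDriver {
    public static void main(String[] args) throws Exception {
        // args[0]: path to a SQL script shipped with the job (illustrative)
        String script = new String(Files.readAllBytes(Paths.get(args[0])));
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Naive split on ';' just for the sketch; a real driver needs proper statement parsing.
        for (String statement : script.split(";")) {
            if (!statement.trim().isEmpty()) {
                tableEnv.executeSql(statement);
            }
        }
    }
}
```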

> Regarding "K8S doesn't support shipping multiple jars", is that true? Is it
> possible to support it?

Yes, K8s doesn’t distribute any files. It’s the users’ responsibility to make
sure the resources are accessible in the containers. The common solutions
I know are to use distributed file systems or init containers to localize the
resources.

Now I lean toward introducing a fs to do the distribution job. WDYT?

Best,
Paul Lam

> On Jun 1, 2023, at 20:33, Jark Wu wrote:
> 
> Hi Paul,
> 
> Thanks for starting this discussion. I like the proposal! This is a
> frequently requested feature!
> 
> I agree with Shengkai that ExecNodeGraph as the submission object is a
> better idea than SQL file. To be more specific, it should be JsonPlanGraph
> or CompiledPlan which is the serializable representation. CompiledPlan is a
> clear separation between compiling/optimization/validation and execution.
> This can keep the validation and metadata accessing still on the SQLGateway
> side. This allows SQLGateway to leverage some metadata caching and UDF JAR
> caching for better compiling performance.
> 
> If we decide to submit ExecNodeGraph instead of SQL file, is it still
> necessary to support SQL Driver? Regarding non-interactive SQL jobs, users
> can use the Table API program for application mode. SQL Driver needs to
> serialize SessionState which is very challenging but not detailed covered
> in the FLIP.
> 
> Regarding "K8S doesn't support shipping multiple jars", is that true? Is it
> possible to support it?
> 
> Best,
> Jark
> 
> 
> 
> On Thu, 1 Jun 2023 at 16:58, Paul Lam  wrote:
> 
>> Hi Weihua,
>> 
>> You’re right. Distributing the SQLs to the TMs is one of the challenging
>> parts of this FLIP.
>> 
>> Web submission is not enabled in application mode currently as you said,
>> but it could be changed if we have good reasons.
>> 
>> What do you think about introducing a distributed storage for SQL Gateway?
>> 
>> We could make use of Flink file systems [1] to distribute the SQL Gateway
>> generated resources, that should solve the problem at its root cause.
>> 
>> Users could specify Flink-supported file systems to ship files. It’s only
>> required when using SQL Gateway with K8s application mode.
>> 
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>> 
>> Best,
>> Paul Lam
>> 
>>> On Jun 1, 2023, at 13:55, Weihua Hu wrote:
>>> 
>>> Thanks Paul for your reply.
>>> 
>>> SQLDriver looks good to me.
>>> 
>>> 2. Do you mean a pass the SQL string a configuration or a program
>> argument?
>>> 
>>> 
>>> I brought this up because we were unable to pass the SQL file to Flink
>>> using Kubernetes mode.
>>> For DataStream/Python users, they need to prepare their images for the
>> jars
>>> and dependencies.
>>> But for SQL users, they can use a common image to run different SQL
>> queries
>>> if there are no other udf requirements.
>>> It would be great if the SQL query and image were not bound.
>>> 
>>> Using strings is a way to decouple these, but just as you mentioned, it's
>>> not easy to pass complex SQL.
>>> 
 use web submission
>>> AFAIK, we can not use web submission in the Application mode. Please
>>> correct me if I'm wrong.
>>> 
>>> 
>>> Best,
>>> Weihua
>>> 
>>> 
>>> On Wed, May 31, 2023 at 9:37 PM Paul Lam  wrote:
>>> 
 Hi Biao,
 
 Thanks for your comments!
 
> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs
 in
> Application mode? More specifically, if we use SQL client/gateway to
> execute some interactive SQLs like a SELECT query, can we ask flink to
 use
> Application mode to execute those queries after this FLIP?
 
 Thanks for pointing it out. I think only DMLs would be executed via SQL
 Driver.
 I'll add the scope to the FLIP.
 
> 2. Deployment: I believe in YARN mode, the implementation is trivial as
 we
> can ship files via YARN's tool easily but for K8s, things can be more
> complicated as Shengkai said.
 
 
 Your input is very informative. I’m thinking about using web submission,
 but it requires 

Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-06-02 Thread Hang Ruan
Hi, Feng.

Thanks for the update.
The current design looks good to me. I have some minor comments.

1. The `CatalogStore` needs the `open`/`close` methods to init or close the
resource. For example, when storing the information in MySQL, the store
needs to open and close the connections.

2. The `getCatalog` is misspelled as `optionalDescriptor`.

3. About the usage in the SQL gateway.
The SQL gateway may create a CatalogStore for each session.
If we are using a MySqlCatalogStore, there would be many connections.
How can we reuse the connections among these sessions?
I think the SQL gateway needs to maintain a connection pool in
the CatalogStoreFactory, and each session gets its own connection from the
pool when it is created.
Then the `CatalogStoreFactory` may also need `open`/`close` methods to init
or close its resources (a sketch follows below).
Or is there a better way?
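To make points 1 and 3 concrete, the lifecycle methods I have in mind would look roughly like this (a sketch only, not the FLIP's final interfaces):

```java
import java.util.Optional;
import java.util.Set;

import org.apache.flink.table.catalog.CatalogDescriptor; // the descriptor type proposed in the FLIP

// Point 1: lifecycle methods on the store itself.
interface CatalogStore {
    void open();   // e.g. open a JDBC connection taken from a shared pool
    void close();  // release the underlying resource

    void storeCatalog(String catalogName, CatalogDescriptor catalog);
    void removeCatalog(String catalogName, boolean ignoreIfNotExists);
    Optional<CatalogDescriptor> getCatalog(String catalogName);
    Set<String> listCatalogs();
    boolean contains(String catalogName);
}

// Point 3: lifecycle methods on the factory, so a shared connection pool can be
// created once and handed to the per-session stores.
interface CatalogStoreFactory {
    void open();   // e.g. initialize the shared connection pool
    void close();
    CatalogStore createCatalogStore();
}
```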

Best,
Hang

Feng Jin wrote on Fri, Jun 2, 2023 at 14:45:

> Thanks Jingsong.
>
> > Just naming, maybe `createCatalog` in TableEnv
>
> +1 For this, I have already updated FLIP.
>
>
> Best,
> Feng
>
>
> On Fri, Jun 2, 2023 at 11:32 AM Jingsong Li 
> wrote:
>
> > Thanks Feng,
> >
> > Just naming, maybe `createCatalog` in TableEnv, I can see many
> > functions are converted to createxxx from registerxxx.
> >
> > On Fri, Jun 2, 2023 at 11:04 AM Feng Jin  wrote:
> > >
> > > Hi jark, thanks for your suggestion.
> > >
> > > > 1. How to register the CatalogStore for Table API? I think the
> > > CatalogStore should be immutable once TableEnv is created. Otherwise,
> > there
> > > might be some data inconsistencies when CatalogStore is changed.
> > >
> > > Yes, We should initialize the CatalogStore when creating the TableEnv.
> > > Therefore, my current proposal is to add a method to configure the
> > > CatalogStore in EnvironmentSettings.
> > >
> > > // Initialize a catalog Store
> > > CatalogStore catalogStore = new FileCatalogStore("");
> > >
> > > // set up the Table API
> > > final EnvironmentSettings settings =
> > > EnvironmentSettings.newInstance().inBatchMode()
> > > .withCatalogStore(catalogStore)
> > > .build();
> > >
> > > final TableEnvironment tableEnv =
> > TableEnvironment.create(settings);
> > >
> > >
> > > > 2. Why does the CatalogStoreFactory interface only have a default
> > method,
> > > not an interface method?
> > >
> > > Sorry, While I did refer to the Catalog interface, I agree that as a
> new
> > > interface, the CatalogStoreFactory should not have a default method but
> > an
> > > interface method.  I have already modified the interface.
> > >
> > > > 3. Please mention the alternative API in Javadoc for the
> > > deprecated`registerCatalog`.
> > > > 4. In the "Compatibility" section, would be better to mention the
> > changed
> > > behavior of CREATE CATALOG statement if FileCatalogStore (or other
> > > persisted catalog store) is used.
> > >
> > > Thanks for the suggestion, I have updated the FLIP [1].
> > >
> > >
> > > [1].
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Thu, Jun 1, 2023 at 9:22 PM Jark Wu  wrote:
> > >
> > > > Hi Feng,
> > > >
> > > > This is a useful FLIP. Thanks for starting this discussion.
> > > > The current design looks pretty good to me. I just have some minor
> > > > comments.
> > > >
> > > > 1. How to register the CatalogStore for Table API? I think the
> > CatalogStore
> > > > should be immutable once TableEnv is created. Otherwise, there might
> be
> > > > some data inconsistencies when CatalogStore is changed.
> > > >
> > > > 2. Why does the CatalogStoreFactory interface only have a default
> > method,
> > > > not an interface method?
> > > >
> > > > 3. Please mention the alternative API in Javadoc for the deprecated
> > > > `registerCatalog`.
> > > >
> > > > 4. In the "Compatibility" section, would be better to mention the
> > changed
> > > > behavior of CREATE CATALOG statement if FileCatalogStore (or other
> > > > persisted catalog store) is used.
> > > >
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Thu, 1 Jun 2023 at 11:26, Feng Jin  wrote:
> > > >
> > > > > Hi ,  thanks all for reviewing the flip.
> > > > >
> > > > > @Ron
> > > > >
> > > > > >  Regarding the CatalogStoreFactory#createCatalogStore method, do
> we
> > > > need
> > > > > to provide a default implementation?
> > > > >
> > > > > Yes, we will provide a default InMemoryCatalogStoreFactory to
> create
> > an
> > > > > InMemoryCatalogStore.
> > > > >
> > > > > >  If we get a Catalog from CatalogStore, after initializing it,
> > whether
> > > > we
> > > > > put it in Map catalogs again?
> > > > >
> > > > > Yes, in the current design, catalogs are stored as snapshots, and
> > once
> > > > > initialized, the Catalog will be placed in the Map
> > > > > catalogs.
> > 

Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-06-02 Thread Ahmed Hamdy
Hi Dong,
Thanks for the quick reply and for clarification, yeah that makes sense!
Best Regards,
Ahmed Hamdy

On Fri, 2 Jun 2023 at 02:59, Dong Lin  wrote:

> Hi Ahmed,
>
> Thanks for the comments.
>
> I agree with you and Piotr that it would be useful to provide a more
> generic approach to address more use-case in one proposal. On the other
> hand, I also think it is important to make sure that the alternative (more
> generic) approach can indeed address the extra use-cases reliably as
> expected. Then we can compare the pros/cons of these approaches and make
> the best choice for Flink users.
>
> If I understand your question correctly, you are asking whether it would be
> better to replace upperBoundCheckpointingIntervalForLastSource() with an
> API on the source/operator interface.
>
> The short answer is probably no. This is because the expected users of the
> API *HybridSourceBuilder#upperBoundCheckpointingIntervalForLastSource*()
> are end-users who use Flink API and connector API to develop Flink job. We
> probably don't want end-users to directly use the source/operator
> interface, which is generally more complicated and intended to be used by
> developers of source operators.
>
> FLIP-309 currently proposes to add the API
> *SplitEnumeratorContext#upperBoundCheckpointingInterval* for developers of
> source operators (e.g. HybridSource, MySQL CDC source) to upper-bound
> checkpointing interval. Are you suggesting that we should replace this API
> with a config on the source or operator constructor?
>
> This approach probably works for HybridSource. But I am not sure it works
> for MySQL CDC Source (which is also mentioned in the latest FLIP-309
> motivation section), which is implemented as one source operator rather
> than multiple source operators (which HybridSource does). And we need to
> enable the new checkpointing interval in the middle of this source
> operator's execution.
>
> If I misunderstood your suggestion, can you provide more details regarding
> the proposed API and explain its benefits?
>
> Best,
> Dong
>
>
>
> On Fri, Jun 2, 2023 at 2:12 AM Ahmed Hamdy  wrote:
>
> > Hi Dong,
> > Thanks for the great proposal.
> > The thread is very intuitive along with suggestions from Jing and Piotr.
> > As much as I like the simplicity of the proposed approach I think a much
> > wider benefit is achieved by taking a more generic approach similar to
> > Piotr's suggestion of having a `CheckpointTrigger`. I think this even
> > solidifies the argument you are discussing
> > >  On the other hand, the approach currently proposed in the FLIP is much
> > simpler as it does not depend on backpressure. Users specify the extra
> > interval requirement on the specific sources (e.g. HybridSource, MySQL
> CDC
> > Source) and can easily know the checkpointing interval will be used on
> the
> > continuous phase of the corresponding source.
> >
> > where the base HybridSource can use a `CheckpointTrigger` that doesn't
> > depend on backpressure.
> >
> >
> >
> >
> > I have a couple of questions for clarification.
> >
> > @Dong
> > Do you think in the solution in FLIP 309, instead of using
> > ```
> > /**
> >  * Upper-bound the checkpointing interval when the last source
> > added right before this
> >  * method invocation is reading data.
> >  */
> > public  > Source > ?, ?>>
> > HybridSourceBuilder
> > upperBoundCheckpointingIntervalForLastSource(
> > Duration duration) {
> > ...
> > }
> > ```
> >
> > We can have an upperBoundCheckpointingInterval configured in the Source
> > Interface, or even better in the Operator one.
> > then we can easily implement the one for HybridSource by relying on
> > delegation to the `currentReader`.
> >
> >
> > @Piotr
> >
> > Regarding the more general approach of adjusting based on generic
> > triggers/backpressure metrics. I saw you mentioned the resemblance with
> > FLIP-271,
> > Do you think it is worth going with the REST API proposal for dynamically
> > configuring the interval hence the trigger logic could be implemented on
> > Flink or external systems like Flink Kubernetes Operator?
> > Wdyt? I think the REST API proposal here sounds more and more
> interesting.
> >
> >
> > Best Regards,
> > Ahmed Hamdy
> >
> >
> > On Wed, 31 May 2023 at 07:59, Dong Lin  wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for the reply. Please see my comments inline.
> > >
> > > On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski 
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > First of all we don't need to send any extra signal from source (or
> non
> > > > source) operators. All of the operators are already reporting
> > > backpressured
> > > > metrics [1]
> > > > and all of the metrics are already sent to JobManager. We would only
> > need
> > > >
> > >
> > > Hmm... I am not sure metrics such as isBackPressured are already sent
> to
> > > JM. According to the doc
> > > <
> >
> 

[jira] [Created] (FLINK-32246) javax.management.InstanceAlreadyExistsException

2023-06-02 Thread jeff-zou (Jira)
jeff-zou created FLINK-32246:


 Summary: javax.management.InstanceAlreadyExistsException
 Key: FLINK-32246
 URL: https://issues.apache.org/jira/browse/FLINK-32246
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.2
Reporter: jeff-zou


Flink SQL throws an exception (javax.management.InstanceAlreadyExistsException)
when trying to perform multiple sink operations on the same Kafka source.

SQL example:
{code:java}
create table kafka_source() with ('connector'='kafka');
insert into sink_table1 select * from kafka_source;
insert into sink_table2 select * from kafka_source; {code}
The Exception as below:
{code:java}
javax.management.InstanceAlreadyExistsException: 
kafka.admin.client:type=app-info,id=*
 
java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
 
java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
 
org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:500)
 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
 org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
 org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151)
 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209)
 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 java.base/java.lang.Thread.run(Thread.java:829) {code}





Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-06-02 Thread Feng Jin
Thanks Jingsong.

> Just naming, maybe `createCatalog` in TableEnv

+1 For this, I have already updated FLIP.


Best,
Feng


On Fri, Jun 2, 2023 at 11:32 AM Jingsong Li  wrote:

> Thanks Feng,
>
> Just naming, maybe `createCatalog` in TableEnv, I can see many
> functions are converted to createxxx from registerxxx.
>
> On Fri, Jun 2, 2023 at 11:04 AM Feng Jin  wrote:
> >
> > Hi jark, thanks for your suggestion.
> >
> > > 1. How to register the CatalogStore for Table API? I think the
> > CatalogStore should be immutable once TableEnv is created. Otherwise,
> there
> > might be some data inconsistencies when CatalogStore is changed.
> >
> > Yes, We should initialize the CatalogStore when creating the TableEnv.
> > Therefore, my current proposal is to add a method to configure the
> > CatalogStore in EnvironmentSettings.
> >
> > // Initialize a catalog Store
> > CatalogStore catalogStore = new FileCatalogStore("");
> >
> > // set up the Table API
> > final EnvironmentSettings settings =
> > EnvironmentSettings.newInstance().inBatchMode()
> > .withCatalogStore(catalogStore)
> > .build();
> >
> > final TableEnvironment tableEnv =
> TableEnvironment.create(settings);
> >
> >
> > > 2. Why does the CatalogStoreFactory interface only have a default
> method,
> > not an interface method?
> >
> > Sorry. While I did refer to the Catalog interface, I agree that, as a new
> > interface, the CatalogStoreFactory should not have a default method but
> > an interface method. I have already modified the interface.
> >
> > > 3. Please mention the alternative API in Javadoc for the
> > deprecated`registerCatalog`.
> > > 4. In the "Compatibility" section, would be better to mention the
> changed
> > behavior of CREATE CATALOG statement if FileCatalogStore (or other
> > persisted catalog store) is used.
> >
> > Thanks for the suggestion, I have updated the FLIP [1].
> >
> >
> > [1].
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> >
> >
> > Best,
> > Feng
> >
> > On Thu, Jun 1, 2023 at 9:22 PM Jark Wu  wrote:
> >
> > > Hi Feng,
> > >
> > > This is a useful FLIP. Thanks for starting this discussion.
> > > The current design looks pretty good to me. I just have some minor
> > > comments.
> > >
> > > 1. How to register the CatalogStore for Table API? I think the
> CatalogStore
> > > should be immutable once TableEnv is created. Otherwise, there might be
> > > some data inconsistencies when CatalogStore is changed.
> > >
> > > 2. Why does the CatalogStoreFactory interface only have a default
> method,
> > > not an interface method?
> > >
> > > 3. Please mention the alternative API in Javadoc for the deprecated
> > > `registerCatalog`.
> > >
> > > 4. In the "Compatibility" section, would be better to mention the
> changed
> > > behavior of CREATE CATALOG statement if FileCatalogStore (or other
> > > persisted catalog store) is used.
> > >
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 1 Jun 2023 at 11:26, Feng Jin  wrote:
> > >
> > > > Hi ,  thanks all for reviewing the flip.
> > > >
> > > > @Ron
> > > >
> > > > >  Regarding the CatalogStoreFactory#createCatalogStore method, do we
> > > need
> > > > to provide a default implementation?
> > > >
> > > > Yes, we will provide a default InMemoryCatalogStoreFactory to create
> an
> > > > InMemoryCatalogStore.
> > > >
> > > > >  If we get a Catalog from CatalogStore, after initializing it,
> whether
> > > we
> > > > put it in Map catalogs again?
> > > >
> > > > Yes, in the current design, catalogs are stored as snapshots, and
> once
> > > > initialized, the Catalog will be placed in the Map
> > > > catalogs.
> > > > Subsequently, the Map catalogs will be the primary
> > > source
> > > > for obtaining the corresponding Catalog.
> > > >
> > > > >   how about renaming them to `catalog.store.type` and
> > > > `catalog.store.path`?
> > > >
> > > > I think it is okay. Adding "sql" at the beginning may seem a bit
> > > strange. I
> > > > will update the FLIP.
> > > >
> > > >
> > > >
> > > > @Shammon
> > > >
> > > > Thank you for the review. I have made the necessary corrections.
> > > > Regarding the modifications made to the Public Interface, I have also
> > > > included the relevant changes to the `TableEnvironment`.
> > > >
> > > >
> > > > Best,
> > > > Feng
> > > >
> > > >
> > > > On Wed, May 31, 2023 at 5:02 PM Shammon FY 
> wrote:
> > > >
> > > > > Hi feng,
> > > > >
> > > > > Thanks for updating, I have some minor comments
> > > > >
> > > > > 1. The modification of `CatalogManager` should not be in `Public
> > > > > Interfaces`, it is not a public interface.
> > > > >
> > > > > 2. `@PublicEvolving` should be added for `CatalogStore` and
> > > > > `CatalogStoreFactory`
> > > > >
> > > > > 3. The code `Optional optionalDescriptor =
> > > > > catalogStore.get(catalogName);` in the