[jira] [Created] (FLINK-32306) Multiple batch scheduler performance regressions

2023-06-11 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-32306:
--

 Summary: Multiple batch scheduler performance regressions
 Key: FLINK-32306
 URL: https://issues.apache.org/jira/browse/FLINK-32306
 Project: Flink
  Issue Type: Bug
Reporter: Martijn Visser


initSchedulingStrategy.BATCH

http://codespeed.dak8s.net:8000/timeline/#/?exe=5&ben=initSchedulingStrategy.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200

schedulingDownstreamTasks.BATCH 

http://codespeed.dak8s.net:8000/timeline/#/?exe=5&ben=schedulingDownstreamTasks.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200

startScheduling.BATCH

http://codespeed.dak8s.net:8000/timeline/#/?exe=5&ben=startScheduling.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-11 Thread Becket Qin
Hi folks,

As one of the release 2.0 efforts, the release managers have been discussing our
API lifecycle policies. FLIP-196[1] and FLIP-197[2] are relevant to this topic.
These two FLIPs defined the stability guarantees of the programming APIs under
the various stability annotations, and the promotion process. A recap of the
conclusions follows:

Stability:
@Internal API: can change between major/minor/patch releases.
@Experimental API: can change between major/minor/patch releases.
@PublicEvolving API: can change between major/minor releases.
@Public API: can only change between major releases.

Promotion:
An @Experimental API should be promoted to @PublicEvolving after two
releases, and a @PublicEvolving API should be promoted to @Public API after
two releases, unless there is a documented reason not to do so.

One thing not mentioned in these two FLIPs is the API deprecation process,
which is in fact critical and fundamental to how the stability guarantee is
provided in practice, because stability is ultimately about when existing APIs
may be removed. For example, if we want to change a method "ResultFoo
foo(ArgumentFoo arg)" to "ResultBar bar(ArgumentBar arg)", there are two ways
to do this:

1. Mark method "foo" as deprecated and add the new method "bar". At some
point later, remove the method "foo".
2. Simply change the API in place, that basically means removing method foo
and adding method bar at the same time.
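
To make the difference concrete, here is a minimal sketch of option 1 (the
types are just the placeholders from the example above, not real Flink
classes):

```java
// Placeholder types from the example above.
class ArgumentFoo {}
class ArgumentBar {}
class ResultFoo {}
class ResultBar {}

interface MyApi {

    /**
     * @deprecated Use {@link #bar(ArgumentBar)} instead. Kept for the agreed
     * migration period and removed only after that period has elapsed.
     */
    @Deprecated
    ResultFoo foo(ArgumentFoo arg);

    /** The replacement for {@link #foo(ArgumentFoo)}. */
    ResultBar bar(ArgumentBar arg);
}
```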

In the first option, users are given a period with a stability guarantee to
migrate from "foo" to "bar". With the second option, this migration period
is effectively 0. A zero migration period is problematic because end users
may need a feature/bug fix from a new version, but cannot upgrade right
away due to some backwards-incompatible changes, even though these changes
perfectly comply with the API stability guarantees defined above. So the
migration period is critical to the API stability guarantees for the end
users.

The migration period is essentially how long a deprecated API must be kept
before it can be removed from the source code. So with this FLIP, I'd like to
kick off the discussion about our deprecation process.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process

Comments are welcome!

Thanks,

Jiangjie (Becket) Qin

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-196%3A+Source+API+stability+guarantees
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process


Re: [DISCUSS] FLIP-246: Multi Cluster Kafka Source

2023-06-11 Thread Thomas Weise
Hi Mason,

Thanks for the iterations on the FLIP, I think this is in very good shape
now.

Small correction for the MultiClusterKafkaSourceEnumerator section: "This
reader is responsible for discovering and assigning splits from 1+ cluster"

Regarding the user facing name of the connector: I agree with Gordon that
the defining characteristic is the dynamic discovery vs. the fact that
multiple clusters may be consumed in parallel. (Although, as described in
the FLIP, lossless consumer migration only works with a strategy that
involves intermittent parallel consumption of old and new clusters to drain
and switch.)

I think the "Table" in the name of those SQL connectors should avoid
confusion. Perhaps we can also solicit other ideas? I would throw
"DiscoveringKafkaSource" into the mix.

Cheers,
Thomas




On Fri, Jun 9, 2023 at 3:40 PM Tzu-Li (Gordon) Tai 
wrote:

> > Regarding (2), definitely. This is something we planned to add later on
> but
> so far keeping things common has been working well.
>
> My main worry for doing this as a later iteration is that this would
> probably be a breaking change for the public interface. If that can be
> avoided and planned ahead, I'm fine with moving forward with how it is
> right now.
>
> > DynamicKafkaSource may be confusing because it is really similar to the
> KafkaDynamicSource/Sink (table connectors).
>
> The table / sql Kafka connectors (KafkaDynamicTableFactory,
> KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal classes
> not really meant to be exposed to the user though.
> It can cause some confusion internally for the code maintainers, but on the
> actual public surface I don't see this being an issue.
>
> Thanks,
> Gordon
>
> On Wed, Jun 7, 2023 at 8:55 PM Mason Chen  wrote:
>
> > Hi Gordon,
> >
> > Thanks for taking a look!
> >
> > Regarding (1), there is a need from the readers to send this event at
> > startup because the reader state may reflect outdated metadata. Thus, the
> > reader should not start without fresh metadata. With fresh metadata, the
> > reader can filter splits from state--this filtering capability is
> > ultimately how we solve the common issue of "I re-configured my Kafka
> > source and removed some topic, but it refers to the old topic due to
> state
> > *[1]*". I did not mention this because I thought this is more of a detail
> > but I'll make a brief note of it.
> >
> > Regarding (2), definitely. This is something we planned to add later on
> but
> > so far keeping things common has been working well. In that regard, yes
> the
> > metadata service should expose these configurations but the source should
> > not check it into state unlike the other metadata. I'm going to add it
> to a
> > section called "future enhancements". This is also feedback that Ryan, an
> > interested user, gave earlier in this thread.
> >
> > Regarding (3), that's definitely a good point and there are some real use
> > cases, in addition to what you mentioned, to use this in single cluster
> > mode (see *[1] *above). DynamicKafkaSource may be confusing because it is
> > really similar to the KafkaDynamicSource/Sink (table connectors).
> >
> > Best,
> > Mason
> >
> > On Wed, Jun 7, 2023 at 10:40 AM Tzu-Li (Gordon) Tai  >
> > wrote:
> >
> > > Hi Mason,
> > >
> > > Thanks for updating the FLIP. In principle, I believe this would be a
> > > useful addition. Some comments so far:
> > >
> > > 1. In this sequence diagram [1], why is there a need for a
> > > GetMetadataUpdateEvent from the MultiClusterSourceReader going to the
> > > MultiClusterSourceEnumerator? Shouldn't the enumerator simply start
> > sending
> > > metadata update events to the reader once it is registered at the
> > > enumerator?
> > >
> > > 2. Looking at the new builder API, there's a few configurations that
> are
> > > common across *all *discovered Kafka clusters / topics, specifically
> the
> > > deserialization schema, offset initialization strategy, Kafka client
> > > properties, and consumer group ID. Is there any use case that users
> would
> > > want to have these configurations differ across different Kafka
> clusters?
> > > If that's the case, would it make more sense to encapsulate these
> > > configurations to be owned by the metadata service?
> > >
> > > 3. Is MultiClusterKafkaSource the best name for this connector? I find
> > that
> > > the dynamic aspect of Kafka connectivity to be a more defining
> > > characteristic, and that is the main advantage it has compared to the
> > > static KafkaSource. A user may want to use this new connector over
> > > KafkaSource even if they're just consuming from a single Kafka cluster;
> > for
> > > example, one immediate use case I can think of is Kafka repartitioning
> > with
> > > zero Flink job downtime. They create a new topic with higher
> parallelism
> > > and repartition their Kafka records from the old topic to the new
> topic,
> > > and they want the consuming Flink job to be able to move from the old
> > topic
> > >

Re: [DISCUSSION] Improved USER/SYSTEM exception wrapping in Flink code base

2023-06-11 Thread Panagiotis Garefalakis
Hey Hong,

Keep in mind that Flink 2.0 is also under discussion and breaking changes
could be introduced -- let's just make sure there is real value in a cleaner
exception hierarchy (which I believe there is).

Cheers,
Panagiotis

On Sat, Jun 10, 2023 at 4:22 AM Teoh, Hong 
wrote:

> Thanks for the engagement on the thread! Sorry for the late reply, was off
> on holidays for a bit.
>
> @Paul
>
> Thanks for linking the historical discussion. Yes, I would agree that using
> classloading to determine whether an exception type comes from a USER
> classloader rather than the SYSTEM classloader would be helpful.
>
> In my opinion, we should enhance this further by also introducing a good
> exception hierarchy depending on where the USER code was called. However, I
> also note that this might be a breaking change for some, because they might
> rely on the current exception type for job management. We could address
> this by wrapping the existing exception rather than replacing it.
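>
> To sketch the classloader heuristic (illustrative only, the names below are
> not existing Flink classes; it simply walks the cause chain and checks which
> classloader loaded each exception class):
>
> ```java
> /** Illustrative sketch: flag a failure as USER if any exception in the
>  *  cause chain was loaded by the user-code classloader. */
> final class ExceptionOriginClassifier {
>
>     enum Origin { USER, SYSTEM }
>
>     static Origin classify(Throwable failure, ClassLoader userClassLoader) {
>         for (Throwable t = failure; t != null; t = t.getCause()) {
>             if (t.getClass().getClassLoader() == userClassLoader) {
>                 return Origin.USER;
>             }
>         }
>         return Origin.SYSTEM;
>     }
> }
> ```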
>
> @Panagiotis
> I agree with all your points. This proposal is in synergy with Pluggable
> Failure Enrichers.
>
> Regards,
> Hong
>
> > On 6 Jun 2023, at 06:50, Panagiotis Garefalakis 
> wrote:
> >
> >
> > Thanks for bringing this up Hong!
> >
> > Classifying exceptions was also the main driving factor behind pluggable
> > failure enrichers .
> > However, we could do a much better job maintaining a hierarchy of System
> > and User exceptions thus making the classification logic more
> > straightforward.
> >
> >   - Defining better system/user exceptions with some kind of hierarchy is
> >   definitely a step forward (and refactoring the existing ones)
> >   - Classloader filtering could definitely be used for discovering errors
> >   originating from user defined code, see doc
> >   <
> https://docs.google.com/document/d/1pcHg9F3GoDDeVD5GIIo2wO67Hmjgy0-hRDeuFnrMgT4/edit#heading=h.ato31xdnm7nk
> >
> >   - Eventually we could also release a simple failure enricher using the
> >   above improvements to automatically classify errors on JMs exceptions
> >   endpoint
> >
> > Cheers,
> > Panagiotis
> >
> > On Wed, May 31, 2023 at 9:12 PM Paul Lam  wrote:
> >
> >> Hi Hong,
> >>
> >> Thanks for starting the discussion! I believe the exception
> classification
> >> between
> >> user exceptions and system exceptions has been long-awaited.
> >>
> >> It's worth mentioning that years ago there was a related discussion [1],
> >> FYI.
> >>
> >> I’m in favor of the heuristic approach of classifying an exception by which
> >> classloader it comes from. In addition, we could introduce extra
> >> configurations to allow manual exception classification based on the
> >> package name of exceptions.
> >>
> >> [1] https://lists.apache.org/thread/gms4nysnb3o4v2k6421m5hsq0g7gtr81
> >>
> >> Best,
> >> Paul Lam
> >>
> >>> On 25 May 2023, at 23:07, Teoh, Hong wrote:
> >>>
> >>> Hi all,
> >>>
> >>> This discussion thread is to gauge community opinion and gather
> feedback
> >> on implementing a better exception hierarchy in Flink to identify
> >> exceptions that come from running “User job code” and exceptions coming
> >> from “Flink engine code”.
> >>>
> >>> Problem:
> >>> Flink provides a distributed processing engine (SYSTEM) to run a data
> >>> streaming job (USER). There are many places in the code where the engine
> >>> runs “user-job-provided Java classes”, such as serialization/deserialization,
> >>> configuration objects, credential loading, and running the setup() method on
> >>> certain operators.
> >>> Sometimes when evaluating a stack trace, it might be hard to automatically
> >>> determine whether an exception arises out of a Flink engine problem or a
> >>> problem associated with a particular job.
> >>>
> >>> Proposed way forward:
> >>> - It would be good to have an exception hierarchy maintained by Flink
> >> that separates out the exceptions arising from running “USER provided
> >> classes”. That way, we can improve our ability to automatically classify
> >> and mitigate these exceptions.
> >>> - We could also separate out the places where the exception originates
> >>> based on function - FlinkSerializationException,
> >>> FlinkConfigurationException, etc. (we already have a similar concept with
> >>> IncompatibleKeysException)
> >>> - This has synergy with FLIP-304: Pluggable Failure Enrichers (since it
> >> would simplify the logic in the USER/SYSTEM classifier there) [1].
> >>> - In addition, this has been discussed before in the context of
> updating
> >> the exception thrown by serialisers to be a Flink-specific serialisation
> >> exception instead of IllegalStateException [2]
> >>>
> >>>
> >>> Any thoughts on the above?
> >>>
> >>> Regards,
> >>> Hong
> >>>
> >>>
> >>> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304

Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-06-11 Thread Feng Jin
hi, Hang

Thanks for your reply. That makes sense to me; I have updated the FLIP.


Best,

Feng



On Fri, Jun 9, 2023 at 3:34 PM Hang Ruan  wrote:

> Hi, Feng.
>
> I reviewed this FLIP again and found a few small places that may need
> optimizing.
>
> 1. `CatalogDescriptor` may need a private constructor.
> 2. The method `initCatalog` in `CatalogManager` is not introduced.
>
> Best,
> Hang
>
> On Tue, Jun 6, 2023 at 21:17, Feng Jin wrote:
>
> > Hi Leonard,
> >
> > Thanks for your reply.
> >
> > > 1. a  How to construct a CatalogDescriptor ?
> >
> > I think it would be helpful to add a method for constructing a
> > CatalogDescriptor, as you mentioned in 1.c. I will update the
> documentation
> > later.
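> >
> > For the record, a rough sketch of what such a factory-style descriptor could
> > look like (hypothetical shape for discussion, not the final FLIP interface):
> >
> > ```java
> > import org.apache.flink.configuration.Configuration;
> >
> > /** Hypothetical sketch of the descriptor discussed above. */
> > public class CatalogDescriptor {
> >
> >     private final String catalogName;
> >     private final Configuration configuration;
> >
> >     private CatalogDescriptor(String catalogName, Configuration configuration) {
> >         this.catalogName = catalogName;
> >         this.configuration = configuration;
> >     }
> >
> >     public static CatalogDescriptor of(String catalogName, Configuration configuration) {
> >         return new CatalogDescriptor(catalogName, configuration);
> >     }
> >
> >     public String getCatalogName() {
> >         return catalogName;
> >     }
> >
> >     public Configuration getConfiguration() {
> >         return configuration;
> >     }
> > }
> > ```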
> >
> > > 1.b  How to visit the fields? Could we use Configuration instead of
> > > Map<String, String>?
> >
> > I believe that the use of Map<String, String> options is only intended for
> > creating a catalog and not for accessing internal parameters.
> >
> > Since all of the relevant parameters for CREATE CATALOG are also stored in
> > Map<String, String> options, my understanding is that using Map<String,
> > String> options should suffice.
> >
> > Here is the implementation of execute CREATE CATALOG statement.
> > ```java
> > private TableResultInternal createCatalog(CreateCatalogOperation operation) {
> >     String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
> >     try {
> >         String catalogName = operation.getCatalogName();
> >         Map<String, String> properties = operation.getProperties();
> >
> >         Catalog catalog =
> >                 FactoryUtil.createCatalog(
> >                         catalogName,
> >                         properties,
> >                         tableConfig,
> >                         resourceManager.getUserClassLoader());
> >         catalogManager.registerCatalog(catalogName, catalog);
> >
> >         return TableResultImpl.TABLE_RESULT_OK;
> >     } catch (CatalogException e) {
> >         throw new ValidationException(exMsg, e);
> >     }
> > }
> > ```
> >
> >
> > > 2. Do we have a plan to offer a default CatalogStore if the user didn’t
> > > configure this?
> >
> > Yes, the in-memory CatalogStore will be used as the default CatalogStore
> > even if the user has not configured one.
> >
> >
> > Best,
> > Feng
> >
> >
> > On Tue, Jun 6, 2023 at 8:02 PM Leonard Xu  wrote:
> >
> > > Hi, Feng
> > >
> > > Sorry for the late reply, but I have some comments about the FLIP.
> > >
> > >
> > > 1. The introduced public class CatalogDescriptor seems to be missing some
> > > necessary components:
> > >   a) How to construct a CatalogDescriptor?
> > >   b) How to visit the fields? Could we use Configuration instead of
> > > Map<String, String>?
> > >   c) Could we offer a built-in factory method to build a CatalogDescriptor,
> > > like
> > >  public static CatalogDescriptor of(String catalogName, Configuration
> > > configuration)
> > >
> > > 2. The FLIP said “By default, there are two built-in CatalogStores
> > > available: the In-Memory CatalogStore and the File CatalogStore”.
> > > Do we have a plan to offer a default CatalogStore if the user didn’t
> > > configure this? IIUC, users can obtain the benefits of lazy catalog
> > > initialization if we have a default CatalogStore, even if it is in-memory.
> > >
> > > Best,
> > > Leonard
> > >
> > >
> > >
> > > > On Jun 6, 2023, at 7:08 PM, Feng Jin  wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > Thank you everyone for your valuable input. If there are no further
> > > > questions or concerns about the FLIP[1], I would like to start voting
> > > > tomorrow (6/7).
> > > >
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > > >
> > > >
> > > > Best,
> > > > Feng
> > > >
> > > >
> > > > On Sun, Jun 4, 2023 at 3:33 PM Feng Jin 
> wrote:
> > > >
> > > >> Hi Samrat,
> > > >>
> > > >> Thanks for your advice.
> > > >>
> > > >>> 1. The createCatalog method does not mention any exceptions being
> > > >> thrown.
> > > >>
> > > >> createCatalog will throw CatalogException like registerCatalog. As
> > > >> CatalogException is a RuntimeException, there is no explicit declaration
> > > >> of thrown exceptions in CatalogManager and TableEnvironment.
> > > >> To avoid misunderstandings, I have added the "throws CatalogException"
> > > >> declaration to the createCatalog method definition of CatalogStore.
> > > >>
> > > >>> 2. Could you please provide an exhaustive list of the supported
> > kinds?
> > > >>
> > > >> Sure,  the documentation now includes the configuration of the
> > built-in
> > > >> CatalogStore as well as how to configure a custom CatalogStore.
> > > >>
> > > >>
> > > >> Best,
> > > >> Feng
> > > >>
> > > >>
> > > >> On Sun, Jun 4, 2023 at 4:23 AM Samrat Deb 
> > > wrote:
> > > >>
> > > >>> Hi Feng,
> > > >>>
> > > >>> Thank you for providing the proposal. I believe this feature will
> be
> > > >>> highly
> > > >>> valuable.
> > > >>>
> > > >>> I have a couple of inquiries:
> > > >>>
> > > >>> 1. According to the documentation [1], the createCatalog method
> does
> > > not
> > > >>> mention any exceptions being thrown. However, I would

Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-06-11 Thread Feng Jin
Thanks Benchao and Leonard.

'Implicit type conversion' makes sense to me. I will emphasize the implicit
type conversion in the document.


Best,
Feng

On Sat, Jun 10, 2023 at 10:11 AM Benchao Li  wrote:

> Thanks Leonard for the input, the "implicit type conversion" way sounds good
> to me.
> I also agree that this should be done in planner instead of connector,
> it'll be a lot easier for connector development.
>
> On Fri, Jun 9, 2023 at 20:11, Leonard Xu wrote:
>
> > About the semantics consideration, I have some new input after rethinking.
> >
> > 1. We can support both TIMESTAMP and TIMESTAMP_LTZ expressions following
> > the syntax `SELECT [column_name(s)] FROM [table_name] FOR SYSTEM_TIME AS
> > OF <timestamp_expression>`
> >
> > 2. For the TIMESTAMP_LTZ type, giving a long instant value to CatalogTable is
> > pretty intuitive; for the TIMESTAMP type, it will be implicitly cast to
> > TIMESTAMP_LTZ by the planner using the session time zone and then passed to
> > CatalogTable. This case can be considered as a function AsOfSnapshot(Table
> > t, TIMESTAMP_LTZ arg), which accepts an arg of TIMESTAMP_LTZ type, but our
> > framework supports implicit type conversion, thus users can also pass an arg
> > of TIMESTAMP type. As a hint, Spark [1] did the implicit type conversion too.
> >
> > 3. I also considered handing over the implicit type conversion to the
> > connector instead of the planner, e.g. passing a TIMESTAMP literal and having
> > the connector use the session time zone to perform the type conversion, but
> > this is more complicated than handling it in the planner, and it’s not
> > friendly to connector developers.
> >
> > 4. The last point: TIMESTAMP_LTZ '1970-01-01 00:00:04.001' should be an
> > invalid expression, as you cannot define an instant point (i.e.
> > TIMESTAMP_LTZ semantics in SQL) from a timestamp literal without a time zone.
> > You can use an explicit type conversion like `cast(ts_ntz as TIMESTAMP_LTZ)`
> > after `FOR SYSTEM_TIME AS OF` if you want to use a
> > TIMESTAMP type/expression/literal without a time zone.
> >
> > 5. One last point: the TIMESTAMP_LTZ type of Flink SQL supports DST
> > time [2] well, which will help users avoid many corner cases.
> >
> >
> > Best,
> > Leonard
> >
> > [1]
> >
> https://github.com/apache/spark/blob/0ed48feab65f2d86f5dda3e16bd53f2f795f5bc5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala#L56
> > [2]
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/timezone/#daylight-saving-time-support
> >
> >
> >
> >
> > > On Jun 9, 2023, at 1:13 PM, Benchao Li  wrote:
> > >
> > > As you can see, you must use `UNIX_TIMESTAMP` to do this work; that's
> > > where the time zone comes in.
> > >
> > > What I'm talking about is casting timestamp/timestamp_ltz to long directly;
> > > that's why the semantics are tricky when you are casting timestamp to long
> > > using the time zone.
> > >
> > > Other systems, such as SQL Server [1], actually use a string
> > > instead of a timestamp literal, e.g. `FOR SYSTEM_TIME AS OF '2021-01-01
> > > 00:00:00.000'`; I'm not sure whether they convert the string implicitly
> > > to TIMESTAMP_LTZ, or whether they just have a different definition of the
> > > syntax.
> > >
> > > But for us, we are definitely using timestamp/timestamp_ltz literals here;
> > > that's why it is special, and we must highlight the behavior that we are
> > > converting a timestamp-without-time-zone literal to long using the session
> > > time zone.
> > >
> > > [1]
> > >
> >
> https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-table-usage-scenarios?view=sql-server-ver16
> > >
> > > On Thu, Jun 8, 2023 at 11:35, Feng Jin wrote:
> > >
> > >> Hi all,
> > >>
> > >> thanks for your input
> > >>
> > >>
> > >> @Benchao
> > >>
> > >>> The type for "TIMESTAMP '2023-04-27 00:00:00'" should be "TIMESTAMP
> > >> WITHOUT TIME ZONE", converting it to unix timestamp would use UTC
> > timezone,
> > >> which is not usually expected by users.
> > >>
> > >> It was indeed the case before Flink 1.13, but now my understanding is that
> > >> there have been some slight changes in the definition of TIMESTAMP.
> > >>
> > >> TIMESTAMP is currently used to specify the year, month, day, hour, minute
> > >> and second. We recommend that users use *UNIX_TIMESTAMP(CAST(timestamp_col
> > >> AS STRING))* to convert between *TIMESTAMP values* and *long values*. The
> > >> *UNIX_TIMESTAMP* function will use the *LOCAL TIME ZONE*. Therefore,
> > >> converting either TIMESTAMP or TIMESTAMP_LTZ to long values will involve
> > >> using the *LOCAL TIME ZONE*.
> > >>
> > >>
> > >> Here is a test:
> > >>
> > >> Flink SQL> SET 'table.local-time-zone' = 'UTC';
> > >> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1970-01-01 00:00:00' as
> > >> STRING)) as `timestamp`;
> > >> timestamp
> > >> ---------
> > >>         0
> > >>
> > >> Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
> > >> Flink SQL> SELECT UNIX_TIMESTAMP(CAST(TIMESTAMP '1

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-11 Thread Yang Wang
Sorry for the late reply. I am in favor of introducing such a built-in
resource localization mechanism based on Flink FileSystem. Then FLINK-28915 [1]
could be the second step, which will download the jars and dependencies to the
JobManager/TaskManager local directory before they start working.

The first step could be done in another ticket in Flink, or some external
Flink job management system could also take care of it.

[1]. https://issues.apache.org/jira/browse/FLINK-28915

Best,
Yang

On Fri, Jun 9, 2023 at 17:39, Paul Lam wrote:

> Hi Mason,
>
> I get your point. I'm increasingly feeling the need to introduce a built-in
> file distribution mechanism for the flink-kubernetes module, just like Spark
> does with `spark.kubernetes.file.upload.path` [1].
>
> I’m assuming the workflow is as follows:
>
> - KubernetesClusterDescriptor uploads all local resources to remote
>   storage via a Flink FileSystem (skipped if the resources are already remote).
> - KubernetesApplicationClusterEntrypoint downloads the resources
>   and puts them on the classpath during startup.
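>
> As a rough illustration of the upload step, assuming a plain byte-for-byte
> copy through Flink's FileSystem abstraction (error handling, directory
> layout, and cleanup omitted):
>
> ```java
> import java.io.IOException;
>
> import org.apache.flink.core.fs.FSDataInputStream;
> import org.apache.flink.core.fs.FSDataOutputStream;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
>
> /** Sketch only: copy one local resource to the configured remote upload dir. */
> final class ResourceUploader {
>
>     static Path upload(Path localFile, Path remoteDir) throws IOException {
>         Path target = new Path(remoteDir, localFile.getName());
>         FileSystem targetFs = target.getFileSystem();
>         try (FSDataInputStream in = localFile.getFileSystem().open(localFile);
>                 FSDataOutputStream out =
>                         targetFs.create(target, FileSystem.WriteMode.NO_OVERWRITE)) {
>             byte[] buffer = new byte[8192];
>             int read;
>             while ((read = in.read(buffer)) != -1) {
>                 out.write(buffer, 0, read);
>             }
>         }
>         return target;
>     }
> }
> ```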
>
> I wouldn't mind splitting it into another FLIP to ensure that everything is
> done correctly.
>
> cc'ed @Yang to gather more opinions.
>
> [1]
> https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management
>
> Best,
> Paul Lam
>
> On Jun 8, 2023, at 12:15, Mason Chen wrote:
>
> Hi Paul,
>
> Thanks for your response!
>
I agree that utilizing SQL Drivers in Java applications is equally important
as employing them in SQL Gateway. WRT init containers, I think most
users use them just as a workaround, for example to wget a jar from the
Maven repo.
>
> We could implement the functionality in SQL Driver in a more graceful
> way and the flink-supported filesystem approach seems to be a
> good choice.
>
>
> My main point is: can we solve the problem with a design agnostic of SQL
> and Stream API? I mentioned a use case where this ability is useful for
Java or Stream API applications. Maybe this is even a non-goal of your FLIP,
since you are focusing on the driver entrypoint.
>
> Jark mentioned some optimizations:
>
> This allows SQLGateway to leverage some metadata caching and UDF JAR
> caching for better compiling performance.
>
> It would be great to see this even outside the SQLGateway (i.e. UDF JAR
> caching).
>
> Best,
> Mason
>
> On Wed, Jun 7, 2023 at 2:26 AM Shengkai Fang  wrote:
>
Hi Paul. Thanks for your update; it makes me understand the
design much better.
>
> But I still have some questions about the FLIP.
>
For SQL Gateway, only DMLs need to be delegated to the SQL server
Driver. I would think about the details and update the FLIP. Do you have
some ideas already?
>
>
If the application mode cannot support library mode, I think we should
only execute INSERT INTO and UPDATE/DELETE statements in the application
mode. AFAIK, we cannot support ANALYZE TABLE and CALL PROCEDURE
statements. The ANALYZE TABLE syntax needs to register the statistics to the
catalog after the job finishes, and the CALL PROCEDURE statement doesn't
generate the ExecNodeGraph.
>
> * Introduce storage via option `sql-gateway.application.storage-dir`
>
If we cannot support submitting the jars through web submission, +1 to
introducing the options to upload the files. However, I think the uploader
should be responsible for removing the uploaded jars. Can we remove the jars
once the job is running or the gateway exits?
>
* JobID is not available

Can we use the rest client returned by ApplicationDeployer to query the job
ID? I am concerned that users don't know which job is related to the
submitted SQL.
>
> * Do we need to introduce a new module named flink-table-sql-runner?
>
It seems we need to introduce a new module. Will the new module be
available in the distribution package? I agree with Jark that we don't need
to introduce this for Table API users, since these users have their own main
class. If we want to make it easier for users to work with the k8s operator, I
think we should modify the k8s operator repo. If we don't need to support SQL
files, can we make this jar visible only in the sql-gateway, like we do in
the planner loader? [1]
>
> [1]
>
>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java#L95
>
> Best,
> Shengkai
>
>
>
>
>
>
>
>
On Wed, Jun 7, 2023 at 10:52, Weihua Hu wrote:
>
> Hi,
>
> Thanks for updating the FLIP.
>
My two cents on the distribution of SQLs and resources:
1. Should we support a common file distribution mechanism for k8s
application mode?
  I have seen some issues and requirements on the mailing list.
  In our production environment, we implement the download command in the
CliFrontend, and automatically add an init container to the pod for file
downloading.
  The advantage of this is that we can use all Flink-supported file systems
to store files.
  This needs more discussion. I would a

Re: [VOTE] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-06-11 Thread liu ron
Hi all,
FLIP-315 [1] has been accepted.
There are 5 binding votes and 1 non-binding vote:
- Jark Wu (binding)
- Jingsong Li (binding)
- Benchao Li (binding)
- Weijie Guo (binding)
- Jing Ge (binding)
- Aitozi (non-binding)
Thanks everyone.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL

Best,
Ron

On Thu, Jun 8, 2023 at 13:30, Aitozi wrote:

> +1
>
> Looking forward to this feature.
>
> Best,
> Aitozi.
>
> On Thu, Jun 8, 2023 at 04:44, Jing Ge wrote:
>
> > +1
> >
> > Best Regards,
> > Jing
> >
> > On Wed, Jun 7, 2023 at 10:52 AM weijie guo 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > On Wed, Jun 7, 2023 at 15:59, Jingsong Li wrote:
> > >
> > > > +1
> > > >
> > > > On Wed, Jun 7, 2023 at 3:03 PM Benchao Li 
> > wrote:
> > > > >
> > > > > +1, binding
> > > > >
> > > > > On Wed, Jun 7, 2023 at 14:44, Jark Wu wrote:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > > > On Jun 7, 2023, at 14:20, liu ron wrote:
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Thanks for all the feedback about FLIP-315: Support Operator
> > Fusion
> > > > > > Codegen
> > > > > > > for Flink SQL[1].
> > > > > > > [2] is the discussion thread.
> > > > > > >
> > > > > > > I'd like to start a vote for it. The vote will be open for at
> > least
> > > > 72
> > > > > > > hours (until June 12th, 12:00AM GMT) unless there is an
> objection
> > > or
> > > > an
> > > > > > > insufficient number of votes.
> > > > > > >
> > > > > > > [1]:
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL
> > > > > > > [2]:
> > > > https://lists.apache.org/thread/9cnqhsld4nzdr77s2fwf00o9cb2g9fmw
> > > > > > >
> > > > > > > Best,
> > > > > > > Ron
> > > > > >
> > > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > >
> > >
> >
>