Re: [DISCUSS] FLIP-437: Support ML Models in Flink SQL

2024-04-03 Thread Hao Li
Cross post David Radley's comments here from voting thread:

> I don’t think this counts as an objection, I have some comments. I should
have put this on the discussion thread earlier but have just got to this.
> - I suggest we can put a model version in the model resource. Versions
are notoriously difficult to add later; I don’t think we want to
proliferate differently named models as a model mutates. We may want to
work with non-latest models.
> - I see that the model name is the unique identifier. I realise this
would move away from the Oracle syntax – so may not be feasible short term;
but I wonder if we can have:
> - a uuid as the main identifier and the model name as an attribute.
> or
>  - a namespace (or something like a system of origin)
> to help organise models with the same name.
> - does the model have an owner? I assume that Flink model resource is the
master of the model? I imagine in the future that a model that comes in via
a new connector could be kept up to date with the external model and would
not be allowed to be changed by anything other than the connector.

Thanks for the comments. I agree that supporting model versions is important.
I think we could support versioning without changing the overall syntax by
appending a version number/name to the model name; catalog implementations
can handle the versions. For example,

CREATE MODEL `my-model$1`...

"$1" would imply it's version 1. If no version is provided, we can auto
increment the version if the model name exists already or create the first
version if the model name doesn't exist yet.
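
As a purely illustrative sketch (the helper and the delimiter handling below
are my own, not part of the FLIP), a catalog implementation could split the
name$version convention like this:

import java.util.Optional;

/** Illustrative helper that splits "my-model$1" into a base name and a version. */
final class ModelVersionUtil {

    private static final char VERSION_DELIMITER = '$';

    private ModelVersionUtil() {}

    /** Base model name without the version suffix, e.g. "my-model$1" -> "my-model". */
    static String baseName(String modelName) {
        int idx = modelName.lastIndexOf(VERSION_DELIMITER);
        return idx < 0 ? modelName : modelName.substring(0, idx);
    }

    /** Explicit version if present; empty means the catalog auto-increments (or starts at 1). */
    static Optional<Integer> version(String modelName) {
        int idx = modelName.lastIndexOf(VERSION_DELIMITER);
        if (idx < 0 || idx == modelName.length() - 1) {
            return Optional.empty();
        }
        try {
            return Optional.of(Integer.parseInt(modelName.substring(idx + 1)));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}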

As for model ownership, I'm not entirely sure about the use case and how it
should be controlled. It could be controlled from the user side through
ACL/RBAC, or in some way within the catalog, I guess. Maybe we can follow up
on this as the requirement or use case becomes clearer.

Cross post David Moravek's comments from voting thread:

> My only suggestion would be to move Catalog changes into a separate
> interface to allow us to begin with lower stability guarantees. Existing
> Catalogs would be able to opt-in by implementing it. It's a minor thing
> though, overall the FLIP is solid and the direction is pretty exciting.

I think it's fine to move the model-related catalog changes to a separate
interface and let the current catalog interface extend it. Since model support
will be built into Flink, the current catalog interface will need to support
model CRUD operations. For my own education, can you elaborate more on how a
separate interface will allow us to begin with lower stability guarantees?
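
To make this concrete, here is a rough sketch of what such an opt-in interface
could look like (the interface and method names below are purely illustrative,
not from the FLIP):

import java.util.List;

/** Illustrative only: a minimal descriptor of a model's metadata. */
interface CatalogModel {
    String getName();
    // input/output schema, provider options, etc. would go here
}

/**
 * Illustrative only: model CRUD kept in a separate, opt-in interface. Keeping it
 * out of Catalog would let it start with a weaker stability annotation (e.g.
 * @Experimental or @PublicEvolving) and evolve independently, while existing
 * catalogs opt in simply by implementing it.
 */
interface ModelCatalogSupport {

    List<String> listModels(String databaseName) throws Exception;

    CatalogModel getModel(String databaseName, String modelName) throws Exception;

    void createModel(String databaseName, String modelName, CatalogModel model, boolean ignoreIfExists)
            throws Exception;

    void dropModel(String databaseName, String modelName, boolean ignoreIfNotExists)
            throws Exception;
}

An existing catalog would then opt in via something like
"class MyCatalog implements Catalog, ModelCatalogSupport { ... }", while
catalogs that don't implement it simply don't expose models.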

Thanks,
Hao


On Thu, Mar 28, 2024 at 10:14 AM Hao Li  wrote:

> Thanks Timo. I'll start a vote tomorrow if no further discussion.
>
> Thanks,
> Hao
>
> On Thu, Mar 28, 2024 at 9:33 AM Timo Walther  wrote:
>
>> Hi everyone,
>>
>> I updated the FLIP according to this discussion.
>>
>> @Hao Li: Let me know if I made a mistake somewhere. I added some
>> additional explaining comments about the new PTF syntax.
>>
>> There are no further objections from my side. If nobody objects, Hao
>> feel free to start the voting tomorrow.
>>
>> Regards,
>> Timo
>>
>>
>> On 28.03.24 16:30, Jark Wu wrote:
>> > Thanks, Hao,
>> >
>> > Sounds good to me.
>> >
>> > Best,
>> > Jark
>> >
>> > On Thu, 28 Mar 2024 at 01:02, Hao Li  wrote:
>> >
>> >> Hi Jark,
>> >>
>> >> I think we can start with supporting popular model providers such as
>> >> openai, azureml, sagemaker for remote models.
>> >>
>> >> Thanks,
>> >> Hao
>> >>
>> >> On Tue, Mar 26, 2024 at 8:15 PM Jark Wu  wrote:
>> >>
>> >>> Thanks for the PoC and updating,
>> >>>
>> >>> The final syntax looks good to me, at least it is a nice and concise
>> >> first
>> >>> step.
>> >>>
>> >>> SELECT f1, f2, label FROM
>> >>> ML_PREDICT(
>> >>>   input => `my_data`,
>> >>>   model => `my_cat`.`my_db`.`classifier_model`,
>> >>>   args => DESCRIPTOR(f1, f2));
>> >>>
>> >>> Besides, what built-in models will we support in the FLIP? This might
>> be
>> >>> important
>> >>> because it relates to what use cases can run with the new Flink
>> version
>> >> out
>> >>> of the box.
>> >>>
>> >>> Best,
>> >>> Jark
>> >>>
>> >>> On Wed, 27 Mar 2024 at 01:10, Hao Li 
>> wrote:
>> >>>
>>  Hi Timo,
>> 
>>  Yeah. For `primary key` and `from table(...)` those are explicitly
>> >>> matched
>>  in parser: [1].
>> 
>> > SELECT f1, f2, label FROM
>>  ML_PREDICT(
>>    input => `my_data`,
>>    model => `my_cat`.`my_db`.`classifier_model`,
>>    args => DESCRIPTOR(f1, f2));
>> 
>>  This named argument syntax looks good to me. It can be supported
>> >> together
>>  with
>> 
>>  SELECT f1, f2, label FROM ML_PREDICT(`my_data`,
>>  `my_cat`.`my_db`.`classifier_model`,DESCRIPTOR(f1, f2));
>> 
>>  Sure. Will let you know once updated the FLIP.
>> 
>>  [1]
>> 
>> 
>> >>>
>> >>
>> 

Re: [VOTE] FLIP-437: Support ML Models in Flink SQL

2024-04-03 Thread Hao Li
Thanks David Radley and David Moravek for the comments. I'll reply in the
discussion thread.

Hao

On Wed, Apr 3, 2024 at 5:45 AM David Morávek  wrote:

> +1 (binding)
>
> My only suggestion would be to move Catalog changes into a separate
> interface to allow us to begin with lower stability guarantees. Existing
> Catalogs would be able to opt-in by implementing it. It's a minor thing
> though, overall the FLIP is solid and the direction is pretty exciting.
>
> Best,
> D.
>
> On Wed, Apr 3, 2024 at 2:31 AM David Radley 
> wrote:
>
> > Hi Hao,
> > I don’t think this counts as an objection, I have some comments. I should
> > have put this on the discussion thread earlier but have just got to this.
> > - I suggest we can put a model version in the model resource. Versions
> are
> > notoriously difficult to add later; I don’t think we want to proliferate
> > differently named models as a model mutates. We may want to work with
> > non-latest models.
> > - I see that the model name is the unique identifier. I realise this
> would
> > move away from the Oracle syntax – so may not be feasible short term;
> but I
> > wonder if we can have:
> >  - a uuid as the main identifier and the model name as an attribute.
> > or
> >  - a namespace (or something like a system of origin)
> > to help organise models with the same name.
> > - does the model have an owner? I assume that Flink model resource is the
> > master of the model? I imagine in the future that a model that comes in
> via
> > a new connector could be kept up to date with the external model and
> would
> > not be allowed to be changed by anything other than the connector.
> >
> >Kind regards, David.
> >
> > From: Hao Li 
> > Date: Friday, 29 March 2024 at 16:30
> > To: dev@flink.apache.org 
> > Subject: [EXTERNAL] [VOTE] FLIP-437: Support ML Models in Flink SQL
> > Hi devs,
> >
> > I'd like to start a vote on the FLIP-437: Support ML Models in Flink
> > SQL [1]. The discussion thread is here [2].
> >
> > The vote will be open for at least 72 hours unless there is an objection
> or
> > insufficient votes.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> >
> > [2] https://lists.apache.org/thread/9z94m2bv4w265xb5l2mrnh4lf9m28ccn
> >
> > Thanks,
> > Hao
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
> >
>


Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Jeyhun Karimov
Hi Lincoln,

Thanks for your reply. My idea was to utilize MapBundleFunction, as it is
already used in a similar context: MiniBatchLocalGroupAggFunction.
I can also extend my PoC to streaming sources and get back to you to continue
our discussion.

Regards,
Jeyhun

On Wed, Apr 3, 2024 at 4:33 PM Lincoln Lee  wrote:

> Hi Jeyhun,
>
> Thanks for your quick response!
>
> In the streaming scenario, a shuffle commonly occurs before the stateful
> operator, and there's a sanity check [1] when the stateful operator
> accesses the state. This implies a consistency requirement between the
> partitioner used for data shuffling and the state key selector used for
> state access (see KeyGroupStreamPartitioner for more details); otherwise,
> there may be state access errors. That is to say, in the streaming
> scenario, it is not only the strict requirement described in
> FlinkRelDistribution#requireStrict that must hold, but also the implied
> consistency of the hash calculation.
>
> Also, if this flip targets both streaming and batch scenarios, it is
> recommended to do PoC validation for streaming as well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-29430
>
>
> Best,
> Lincoln Lee
>
>
> Leonard Xu  于2024年4月3日周三 14:25写道:
>
> > Hey, Jeyhun
> >
> > Thanks for kicking off this discussion. I have two questions about
> > streaming sources:
> >
> > (1)The FLIP  motivation section says Kafka broker is already partitioned
> > w.r.t. some key[s] , Is this the main use case in Kafka world?
> Partitioning
> > by key fields is not the default partitioner of Kafka default
> > partitioner[1] IIUC.
> >
> > (2) Considering the FLIP’s optimization scope aims to both Batch and
> > Streaming pre-partitioned source, could you add a Streaming Source
> example
> > to help me understand the  FLIP better? I think Kafka Source is a good
> > candidates for streaming source example, file source is a good one for
> > batch source and it really helped me to follow-up the FLIP.
> >
> > Best,
> > Leonard
> > [1]
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
> >
> >
> >
> > > 2024年4月3日 上午5:53,Jeyhun Karimov  写道:
> > >
> > > Hi Lincoln,
> > >
> > > Thanks a lot for your comments. Please find my answers below.
> > >
> > >
> > > 1. Is this flip targeted only at batch scenarios or does it include
> > >> streaming?
> > >> (The flip and the discussion did not explicitly mention this, but in
> the
> > >> draft pr, I only
> > >> saw the implementation for batch scenarios
> > >>
> > >>
> >
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> > >> <
> > >>
> >
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89
> > >>>
> > >> )
> > >> If we expect this also apply to streaming, then we need to consider
> the
> > >> stricter
> > >> shuffle restrictions of streaming compared to batch (if support is
> > >> considered,
> > >> more discussion is needed here, let’s not expand for now). If it only
> > >> applies to batch,
> > >> it is recommended to clarify in the flip.
> > >
> > >
> > > - The FLIP targets both streaming and batch scenarios.
> > > Could you please elaborate more on what you mean by additional
> > > restrictions?
> > >
> > >
> > > 2. In the current implementation, the optimized plan seems to have some
> > >> problems.
> > >> As described in the class comments:
> > >>
> > >>
> >
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> > >
> > > BatchPhysicalHashAggregate (local)
> > >
> > >   +- BatchPhysicalLocalHashAggregate (local)
> > >>  +- BatchPhysicalTableSourceScan
> > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case
> of
> > >> one-phase
> > >> hashAgg, localAgg is not necessary, which is the scenario currently
> > handled
> > >> by
> > >> `RemoveRedundantLocalHashAggRule` and other rules)
> > >
> > >
> > > - Yes, you are completely right. Note that the PR you referenced is
> just
> > a
> > > quick PoC.
> > > Redundant operators you mentioned exist because
> > > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > > without modifying upstream/downstream operators.
> > > As I mentioned, the implementation is just a PoC and the end
> > implementation
> > > will make sure that existing redundancy elimination rules remove
> > redundant
> > > operators.
> > >
> > >
> > > Also, in the draft pr,
> > >> the optimization of `testShouldEliminatePartitioning1` &
> > >> `testShouldEliminatePartitioning2`
> > >> seems didn't take effect?
> > >>
> > >>
> >
> 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Jeyhun Karimov
Hi Leonard,

Thanks a lot for your comments. Please find my answers below:

(1)The FLIP  motivation section says Kafka broker is already partitioned
> w.r.t. some key[s] , Is this the main use case in Kafka world? Partitioning
> by key fields is not the default partitioner of Kafka default
> partitioner[1] IIUC.


- I see your point. The main use case for Kafka is ksqlDB users.
Consider the following ksqlDB query [1]:

CREATE STREAM products_rekeyed
  WITH (PARTITIONS=6) AS
  SELECT *
  FROM products
  PARTITION BY product_id;

In this query, the output is re-partitioned to be keyed by product_id. In
fact, as the documentation [1] mentions (and as Jim noted above), this
repartitioning is a requirement for join queries.


(2) Considering the FLIP’s optimization scope aims to both Batch and
> Streaming pre-partitioned source, could you add a Streaming Source example
> to help me understand the  FLIP better? I think Kafka Source is a good
> candidates for streaming source example, file source is a good one for
> batch source and it really helped me to follow-up the FLIP.


- Currently, I do not have a sample Kafka Source integration with this FLIP.
My idea was to integrate first with the Flink main repo (e.g., file source in
streaming & batch mode) and then with the external connectors.
But I can try it with Kafka Source and get back to you.

Regards,
Jeyhun

[1] https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/

On Wed, Apr 3, 2024 at 8:25 AM Leonard Xu  wrote:

> Hey, Jeyhun
>
> Thanks for kicking off this discussion. I have two questions about
> streaming sources:
>
> (1)The FLIP  motivation section says Kafka broker is already partitioned
> w.r.t. some key[s] , Is this the main use case in Kafka world? Partitioning
> by key fields is not the default partitioner of Kafka default
> partitioner[1] IIUC.
>
> (2) Considering the FLIP’s optimization scope aims to both Batch and
> Streaming pre-partitioned source, could you add a Streaming Source example
> to help me understand the  FLIP better? I think Kafka Source is a good
> candidates for streaming source example, file source is a good one for
> batch source and it really helped me to follow-up the FLIP.
>
> Best,
> Leonard
> [1]
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
>
>
>
> > 2024年4月3日 上午5:53,Jeyhun Karimov  写道:
> >
> > Hi Lincoln,
> >
> > Thanks a lot for your comments. Please find my answers below.
> >
> >
> > 1. Is this flip targeted only at batch scenarios or does it include
> >> streaming?
> >> (The flip and the discussion did not explicitly mention this, but in the
> >> draft pr, I only
> >> saw the implementation for batch scenarios
> >>
> >>
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> >> <
> >>
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89
> >>>
> >> )
> >> If we expect this also apply to streaming, then we need to consider the
> >> stricter
> >> shuffle restrictions of streaming compared to batch (if support is
> >> considered,
> >> more discussion is needed here, let’s not expand for now). If it only
> >> applies to batch,
> >> it is recommended to clarify in the flip.
> >
> >
> > - The FLIP targets both streaming and batch scenarios.
> > Could you please elaborate more on what you mean by additional
> > restrictions?
> >
> >
> > 2. In the current implementation, the optimized plan seems to have some
> >> problems.
> >> As described in the class comments:
> >>
> >>
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> >
> > BatchPhysicalHashAggregate (local)
> >
> >   +- BatchPhysicalLocalHashAggregate (local)
> >>  +- BatchPhysicalTableSourceScan
> >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
> >> one-phase
> >> hashAgg, localAgg is not necessary, which is the scenario currently
> handled
> >> by
> >> `RemoveRedundantLocalHashAggRule` and other rules)
> >
> >
> > - Yes, you are completely right. Note that the PR you referenced is just
> a
> > quick PoC.
> > Redundant operators you mentioned exist because
> > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > without modifying upstream/downstream operators.
> > As I mentioned, the implementation is just a PoC and the end
> implementation
> > will make sure that existing redundancy elimination rules remove
> redundant
> > operators.
> >
> >
> > Also, in the draft pr,
> >> the optimization of `testShouldEliminatePartitioning1` &
> >> `testShouldEliminatePartitioning2`
> >> seems didn't take effect?
> >>
> >>
> 

[IMPROVEMENT] Using ServiceLoader to load ExtendedParseStrategy

2024-04-03 Thread Naveen Kumar
Hi All,

[DISCLAIMER] Please ignore if it's a duplicate request.

I was looking into the supported grammars for Flink SQL. We have two dialects,
DEFAULT and HIVE. In addition, we have ExtendedParser, which helps to support
some special commands. Currently, ExtendedParser can only support a set of
predefined strategies. I was wondering if we can generalize ExtendedParser
using ServiceLoader, which could eventually allow new grammars to be added via
runtime jars.

public Optional<Operation> parse(String statement) {
    for (ExtendedParseStrategy strategy : loadExtendedStrategies()) {
        if (strategy.match(statement)) {
            return Optional.of(strategy.convert(statement));
        }
    }
    return Optional.empty();
}

private static List<ExtendedParseStrategy> loadExtendedStrategies() {
    // Load ExtendedParseStrategy implementations with ServiceLoader.
    List<ExtendedParseStrategy> parseStrategies = new ArrayList<>();
    ServiceLoader<ExtendedParseStrategy> extendedParseStrategies =
            ServiceLoader.load(ExtendedParseStrategy.class);
    for (ExtendedParseStrategy extendedParseStrategy : extendedParseStrategies) {
        parseStrategies.add(extendedParseStrategy);
    }
    return parseStrategies;
}
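
To illustrate how a grammar shipped in a runtime jar could plug into this,
here is a minimal sketch. It assumes the ExtendedParseStrategy interface in
flink-table-planner with its match/convert methods; the strategy, command, and
package names below are hypothetical:

package com.example;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.parse.ExtendedParseStrategy;

/** Hypothetical strategy for a "SHOW PLUGINS" command, discovered via ServiceLoader. */
public class ShowPluginsParseStrategy implements ExtendedParseStrategy {

    @Override
    public boolean match(String statement) {
        // Match the hypothetical command, ignoring case and a trailing semicolon.
        return statement.trim().replaceAll(";\\s*$", "").equalsIgnoreCase("SHOW PLUGINS");
    }

    @Override
    public Operation convert(String statement) {
        // A real strategy would return a dedicated Operation implementation.
        return new Operation() {
            @Override
            public String asSummaryString() {
                return "SHOW PLUGINS";
            }
        };
    }
}

// Registered by shipping, in the runtime jar, the service file
// META-INF/services/org.apache.flink.table.planner.parse.ExtendedParseStrategy
// containing the line:
//   com.example.ShowPluginsParseStrategy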


Please share your thoughts.

Thanks,
Naveen Kumar


Re: Inquiry Regarding Azure Pipelines

2024-04-03 Thread Robert Metzger
Hi Yisha,

flinkbot is currently not active, so new PRs are not triggering any AZP
builds. We hope to restore the service soon.

AZP is still the source of truth for CI builds.


On Wed, Apr 3, 2024 at 11:34 AM Yisha Zhou 
wrote:

> Hi devs,
>
> I hope this email finds you well. I am writing to seek clarification
> regarding the status of Azure Pipelines within the Apache community and
> seek assistance with a specific issue I encountered.
>
> Today, I made some new commits to a pull request in one of the Apache
> repositories. However, I noticed that even after approximately six hours,
> there were no triggers initiated for the Azure Pipeline. I have a couple of
> questions regarding this matter:
>
> 1. Is the Apache community still utilizing Azure Pipelines for CI/CD
> purposes? I came across an issue discussing the migration from Azure to
> GitHub Actions, but I am uncertain about the timeline for discontinuing the
> use of Azure Pipelines.
>
> 2. If Azure Pipelines are still in use, where can I find information about
> the position of my commits in the CI queue, awaiting execution?
>
> I would greatly appreciate any insights or guidance you can provide
> regarding these questions. Thank you for your time and attention.
>
> My PR link is https://github.com/apache/flink/pull/24567 <
> https://github.com/apache/flink/pull/24567>.
>
> Best regards,
> Yisha


Community over Code EU 2024: Start planning your trip!

2024-04-03 Thread Ryan Skraba
[Note: You're receiving this email because you are subscribed to one
or more project dev@ mailing lists at the Apache Software Foundation.]

Dear community,

We hope you are doing great, are you ready for Community Over Code EU?
Check out the featured sessions, get your tickets with special
discounts and start planning your trip.

Save your spot! Take a look at our lineup of sessions, panelists and
featured speakers and make your final choice:

* EU policies and regulations affecting open source specialists working in OSPOs

The panel will discuss how EU legislation affects the daily work of
open source operations. Panelists will cover some recent policy
updates, the challenges of staying compliant when managing open source
contribution and usage within organizations, and their personal
experiences in adapting to the changing European regulatory
environment.

* Doing for sustainability, what open source did for software

In this keynote Asim Hussain will explain the history of Impact
Framework, a coalition of hundreds of software practitioners with
tangible solutions that directly foster meaningful change by measuring
the environmental impacts of a piece of software.

Don’t forget that we have special discounts for groups, students and
Apache committers. Visit the website to discover more about these
rates.[1]

It's time for you to start planning your trip. Remember that we have
prepared a “How to get there” guide that will be helpful to find out
the best transportation, either train, bus, flight or boat to
Bratislava from wherever you are coming from. Take a look at the
different options and please reach out to us if you have any
questions.

We have available rooms -with a special rate- at the Radisson Blu
Carlton Hotel, where the event will take place and at the Park Inn
Hotel which is only 5 minutes walking from the venue. [2] However, you
are free to choose any other accommodation options around the city.

See you in Bratislava,
Community Over Code EU Team

[1]: https://eu.communityovercode.org/tickets/ "Register"
[2]: https://eu.communityovercode.org/venue/ "Where to stay"


[jira] [Created] (FLINK-35005) SqlClientITCase Failed to build JobManager image

2024-04-03 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35005:
---

 Summary: SqlClientITCase Failed to build JobManager image
 Key: FLINK-35005
 URL: https://issues.apache.org/jira/browse/FLINK-35005
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.20.0
Reporter: Ryan Skraba


jdk21 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58708=logs=dc1bf4ed-4646-531a-f094-e103042be549=fb3d654d-52f8-5b98-fe9d-b18dd2e2b790=15140

{code}
Apr 03 02:59:16 02:59:16.247 [INFO] 
---
Apr 03 02:59:16 02:59:16.248 [INFO]  T E S T S
Apr 03 02:59:16 02:59:16.248 [INFO] 
---
Apr 03 02:59:17 02:59:17.841 [INFO] Running SqlClientITCase
Apr 03 03:03:15 at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
Apr 03 03:03:15 at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
Apr 03 03:03:15 at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
Apr 03 03:03:15 at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
Apr 03 03:03:15 Caused by: 
org.apache.flink.connector.testframe.container.ImageBuildException: Failed to 
build image "flink-configured-jobmanager"
Apr 03 03:03:15 at 
org.apache.flink.connector.testframe.container.FlinkImageBuilder.build(FlinkImageBuilder.java:234)
Apr 03 03:03:15 at 
org.apache.flink.connector.testframe.container.FlinkTestcontainersConfigurator.configureJobManagerContainer(FlinkTestcontainersConfigurator.java:65)
Apr 03 03:03:15 ... 12 more
Apr 03 03:03:15 Caused by: java.lang.RuntimeException: 
com.github.dockerjava.api.exception.DockerClientException: Could not build 
image: Head 
"https://registry-1.docker.io/v2/library/eclipse-temurin/manifests/21-jre-jammy":
 received unexpected HTTP status: 500 Internal Server Error
Apr 03 03:03:15 at 
org.rnorth.ducttape.timeouts.Timeouts.callFuture(Timeouts.java:68)
Apr 03 03:03:15 at 
org.rnorth.ducttape.timeouts.Timeouts.getWithTimeout(Timeouts.java:43)
Apr 03 03:03:15 at 
org.testcontainers.utility.LazyFuture.get(LazyFuture.java:47)
Apr 03 03:03:15 at 
org.apache.flink.connector.testframe.container.FlinkImageBuilder.buildBaseImage(FlinkImageBuilder.java:255)
Apr 03 03:03:15 at 
org.apache.flink.connector.testframe.container.FlinkImageBuilder.build(FlinkImageBuilder.java:206)
Apr 03 03:03:15 ... 13 more
Apr 03 03:03:15 Caused by: 
com.github.dockerjava.api.exception.DockerClientException: Could not build 
image: Head 
"https://registry-1.docker.io/v2/library/eclipse-temurin/manifests/21-jre-jammy":
 received unexpected HTTP status: 500 Internal Server Error
Apr 03 03:03:15 at 
com.github.dockerjava.api.command.BuildImageResultCallback.getImageId(BuildImageResultCallback.java:78)
Apr 03 03:03:15 at 
com.github.dockerjava.api.command.BuildImageResultCallback.awaitImageId(BuildImageResultCallback.java:50)
Apr 03 03:03:15 at 
org.testcontainers.images.builder.ImageFromDockerfile.resolve(ImageFromDockerfile.java:159)
Apr 03 03:03:15 at 
org.testcontainers.images.builder.ImageFromDockerfile.resolve(ImageFromDockerfile.java:40)
Apr 03 03:03:15 at 
org.testcontainers.utility.LazyFuture.getResolvedValue(LazyFuture.java:19)
Apr 03 03:03:15 at 
org.testcontainers.utility.LazyFuture.get(LazyFuture.java:41)
Apr 03 03:03:15 at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
Apr 03 03:03:15 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
Apr 03 03:03:15 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
Apr 03 03:03:15 at java.base/java.lang.Thread.run(Thread.java:1583)
Apr 03 03:03:15 
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35004) SqlGatewayE2ECase could not start container

2024-04-03 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35004:
---

 Summary: SqlGatewayE2ECase could not start container
 Key: FLINK-35004
 URL: https://issues.apache.org/jira/browse/FLINK-35004
 Project: Flink
  Issue Type: Bug
Reporter: Ryan Skraba


1.20, jdk17: 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58708=logs=e8e46ef5-75cc-564f-c2bd-1797c35cbebe=60c49903-2505-5c25-7e46-de91b1737bea=15078

There is an error: "Process failed due to timeout" in 
{{SqlGatewayE2ECase.testSqlClientExecuteStatement}}.  In the maven logs, we can 
see:

{code:java}
02:57:26,979 [main] INFO  tc.prestodb/hdp2.6-hive:10
   [] - Image prestodb/hdp2.6-hive:10 pull took 
PT43.59218S02:57:26,991 [main] INFO  
tc.prestodb/hdp2.6-hive:10   [] - Creating 
container for image: prestodb/hdp2.6-hive:1002:57:27,032 [main] 
INFO  tc.prestodb/hdp2.6-hive:10   [] - 
Container prestodb/hdp2.6-hive:10 is starting: 
162069678c7d03252a42ed81ca43e1911ca7357c476a4a5de294ffe55bd8314502:57:42,846 [  
  main] INFO  tc.prestodb/hdp2.6-hive:10
   [] - Container prestodb/hdp2.6-hive:10 started in 
PT15.855339866S02:57:53,447 [main] ERROR 
tc.prestodb/hdp2.6-hive:10   [] - Could not 
start containerjava.lang.RuntimeException: java.net.SocketTimeoutException: 
timeoutat 
org.apache.flink.table.gateway.containers.HiveContainer.containerIsStarted(HiveContainer.java:94)
 ~[test-classes/:?]at 
org.testcontainers.containers.GenericContainer.containerIsStarted(GenericContainer.java:723)
 ~[testcontainers-1.19.1.jar:1.19.1]at 
org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:543)
 ~[testcontainers-1.19.1.jar:1.19.1]at 
org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:354)
 ~[testcontainers-1.19.1.jar:1.19.1]at 
org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
 ~[duct-tape-1.0.8.jar:?]at 
org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:344)
 ~[testcontainers-1.19.1.jar:1.19.1]at 
org.apache.flink.table.gateway.containers.HiveContainer.doStart(HiveContainer.java:69)
 ~[test-classes/:?]at 
org.testcontainers.containers.GenericContainer.start(GenericContainer.java:334) 
~[testcontainers-1.19.1.jar:1.19.1]at 
org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1144)
 ~[testcontainers-1.19.1.jar:1.19.1]at 
org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:28)
 ~[testcontainers-1.19.1.jar:1.19.1]at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) ~[junit-4.13.2.jar:4.13.2]  
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) 
~[junit-4.13.2.jar:4.13.2]at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413) 
~[junit-4.13.2.jar:4.13.2]at 
org.junit.runner.JUnitCore.run(JUnitCore.java:137) ~[junit-4.13.2.jar:4.13.2]   
 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) 
~[junit-4.13.2.jar:4.13.2]at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
 ~[junit-vintage-engine-5.10.1.jar:5.10.1]at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
 ~[junit-vintage-engine-5.10.1.jar:5.10.1]at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) 
~[junit-vintage-engine-5.10.1.jar:5.10.1]at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:198)
 ~[junit-platform-launcher-1.10.1.jar:1.10.1]at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:169)
 ~[junit-platform-launcher-1.10.1.jar:1.10.1]at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:93)
 ~[junit-platform-launcher-1.10.1.jar:1.10.1]at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:58)
 ~[junit-platform-launcher-1.10.1.jar:1.10.1]at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:141)
 [junit-platform-launcher-1.10.1.jar:1.10.1]at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:57)
 [junit-platform-launcher-1.10.1.jar:1.10.1]at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:103)
 [junit-platform-launcher-1.10.1.jar:1.10.1]at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:85)
 [junit-platform-launcher-1.10.1.jar:1.10.1]at 

Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Lincoln Lee
Hi Jeyhun,

Thanks for your quick response!

In the streaming scenario, a shuffle commonly occurs before the stateful
operator, and there's a sanity check [1] when the stateful operator
accesses the state. This implies a consistency requirement between the
partitioner used for data shuffling and the state key selector used for
state access (see KeyGroupStreamPartitioner for more details); otherwise,
there may be state access errors. That is to say, in the streaming
scenario, it is not only the strict requirement described in
FlinkRelDistribution#requireStrict that must hold, but also the implied
consistency of the hash calculation.
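
To make the hash-consistency point concrete, here is a small sketch of my own
(not part of the FLIP) using Flink's KeyGroupRangeAssignment. A pre-partitioned
source can only skip the shuffle safely if the external system's partitioning
agrees with this key-group based subtask assignment for every key; Kafka's
default key partitioner, for example, uses a different hash and will generally
not agree:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupConsistencyDemo {

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default max parallelism
        int parallelism = 4;      // parallelism of the stateful operator

        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            // Subtask index that Flink expects to own the state for this key:
            // key.hashCode() is murmur-hashed into a key group, which is then
            // mapped to an operator (subtask) index.
            int subtask =
                    KeyGroupRangeAssignment.assignKeyToParallelOperator(
                            key, maxParallelism, parallelism);
            System.out.printf("key=%s -> Flink subtask %d%n", key, subtask);
        }
    }
}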

Also, if this flip targets both streaming and batch scenarios, it is
recommended to do PoC validation for streaming as well.

[1] https://issues.apache.org/jira/browse/FLINK-29430


Best,
Lincoln Lee


Leonard Xu  于2024年4月3日周三 14:25写道:

> Hey, Jeyhun
>
> Thanks for kicking off this discussion. I have two questions about
> streaming sources:
>
> (1)The FLIP  motivation section says Kafka broker is already partitioned
> w.r.t. some key[s] , Is this the main use case in Kafka world? Partitioning
> by key fields is not the default partitioner of Kafka default
> partitioner[1] IIUC.
>
> (2) Considering the FLIP’s optimization scope aims to both Batch and
> Streaming pre-partitioned source, could you add a Streaming Source example
> to help me understand the  FLIP better? I think Kafka Source is a good
> candidates for streaming source example, file source is a good one for
> batch source and it really helped me to follow-up the FLIP.
>
> Best,
> Leonard
> [1]
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
>
>
>
> > 2024年4月3日 上午5:53,Jeyhun Karimov  写道:
> >
> > Hi Lincoln,
> >
> > Thanks a lot for your comments. Please find my answers below.
> >
> >
> > 1. Is this flip targeted only at batch scenarios or does it include
> >> streaming?
> >> (The flip and the discussion did not explicitly mention this, but in the
> >> draft pr, I only
> >> saw the implementation for batch scenarios
> >>
> >>
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> >> <
> >>
> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89
> >>>
> >> )
> >> If we expect this also apply to streaming, then we need to consider the
> >> stricter
> >> shuffle restrictions of streaming compared to batch (if support is
> >> considered,
> >> more discussion is needed here, let’s not expand for now). If it only
> >> applies to batch,
> >> it is recommended to clarify in the flip.
> >
> >
> > - The FLIP targets both streaming and batch scenarios.
> > Could you please elaborate more on what you mean by additional
> > restrictions?
> >
> >
> > 2. In the current implementation, the optimized plan seems to have some
> >> problems.
> >> As described in the class comments:
> >>
> >>
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> >
> > BatchPhysicalHashAggregate (local)
> >
> >   +- BatchPhysicalLocalHashAggregate (local)
> >>  +- BatchPhysicalTableSourceScan
> >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
> >> one-phase
> >> hashAgg, localAgg is not necessary, which is the scenario currently
> handled
> >> by
> >> `RemoveRedundantLocalHashAggRule` and other rules)
> >
> >
> > - Yes, you are completely right. Note that the PR you referenced is just
> a
> > quick PoC.
> > Redundant operators you mentioned exist because
> > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > without modifying upstream/downstream operators.
> > As I mentioned, the implementation is just a PoC and the end
> implementation
> > will make sure that existing redundancy elimination rules remove
> redundant
> > operators.
> >
> >
> > Also, in the draft pr,
> >> the optimization of `testShouldEliminatePartitioning1` &
> >> `testShouldEliminatePartitioning2`
> >> seems didn't take effect?
> >>
> >>
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> >
> >
> > -  Note that in this example, Exchange operator have a
> > property KEEP_INPUT_AS_IS that indicates that data distribution is the
> same
> > as its input.
> > Since we have redundant operators (as shown above, two aggregate
> operators)
> > one of the rules (not in this FLIP)
> > adds this Exchange operator with KEEP_INPUT_AS_IS in between.
> > Similar to my comment above, the end implementation will be except from
> > redundant operators.
> >
> > In conjunction with question 2, I am wondering if we have a better choice
> >> (of 

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-03 Thread lorenzo . affetti
Hello everybody!
Thanks for the FLIP, it looks amazing (and I think the proof is the deep
discussion it is provoking :))

I have a couple of comments to add to this:

Even though I get the reason why you rejected MATERIALIZED VIEW, I still like
it a lot, and I would like to provide pointers on how the materialized view
concept has evolved in recent years:

• Materialize DB (https://materialize.com/)
• The famous talk by Martin Kleppmann, "Turning the database inside out"
(https://www.youtube.com/watch?v=fU9hR3kiOK0)

I think the two above stretched the materialized view concept into more than
just an optimization for accessing pre-computed aggregates/filters.
That concept (at least in my mind) now adheres more to the semantics of the
words themselves ("materialized" and "view") than to its implementations in
DBMSs: simply a view on raw data that, hopefully, is constantly updated with
fresh results.
That's why I understand the objections from Timo et al.
Still, I understand there is no need to add confusion :)

Still, I don't understand why we need another type of special table.
Could you dive deeper into the reasons for not simply adding the FRESHNESS
parameter to standard tables?

That would be a very seamless implementation with the goal of unifying batch
and streaming.
If we stick to a unified world, I think Flink should just provide one type of
table that is inherently dynamic.
Then, depending on the FRESHNESS objectives / connectors used in WITH, that
table can be backed by a streaming or batch job, as you explained in your FLIP.

Maybe I am totally missing the point :)

Thank you in advance,
Lorenzo
On Apr 3, 2024 at 15:25 +0200, Martijn Visser , wrote:
> Hi all,
>
> Thanks for the proposal. While the FLIP talks extensively on how Snowflake
> has Dynamic Tables and Databricks has Delta Live Tables, my understanding
> is that Databricks has CREATE STREAMING TABLE [1] which relates with this
> proposal.
>
> I do have concerns about using CREATE DYNAMIC TABLE, specifically about
> confusing the users who are familiar with Snowflake's approach where you
> can't change the content via DML statements, while that is something that
> would work in this proposal. Naming is hard of course, but I would probably
> prefer something like CREATE CONTINUOUS TABLE, CREATE REFRESH TABLE or
> CREATE LIVE TABLE.
>
> Best regards,
>
> Martijn
>
> [1]
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-streaming-table.html
>
> On Wed, Apr 3, 2024 at 5:19 AM Ron liu  wrote:
>
> > Hi, dev
> >
> > After offline discussion with Becket Qin, Lincoln Lee and Jark Wu, we have
> > improved some parts of the FLIP.
> >
> > 1. Add Full Refresh Mode section to clarify the semantics of full refresh
> > mode.
> > 2. Add Future Improvement section explaining why query statement does not
> > support references to temporary view and possible solutions.
> > 3. The Future Improvement section explains a possible future solution for
> > dynamic table to support the modification of query statements to meet the
> > common field-level schema evolution requirements of the lakehouse.
> > 4. The Refresh section emphasizes that the Refresh command and the
> > background refresh job can be executed in parallel, with no restrictions at
> > the framework level.
> > 5. Convert RefreshHandler into a plug-in interface to support various
> > workflow schedulers.
> >
> > Best,
> > Ron
> >
> > Ron liu  于2024年4月2日周二 10:28写道:
> >
> > > > Hi, Venkata krishnan
> > > >
> > > > Thank you for your involvement and suggestions, and hope that the design
> > > > goals of this FLIP will be helpful to your business.
> > > >
> > > > > > > >>> 1. In the proposed FLIP, given the example for the dynamic 
> > > > > > > >>> table, do
> > > > the
> > > > data sources always come from a single lake storage such as Paimon or
> > does
> > > > the same proposal solve for 2 disparate storage systems like Kafka and
> > > > Iceberg where Kafka events are ETLed to Iceberg similar to Paimon?
> > > > Basically the lambda architecture that is mentioned in the FLIP as well.
> > > > I'm wondering if it is possible to switch b/w sources based on the
> > > > execution mode, for eg: if it is backfill operation, switch to a data
> > lake
> > > > storage system like Iceberg, otherwise an event streaming system like
> > > > Kafka.
> > > >
> > > > Dynamic table is a design abstraction at the framework level and is not
> > > > tied to the physical implementation of the connector. If a connector
> > > > supports a combination of Kafka and lake storage, this works fine.
> > > >
> > > > > > > >>> 2. What happens in the context of a bootstrap (batch) + 
> > > > > > > >>> nearline
> > update
> > > > (streaming) case that are stateful applications? What I mean by that is,
> > > > will the state from the batch application be transferred to the nearline
> > > > application after the bootstrap execution is complete?
> > > >
> > > > I think this is another orthogonal thing, 

[jira] [Created] (FLINK-35003) Update zookeeper to 3.8.4 to address CVE-2024-23944

2024-04-03 Thread Shilun Fan (Jira)
Shilun Fan created FLINK-35003:
--

 Summary: Update zookeeper to 3.8.4 to address CVE-2024-23944
 Key: FLINK-35003
 URL: https://issues.apache.org/jira/browse/FLINK-35003
 Project: Flink
  Issue Type: Improvement
  Components: BuildSystem / Shaded
Reporter: Shilun Fan


Update zookeeper to 3.8.4 to address CVE-2024-23944

https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper/3.8.3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Parallelism is not working as expected for Apache Beam Code Running on a Flink Kubernetes Cluster

2024-04-03 Thread Dipak Tandel
Hi Everyone

I have deployed a Flink cluster using a Flink Kubernetes operator and then
submitted an Apache Beam Pipeline using a FlinkRunner.

I submitted two jobs. One with *parallelism=20* and another with
*parallelism=1* but both jobs took almost the same time to complete the
task (A difference of a few seconds), which is very surprising.  In Flink
UI, I can see the parallelism is set to 20 and each task has 20 sub-tasks
but 19 subtasks are finishing the execution in a few seconds to minutes and
only one subtask is running for the majority of the time.

I have attached a screenshot below, where one subtask took nearly 1 hour 14
minutes, and the remaining 19 subtasks took less than 2 minutes to
complete. So I am not getting any benefit from parallelism here. The task is
simple and does not rely on any state, yet it's not using the resources to
parallelize the work. Is there any way to force parallelism here?

[image: image.png]

The task has multiple steps and the final step is to write the output to
the bucket, the output is written to multiple files so the task can be
parallelized but only one subtask is doing the actual job.


[image: image.png]


Can someone help me figure out the right configuration and setup needed to
parallelize the work?

Regards
Dipak


Participate in the ASF 25th Anniversary Campaign

2024-04-03 Thread Brian Proffitt
Hi everyone,

As part of The ASF’s 25th anniversary campaign[1], we will be celebrating
projects and communities in multiple ways.

We invite all projects and contributors to participate in the following
ways:

* Individuals - submit your first contribution:
https://news.apache.org/foundation/entry/the-asf-launches-firstasfcontribution-campaign
* Projects - share your public good story:
https://docs.google.com/forms/d/1vuN-tUnBwpTgOE5xj3Z5AG1hsOoDNLBmGIqQHwQT6k8/viewform?edit_requested=true
* Projects - submit a project spotlight for the blog:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278466116
* Projects - contact the Voice of Apache podcast (formerly Feathercast) to
be featured: https://feathercast.apache.org/help/
*  Projects - use the 25th anniversary template and the #ASF25Years hashtag
on social media:
https://docs.google.com/presentation/d/1oDbMol3F_XQuCmttPYxBIOIjRuRBksUjDApjd8Ve3L8/edit#slide=id.g26b0919956e_0_13

If you have questions, email the Marketing & Publicity team at
mark...@apache.org.

Peace,
BKP

[1] https://apache.org/asf25years/

[NOTE: You are receiving this message because you are a contributor to an
Apache Software Foundation project. The ASF will very occasionally send out
messages relating to the Foundation to contributors and members, such as
this one.]

Brian Proffitt
VP, Marketing & Publicity
VP, Conferences


[jira] [Created] (FLINK-35002) GitHub action/upload-artifact@v4 can timeout

2024-04-03 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35002:
---

 Summary: GitHub action/upload-artifact@v4 can timeout
 Key: FLINK-35002
 URL: https://issues.apache.org/jira/browse/FLINK-35002
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Ryan Skraba


A timeout can occur when uploading a successfully built artifact:
 * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650]

{code:java}
2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file 
uploaded
2024-04-02T02:20:15.6360133Z Artifact name is valid!
2024-04-02T02:20:15.6362872Z Root directory input is valid!
2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. Retrying 
request in 3000 ms...
2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. Retrying 
request in 4785 ms...
2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. Retrying 
request in 7375 ms...
2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. Retrying 
request in 14988 ms...
2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to make 
request after 5 attempts: Request timeout: 
/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact
2024-04-02T02:22:59.9893296Z Post job cleanup.
2024-04-02T02:22:59.9958844Z Post job cleanup. {code}
(This is unlikely to be something we can fix, but we can track it.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-04-03 Thread lorenzo . affetti
Congratulations!
Big milestone reached :)

Best,
Lorenzo
On Apr 2, 2024 at 03:50 +0200, Ron liu , wrote:
> Congratulations!
>
> Best,
> Ron
>
> Jeyhun Karimov  于2024年4月1日周一 18:12写道:
>
> > Congratulations!
> >
> > Regards,
> > Jeyhun
> >
> > On Mon, Apr 1, 2024 at 7:43 AM Guowei Ma  wrote:
> >
> > > > Congratulations!
> > > > Best,
> > > > Guowei
> > > >
> > > >
> > > > On Mon, Apr 1, 2024 at 11:15 AM Feng Jin  wrote:
> > > >
> > > > > > Congratulations!
> > > > > >
> > > > > > Best,
> > > > > > Feng Jin
> > > > > >
> > > > > > On Mon, Apr 1, 2024 at 10:51 AM weijie guo 
> > > > > > 
> > > > > > wrote:
> > > > > >
> > > > > > >> Congratulations!
> > > > > > >>
> > > > > > >> Best regards,
> > > > > > >>
> > > > > > >> Weijie
> > > > > > >>
> > > > > > >>
> > > > > > >> Hang Ruan  于2024年4月1日周一 09:49写道:
> > > > > > >>
> > > > > > > >> > Congratulations!
> > > > > > > >> >
> > > > > > > >> > Best,
> > > > > > > >> > Hang
> > > > > > > >> >
> > > > > > > >> > Lincoln Lee  于2024年3月31日周日 00:10写道:
> > > > > > > >> >
> > > > > > > > >> > > Congratulations!
> > > > > > > > >> > >
> > > > > > > > >> > > Best,
> > > > > > > > >> > > Lincoln Lee
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > Jark Wu  于2024年3月30日周六 22:13写道:
> > > > > > > > >> > >
> > > > > > > > > >> > > > Congratulations!
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Best,
> > > > > > > > > >> > > > Jark
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > On Fri, 29 Mar 2024 at 12:08, Yun Tang 
> > > > > > > > > >> > > > 
> > wrote:
> > > > > > > > > >> > > >
> > > > > > > > > > >> > > > > Congratulations to all Paimon guys!
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > Glad to see a Flink sub-project has been 
> > > > > > > > > > >> > > > > graduated to an
> > Apache
> > > > > > > > >> > > top-level
> > > > > > > > > > >> > > > > project.
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > Best
> > > > > > > > > > >> > > > > Yun Tang
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > 
> > > > > > > > > > >> > > > > From: Hangxiang Yu 
> > > > > > > > > > >> > > > > Sent: Friday, March 29, 2024 10:32
> > > > > > > > > > >> > > > > To: dev@flink.apache.org 
> > > > > > > > > > >> > > > > Subject: Re: Re: [ANNOUNCE] Apache Paimon is 
> > > > > > > > > > >> > > > > graduated to Top
> > > > > > >> Level
> > > > > > > > > >> > > > Project
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > Congratulations!
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > On Fri, Mar 29, 2024 at 10:27 AM Benchao Li <
> > > > libenc...@apache.org
> > > > > > > >> >
> > > > > > > > > >> > > > wrote:
> > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > > Congratulations!
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > Zakelly Lan  
> > > > > > > > > > > >> > > > > > 于2024年3月29日周五 10:25写道:
> > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > >> > > > > > > Congratulations!
> > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > >> > > > > > > Best,
> > > > > > > > > > > > >> > > > > > > Zakelly
> > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > >> > > > > > > On Thu, Mar 28, 2024 at 10:13 PM Jing Ge
> > > > > > > > >> > >  > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > > wrote:
> > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > >> > > > > > > > Congrats!
> > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > >> > > > > > > > Best regards,
> > > > > > > > > > > > > >> > > > > > > > Jing
> > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > >> > > > > > > > On Thu, Mar 28, 2024 at 1:27 PM 
> > > > > > > > > > > > > >> > > > > > > > Feifan Wang <
> > > > > > > >> > zoltar9...@163.com>
> > > > > > > > > > > >> > > > > > wrote:
> > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > > Congratulations!——
> > > > > > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > > Best regards,
> > > > > > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > > Feifan Wang
> > > > > > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > >
> > > > > > > > > > > > > > >> > > > > > > > > At 2024-03-28 20:02:43, "Yanfei 
> > > > > > > > > > > > > > >> > > > > > > > > Lei" <
> > > > fredia...@gmail.com
> > > > > > > >> >
> > > > > > > > > >> > > > wrote:
> > > > > > > > > > > > > > > >> > > > > > > > > >Congratulations!
> > > > > > > > > > > > > > > >> > > > > > > > > >
> > > > > > > > > > > > > > > >> > > > > > > > > >Best,
> > > > > > > > > > > > > > > >> > > > > > > > > >Yanfei
> > > > > > > > > > > > > > > >> > > > > > > > > >
> > > > > > > > > > > > > > > >> > > > > > > > > >Zhanghao Chen 
> > > > > > > 

Re: [DISCUSS] FLIP-435: Introduce a New Dynamic Table for Simplifying Data Pipelines

2024-04-03 Thread Martijn Visser
Hi all,

Thanks for the proposal. While the FLIP talks extensively on how Snowflake
has Dynamic Tables and Databricks has Delta Live Tables, my understanding
is that Databricks has CREATE STREAMING TABLE [1] which relates with this
proposal.

I do have concerns about using CREATE DYNAMIC TABLE, specifically about
confusing the users who are familiar with Snowflake's approach where you
can't change the content via DML statements, while that is something that
would work in this proposal. Naming is hard of course, but I would probably
prefer something like CREATE CONTINUOUS TABLE, CREATE REFRESH TABLE or
CREATE LIVE TABLE.

Best regards,

Martijn

[1]
https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-streaming-table.html

On Wed, Apr 3, 2024 at 5:19 AM Ron liu  wrote:

> Hi, dev
>
> After offline discussion with Becket Qin, Lincoln Lee and Jark Wu,  we have
> improved some parts of the FLIP.
>
> 1. Add Full Refresh Mode section to clarify the semantics of full refresh
> mode.
> 2. Add Future Improvement section explaining why query statement does not
> support references to temporary view and possible solutions.
> 3. The Future Improvement section explains a possible future solution for
> dynamic table to support the modification of query statements to meet the
> common field-level schema evolution requirements of the lakehouse.
> 4. The Refresh section emphasizes that the Refresh command and the
> background refresh job can be executed in parallel, with no restrictions at
> the framework level.
> 5. Convert RefreshHandler into a plug-in interface to support various
> workflow schedulers.
>
> Best,
> Ron
>
> Ron liu  于2024年4月2日周二 10:28写道:
>
> > Hi, Venkata krishnan
> >
> > Thank you for your involvement and suggestions, and hope that the design
> > goals of this FLIP will be helpful to your business.
> >
> > >>> 1. In the proposed FLIP, given the example for the dynamic table, do
> > the
> > data sources always come from a single lake storage such as Paimon or
> does
> > the same proposal solve for 2 disparate storage systems like Kafka and
> > Iceberg where Kafka events are ETLed to Iceberg similar to Paimon?
> > Basically the lambda architecture that is mentioned in the FLIP as well.
> > I'm wondering if it is possible to switch b/w sources based on the
> > execution mode, for eg: if it is backfill operation, switch to a data
> lake
> > storage system like Iceberg, otherwise an event streaming system like
> > Kafka.
> >
> > Dynamic table is a design abstraction at the framework level and is not
> > tied to the physical implementation of the connector. If a connector
> > supports a combination of Kafka and lake storage, this works fine.
> >
> > >>> 2. What happens in the context of a bootstrap (batch) + nearline
> update
> > (streaming) case that are stateful applications? What I mean by that is,
> > will the state from the batch application be transferred to the nearline
> > application after the bootstrap execution is complete?
> >
> > I think this is another orthogonal thing, something that FLIP-327 tries
> to
> > address, not directly related to Dynamic Table.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data
> >
> > Best,
> > Ron
> >
> > Venkatakrishnan Sowrirajan  于2024年3月30日周六 07:06写道:
> >
> >> Ron and Lincoln,
> >>
> >> Great proposal and interesting discussion for adding support for dynamic
> >> tables within Flink.
> >>
> >> At LinkedIn, we are also trying to solve compute/storage convergence for
> >> similar problems discussed as part of this FLIP, specifically periodic
> >> backfill, bootstrap + nearline update use cases using single
> >> implementation
> >> of business logic (single script).
> >>
> >> Few clarifying questions:
> >>
> >> 1. In the proposed FLIP, given the example for the dynamic table, do the
> >> data sources always come from a single lake storage such as Paimon or
> does
> >> the same proposal solve for 2 disparate storage systems like Kafka and
> >> Iceberg where Kafka events are ETLed to Iceberg similar to Paimon?
> >> Basically the lambda architecture that is mentioned in the FLIP as well.
> >> I'm wondering if it is possible to switch b/w sources based on the
> >> execution mode, for eg: if it is backfill operation, switch to a data
> lake
> >> storage system like Iceberg, otherwise an event streaming system like
> >> Kafka.
> >> 2. What happens in the context of a bootstrap (batch) + nearline update
> >> (streaming) case that are stateful applications? What I mean by that is,
> >> will the state from the batch application be transferred to the nearline
> >> application after the bootstrap execution is complete?
> >>
> >> Regards
> >> Venkata krishnan
> >>
> >>
> >> On Mon, Mar 25, 2024 at 8:03 PM Ron liu  wrote:
> >>
> >> > Hi, Timo
> >> >
> >> > Thanks for your quick response, and your suggestion.
> >> >
> >> > Yes, this 

Re: [DISCUSS] Externalized Google Cloud Connectors

2024-04-03 Thread lorenzo . affetti
@Leonard @Martijn
Following up on @Claire's question, what is the role of Bahir
(https://bahir.apache.org/) in this scenario?

I am also trying to understand how connectors fit into the Flink project
scenario :)

Thank you,
Lorenzo
On Apr 2, 2024 at 06:13 +0200, Leonard Xu , wrote:
> Hey, Claire
>
> Thanks for starting this discussion. All Flink external connector repos are 
> sub-projects of Apache Flink, including 
> https://github.com/apache/flink-connector-aws.
>
> Creating a Flink external connector repo named flink-connectors-gcp as a 
> sub-project of Apache Beam is not a good idea from my side.
>
> > Currently, we have no Flink committers on our team. We are actively
> > involved in the Apache Beam community and have a number of ASF members on
> > the team.
>
> Not having a Flink committer should not be a strong reason in this case. The Flink 
> community welcomes contributors to contribute and maintain the connectors, and as 
> a contributor, through continuous connector development and maintenance work 
> in the community, you will also have the opportunity to become a committer.
>
> Best,
> Leonard
>
>
> > 2024年2月14日 上午12:24,Claire McCarthy  写道:
> >
> > Hi Devs!
> >
> > I’d like to kick off a discussion on setting up a repo for a new fleet of
> > Google Cloud connectors.
> >
> > A bit of context:
> >
> > - We have a team of Google engineers who are looking to build/maintain
> >   5-10 GCP connectors for Flink.
> > - We are wondering if it would make sense to host our connectors under the
> >   ASF umbrella following a similar repo structure as AWS (
> >   https://github.com/apache/flink-connector-aws). In our case:
> >   apache/flink-connectors-gcp.
> > - Currently, we have no Flink committers on our team. We are actively
> >   involved in the Apache Beam community and have a number of ASF members on
> >   the team.
> >
> >
> > We saw that one of the original motivations for externalizing connectors
> > was to encourage more activity and contributions around connectors by
> > easing the contribution overhead. We understand that the decision was
> > ultimately made to host the externalized connector repos under the ASF
> > organization. For the same reasons (release infra, quality assurance,
> > integration with the community, etc.), we would like all GCP connectors to
> > live under the ASF organization.
> >
> > We want to ask the Flink community what you all think of this idea, and
> > what would be the best way for us to go about contributing something like
> > this. We are excited to contribute and want to learn and follow your
> > practices.
> >
> > A specific issue we know of is that our changes need approval from Flink
> > committers. Do you have a suggestion for how best to go about a new
> > contribution like ours from a team that does not have committers? Is it
> > possible, for example, to partner with a committer (or a small cohort) for
> > tight engagement? We also know about ASF voting and release process, but
> > that doesn't seem to be as much of a potential hurdle.
> >
> > Huge thanks in advance for sharing your thoughts!
> >
> >
> > Claire
>


Re: [VOTE] FLIP-437: Support ML Models in Flink SQL

2024-04-03 Thread David Morávek
+1 (binding)

My only suggestion would be to move Catalog changes into a separate
interface to allow us to begin with lower stability guarantees. Existing
Catalogs would be able to opt-in by implementing it. It's a minor thing
though, overall the FLIP is solid and the direction is pretty exciting.

Best,
D.

On Wed, Apr 3, 2024 at 2:31 AM David Radley  wrote:

> Hi Hao,
> I don’t think this counts as an objection, I have some comments. I should
> have put this on the discussion thread earlier but have just got to this.
> - I suggest we can put a model version in the model resource. Versions are
> notoriously difficult to add later; I don’t think we want to proliferate
> differently named models as a model mutates. We may want to work with
> non-latest models.
> - I see that the model name is the unique identifier. I realise this would
> move away from the Oracle syntax – so may not be feasible short term; but I
> wonder if we can have:
>  - a uuid as the main identifier and the model name as an attribute.
> or
>  - a namespace (or something like a system of origin)
> to help organise models with the same name.
> - does the model have an owner? I assume that Flink model resource is the
> master of the model? I imagine in the future that a model that comes in via
> a new connector could be kept up to date with the external model and would
> not be allowed to be changed by anything other than the connector.
>
>Kind regards, David.
>
> From: Hao Li 
> Date: Friday, 29 March 2024 at 16:30
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] [VOTE] FLIP-437: Support ML Models in Flink SQL
> Hi devs,
>
> I'd like to start a vote on the FLIP-437: Support ML Models in Flink
> SQL [1]. The discussion thread is here [2].
>
> The vote will be open for at least 72 hours unless there is an objection or
> insufficient votes.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
>
> [2] https://lists.apache.org/thread/9z94m2bv4w265xb5l2mrnh4lf9m28ccn
>
> Thanks,
> Hao
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>


[jira] [Created] (FLINK-35001) Avoid scientific notation for DOUBLE to STRING

2024-04-03 Thread Timo Walther (Jira)
Timo Walther created FLINK-35001:


 Summary: Avoid scientific notation for DOUBLE to STRING
 Key: FLINK-35001
 URL: https://issues.apache.org/jira/browse/FLINK-35001
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Timo Walther


Flink currently uses Java semantics for some casts.

When executing:

{code}
SELECT CAST(CAST('19586232024.0' AS DOUBLE) AS STRING);
{code}

the result is
{code}
1.9586232024E10
{code}

However, other vendors such as Postgres or MySQL return {{19586232024}}.

We should reconsider this behavior for consistency.
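
For illustration only (this is not necessarily the fix to adopt; the class and variable names below are just examples), a plain decimal rendering of the same value in Java could look like:

{code}
import java.math.BigDecimal;

public class DoubleToStringExample {
    public static void main(String[] args) {
        double d = 19586232024.0;
        // Java's Double.toString semantics, matching the current CAST output:
        System.out.println(Double.toString(d));                      // 1.9586232024E10
        // Plain, non-scientific rendering, matching Postgres/MySQL for this value:
        System.out.println(BigDecimal.valueOf(d).toPlainString());   // 19586232024
    }
}
{code}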



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35000) PullRequest template doesn't use the correct format to refer to the testing code convention

2024-04-03 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-35000:
-

 Summary: PullRequest template doesn't use the correct format to 
refer to the testing code convention
 Key: FLINK-35000
 URL: https://issues.apache.org/jira/browse/FLINK-35000
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI, Project Website
Affects Versions: 1.18.1, 1.19.0, 1.20.0
Reporter: Matthias Pohl


The PR template refers to 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
 rather than 
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-438: Make Flink's Hadoop and YARN configuration probing consistent

2024-04-03 Thread Ferenc Csaky
Hi Venkata,

Thank you for opening the discussion about this!

After taking a look at the YARN and Hadoop configurations, the
reason why it was implemented this way is that, in the case of YARN,
every YARN-specific property is prefixed with "yarn.", so to get
the final, YARN-side property it is enough to remove the "flink."
prefix.

In the case of Hadoop, there are properties that are not prefixed with
"hadoop.", e.g. "dfs.replication", so to identify and derive the
Hadoop-side property it is necessary to duplicate the "hadoop" part
in the property keys.
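
To illustrate, a rough sketch of the two conventions described above
(not the actual Flink code; the key names are examples only):

    public class ConfigKeyProbingSketch {
        // "flink.yarn.application.classpath" -> "yarn.application.classpath":
        // YARN keys already carry the "yarn." prefix, so dropping "flink." is enough.
        static String toYarnKey(String flinkKey) {
            return flinkKey.substring("flink.".length());
        }

        // "flink.hadoop.dfs.replication" -> "dfs.replication":
        // Hadoop keys such as "dfs.replication" have no common prefix, so the whole
        // "flink.hadoop." marker has to be stripped to recover the Hadoop-side key.
        static String toHadoopKey(String flinkKey) {
            return flinkKey.substring("flink.hadoop.".length());
        }
    }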

Taking this into consideration I would personally say -0 to this
change. IMO the current behavior can be justified as giving
slightly different solutions to slightly different problems, which
are well documented. Handling both prefixes would complicate the
parsing logic until the APIs can be removed, which as it looks at
the moment would only be possible in Flink 3.0, which probably will
not happen in the foreseeable future, so I do not see the benefit
of the added complexity.

Regarding the FLIP, in the "YARN configuration override example"
part, I think you should present an example that works correctly
at the moment: "flink.yarn.application.classpath" ->
"yarn.application.classpath".

Best,
Ferenc


On Friday, March 29th, 2024 at 23:45, Venkatakrishnan Sowrirajan 
 wrote:

> 
> 
> Hi Flink devs,
> 
> I would like to start a discussion on FLIP-XXX: Make Flink's Hadoop and
> YARN configuration probing consistent
> https://docs.google.com/document/d/1I2jBFI0eVkofAVCAEeajNQRfOqKGJsRfZd54h79AIYc/edit?usp=sharing.
> 
> This stems from an earlier discussion thread here
> https://lists.apache.org/thread/l2fh5shbf59fjgbt1h73pmmsqj038ppv.
> 
> 
> This FLIP is proposing to make the configuration probing behavior between
> Hadoop and YARN configuration to be consistent.
> 
> Regards
> Venkata krishnan


[jira] [Created] (FLINK-34999) PR CI stopped operating

2024-04-03 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34999:
-

 Summary: PR CI stopped operating
 Key: FLINK-34999
 URL: https://issues.apache.org/jira/browse/FLINK-34999
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.18.1, 1.19.0, 1.20.0
Reporter: Matthias Pohl


There are no [new PR CI 
runs|https://dev.azure.com/apache-flink/apache-flink/_build?definitionId=2] 
being picked up anymore. [Recently updated 
PRs|https://github.com/apache/flink/pulls?q=sort%3Aupdated-desc] are not picked 
up by the @flinkbot.

In the meantime there was a notification sent from GitHub that the password of 
the @flinkbot was reset for security reasons. It's quite likely that these two 
events are related.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-XXX: Introduce Flink SQL variables

2024-04-03 Thread Ferenc Csaky
Hi Jeyhun,

Thank you for your questions, please see my answers below.

> What is its impact on query optimization because resolving
> variables at the parsing stage might affect query optimization.

The approach I mentioned in the FLIP would not affect query
optimization, as it restricts variables to literals and hence does
not support calculated variables. This means the substitution is a
simple string replacement of the variables before the actual parse
happens, so the parser only ever sees the already resolved statement.
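
A minimal sketch of that substitution step (the ${name} syntax and the
class name are illustrative only, not the FLIP's final design; escaping
of literal values is omitted for brevity):

    import java.util.HashMap;
    import java.util.Map;

    public class VariableStoreSketch {
        private final Map<String, String> variables = new HashMap<>();

        public void set(String name, String literalValue) {
            variables.put(name, literalValue);
        }

        // Plain string substitution performed before the statement is parsed.
        public String resolve(String statement) {
            String resolved = statement;
            for (Map.Entry<String, String> e : variables.entrySet()) {
                resolved = resolved.replace("${" + e.getKey() + "}", e.getValue());
            }
            return resolved;
        }
    }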

That said, this may not be the final approach: following Yanfei's
previous comments I started looking into possible solutions for
calculated variables, which will probably change this answer. I
will report back when I have something on this topic.

> What is the scope of variables? I mean when and how they override
> each other and when get out of their scopes?

This is a good question, I did not mention this in the FLIP. My
thinking on this topic is that a VariableStore is tied to a SQL
session, so variables are session scoped.

Having a system-wide scope might make sense. In that case, the
system-wide variable should be shadowed by the same session-wide
variable IMO, as a general rule regarding variable shadowing [1].
I did not include a system-wide scope in my PoC, but it would
basically mean maintaining a dedicated system-wide VariableStore.
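
If a system-wide scope is added, the lookup could be as simple as
(hypothetical helper; both stores are shown as plain maps for brevity):

    // The session-scoped value shadows the system-wide one, per [1].
    static String lookup(String name,
                         Map<String, String> sessionVars,
                         Map<String, String> systemVars) {
        String value = sessionVars.get(name);
        return value != null ? value : systemVars.get(name);
    }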

> Does the proposal support dynamic assignment of the variables or
> the value of variables should be known at query compile time?

Covered this in my answer to the first Q.

> Can we somehow benefit from/leverage Calcite's parameterization
> feature in this proposal?

I am not super familiar with Calcite's capabilities regarding this
topic, and the Calcite docs were not really helpful either. But I
might have overlooked something, so can you elaborate more / point me
towards what you mean?


Best,
Ferenc

[1] https://en.wikipedia.org/wiki/Variable_shadowing


On Monday, April 1st, 2024 at 15:24, Jeyhun Karimov  
wrote:

> 
> 
> Hi Ferenc,
> 
> Thanks for the proposal. Sounds like a good idea!
> I have a few questions on that:
> 
> - What is its impact on query optimization because resolving variables at
> the parsing stage might affect query optimization.
> 
> - What is the scope of variables? I mean when and how they override each
> other and when get out of their scopes?
> 
> - Does the proposal support dynamic assignment of the variables or the
> value of variables should be known at query compile time?
> 
> - Can we somehow benefit from/leverage Calcite's parameterization feature
> in this proposal?
> 
> Regards,
> Jeyhun
> 
> On Thu, Mar 28, 2024 at 6:21 PM Ferenc Csaky ferenc.cs...@pm.me.invalid
> 
> wrote:
> 
> > Hi, Jim, Yanfei,
> > 
> > Thanks for your comments! Let me reply in the order of the
> > messages.
> > 
> > > I'd prefer sticking to the SQL standard if possible. Would
> > > it be possible / sensible to allow for each syntax, perhaps
> > > managed by a config setting?
> > 
> > Correct me if I am wrong, but AFAIK variables are not part of
> > the ANSI SQL standard. The '@' prefix is used by some widely
> > used DB mgr, e.g. MySQL.
> > 
> > Regarding having multiple resolution syntax, it would be possible,
> > if we agree it adds value. Personally I do not have a strong
> > opinion on that.
> > 
> > > I'm new to Flink SQL and I'm curious if these variables can be
> > > calculated from statements or expression [1]?
> > 
> > Good point! The proposed solution would lack this functionality.
> > On our platform, we have a working solution of this that was
> > sufficient to solve our main problem: carrying SQL between
> > environments without change.
> > 
> > At this point, variable values can only be literals, and they are
> > automatically escaped during resolution. Except if they are
> > resolved as a DDL statement property value.
> > 
> > But if the community agrees that it would be useful to have the
> > ability of calculated variables I would happily spend some time
> > on possible solutions that makes sense in Flink.
> > 
> > WDYT?
> > 
> > Best,
> > Ferenc
> > 
> > On Thursday, March 28th, 2024 at 03:58, Yanfei Lei fredia...@gmail.com
> > wrote:
> > 
> > > Hi Ferenc,
> > > 
> > > Thanks for the proposal, using SQL variables to exclude
> > > environment-specific configuration from code sounds like a good idea.
> > > 
> > > I'm new to Flink SQL and I'm curious if these variables can be
> > > calculated from statements or expression [1]? In FLIP, it seems that
> > > the values are in the form of StringLiteral.
> > > 
> > > [1]
> > > https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-aux-set-variable.html
> > > 
> > > Jim Hughes jhug...@confluent.io.invalid 于2024年3月28日周四 04:54写道:
> > > 
> > > > Hi Ferenc,
> > > > 
> > > > Looks like a good idea.
> > > > 
> > > > I'd prefer sticking to the SQL standard if possible. Would it be
> > > > possible
> > > > / sensible to allow for each syntax, perhaps managed by a config
> > > > 

[jira] [Created] (FLINK-34998) Wordcount on Docker test failed on azure

2024-04-03 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-34998:
--

 Summary: Wordcount on Docker test failed on azure
 Key: FLINK-34998
 URL: https://issues.apache.org/jira/browse/FLINK-34998
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo


/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/test_docker_embedded_job.sh:
 line 65: docker-compose: command not found
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/test_docker_embedded_job.sh:
 line 66: docker-compose: command not found
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/test_docker_embedded_job.sh:
 line 67: docker-compose: command not found
sort: cannot read: 
'/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-24250435151/out/docker_wc_out*':
 No such file or directory
Apr 03 02:08:14 FAIL WordCount: Output hash mismatch.  Got 
d41d8cd98f00b204e9800998ecf8427e, expected 0e5bd0a3dd7d5a7110aa85ff70adb54b.
Apr 03 02:08:14 head hexdump of actual:
head: cannot open 
'/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-24250435151/out/docker_wc_out*'
 for reading: No such file or directory
Apr 03 02:08:14 Stopping job timeout watchdog (with pid=244913)
Apr 03 02:08:14 [FAIL] Test script contains errors.


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58709=logs=e9d3d34f-3d15-59f4-0e3e-35067d100dfe=5d91035e-8022-55f2-2d4f-ab121508bf7e=6043



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34997) PyFlink YARN per-job on Docker test failed on azure

2024-04-03 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-34997:
--

 Summary: PyFlink YARN per-job on Docker test failed on azure
 Key: FLINK-34997
 URL: https://issues.apache.org/jira/browse/FLINK-34997
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo


Apr 03 03:12:37 
==
Apr 03 03:12:37 Running 'PyFlink YARN per-job on Docker test'
Apr 03 03:12:37 
==
Apr 03 03:12:37 TEST_DATA_DIR: 
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-37046085202
Apr 03 03:12:37 Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT
Apr 03 03:12:38 Flink dist directory: 
/home/vsts/work/1/s/flink-dist/target/flink-1.19-SNAPSHOT-bin/flink-1.19-SNAPSHOT
Apr 03 03:12:38 Docker version 24.0.9, build 2936816
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/common_docker.sh: line 
24: docker-compose: command not found
Apr 03 03:12:38 [FAIL] Test script contains errors.
Apr 03 03:12:38 Checking of logs skipped.
Apr 03 03:12:38 
Apr 03 03:12:38 [FAIL] 'PyFlink YARN per-job on Docker test' failed after 0 
minutes and 1 seconds! Test exited with exit code 1




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34996) Deserializer can't be instantiated when connector-kafka installed into Flink Libs

2024-04-03 Thread Hugo Gu (Jira)
Hugo Gu created FLINK-34996:
---

 Summary: Deserializer can't be instantiated when connector-kafka 
installed into Flink Libs
 Key: FLINK-34996
 URL: https://issues.apache.org/jira/browse/FLINK-34996
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Hugo Gu
 Attachments: image-2024-04-03-17-34-00-120.png, 
image-2024-04-03-17-37-55-105.png

The current implementation of the 
KafkaValueOnlyDeserializerWrapper class instantiates the Deserializer via the 
ClassLoader of KafkaValueOnlyDeserializerWrapper itself, as the following figure 
shows.
 
!image-2024-04-03-17-34-00-120.png|width=799,height=293!
 
If both of the following conditions are met:
1. connector-kafka is installed into Flink's lib directory (rather than bundled in 
the user jar)
2. The user jar defines a customized Deserializer for the Kafka records
 
then the instantiation of the custom deserializer will fail with a class-not-found 
exception, because the class is simply not available to the system class loader.
 
As the following figure illustrates:
 
!image-2024-04-03-17-37-55-105.png|width=413,height=452!
 
It can be fixed by using either the UserCodeClassLoader or the ClassLoader of 
the current Thread.
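
A minimal sketch of the latter option (the class and parameter names below are 
illustrative, not the wrapper's actual code):

{code}
import org.apache.kafka.common.serialization.Deserializer;

public class DeserializerInstantiationSketch {

    static Deserializer<?> createDeserializer(String deserializerClassName)
            throws ReflectiveOperationException {
        // Resolve the class against the context (user-code) class loader instead of
        // the class loader that loaded the wrapper, so classes shipped only in the
        // user jar can be found even when connector-kafka sits in Flink's lib folder.
        ClassLoader userLoader = Thread.currentThread().getContextClassLoader();
        return Class.forName(deserializerClassName, true, userLoader)
                .asSubclass(Deserializer.class)
                .getDeclaredConstructor()
                .newInstance();
    }
}
{code}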



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Inquiry Regarding Azure Pipelines

2024-04-03 Thread Yisha Zhou
Hi devs,

I hope this email finds you well. I am writing to seek clarification regarding 
the status of Azure Pipelines within the Apache community and seek assistance 
with a specific issue I encountered.

Today, I made some new commits to a pull request in one of the Apache 
repositories. However, I noticed that even after approximately six hours, there 
were no triggers initiated for the Azure Pipeline. I have a couple of questions 
regarding this matter:

1. Is the Apache community still utilizing Azure Pipelines for CI/CD purposes? 
I came across an issue discussing the migration from Azure to GitHub Actions, 
but I am uncertain about the timeline for discontinuing the use of Azure 
Pipelines.

2. If Azure Pipelines are still in use, where can I find information about the 
position of my commits in the CI queue, awaiting execution?

I would greatly appreciate any insights or guidance you can provide regarding 
these questions. Thank you for your time and attention.

My PR link is https://github.com/apache/flink/pull/24567.

Best regards,
Yisha

Re: [VOTE] FLIP-437: Support ML Models in Flink SQL

2024-04-03 Thread David Radley
Hi Hao,
I don’t think this counts as an objection, I have some comments. I should have 
put this on the discussion thread earlier but have just got to this.
- I suggest we can put a model version in the model resource. Versions are 
notoriously difficult to add later; I don’t think we want to proliferate 
differently named models as a model mutates. We may want to work with 
non-latest models.
- I see that the model name is the unique identifier. I realise this would move 
away from the Oracle syntax – so may not be feasible short term; but I wonder 
if we can have:
 - a uuid as the main identifier and the model name as an attribute.
or
 - a namespace (or something like a system of origin)
to help organise models with the same name.
- does the model have an owner? I assume that Flink model resource is the 
master of the model? I imagine in the future that a model that comes in via a 
new connector could be kept up to date with the external model and would not be 
allowed to be changed by anything other than the connector.

   Kind regards, David.

From: Hao Li 
Date: Friday, 29 March 2024 at 16:30
To: dev@flink.apache.org 
Subject: [EXTERNAL] [VOTE] FLIP-437: Support ML Models in Flink SQL
Hi devs,

I'd like to start a vote on the FLIP-437: Support ML Models in Flink
SQL [1]. The discussion thread is here [2].

The vote will be open for at least 72 hours unless there is an objection or
insufficient votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL

[2] https://lists.apache.org/thread/9z94m2bv4w265xb5l2mrnh4lf9m28ccn

Thanks,
Hao

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU


[jira] [Created] (FLINK-34995) flink kafka connector source stuck when partition leader invalid

2024-04-03 Thread yansuopeng (Jira)
yansuopeng created FLINK-34995:
--

 Summary: flink kafka connector source stuck when partition leader 
invalid
 Key: FLINK-34995
 URL: https://issues.apache.org/jira/browse/FLINK-34995
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.18.1, 1.19.0, 1.17.0
Reporter: yansuopeng


When the partition leader is invalid (leader=-1), a Flink streaming job using 
KafkaSource can't restart or start a new instance with a new group id; it gets 
stuck with the following exception:

"{*}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition aaa-1 could be determined{*}"

When leader=-1, Kafka APIs like KafkaConsumer.position() will block until 
either the position can be determined or an unrecoverable error is 
encountered.

In fact, leader=-1 is not easy to avoid: even with replica=3, three disks going 
offline together will trigger the problem, especially when the cluster size is 
relatively large. Fixing it in time relies on the Kafka administrator, which is 
risky during the Kafka cluster's peak periods.
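
A minimal sketch of the blocking call described above (topic, partition and 
timeout values are illustrative; with leader=-1 the position cannot be resolved 
and the call eventually fails with 
org.apache.kafka.common.errors.TimeoutException):

{code}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PositionBlockingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fresh-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("aaa", 1);
            consumer.assign(Collections.singleton(partition));
            // Blocks until the position is known or the timeout expires.
            consumer.position(partition, Duration.ofMillis(60_000));
        }
    }
}
{code}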

 
 
 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-437: Support ML Models in Flink SQL

2024-04-03 Thread Martijn Visser
+1 (binding)

On Wed, Apr 3, 2024 at 9:52 AM Leonard Xu  wrote:

> +1(binding)
>
> Best,
> Leonard
>
> > 2024年4月3日 下午3:37,Piotr Nowojski  写道:
> >
> > +1 (binding)
> >
> > Best,
> > Piotrek
> >
> > śr., 3 kwi 2024 o 04:29 Yu Chen  napisał(a):
> >
> >> +1 (non-binding)
> >>
> >> Looking forward to this future.
> >>
> >> Thanks,
> >> Yu Chen
> >>
> >>> 2024年4月3日 10:23,Jark Wu  写道:
> >>>
> >>> +1 (binding)
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Tue, 2 Apr 2024 at 15:12, Timo Walther  wrote:
> >>>
>  +1 (binding)
> 
>  Thanks,
>  Timo
> 
>  On 29.03.24 17:30, Hao Li wrote:
> > Hi devs,
> >
> > I'd like to start a vote on the FLIP-437: Support ML Models in Flink
> > SQL [1]. The discussion thread is here [2].
> >
> > The vote will be open for at least 72 hours unless there is an
> >> objection
>  or
> > insufficient votes.
> >
> > [1]
> >
> 
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> >
> > [2] https://lists.apache.org/thread/9z94m2bv4w265xb5l2mrnh4lf9m28ccn
> >
> > Thanks,
> > Hao
> >
> 
> 
> >>
> >>
>
>


[jira] [Created] (FLINK-34994) JobIDLoggingITCase fails because of "checkpoint confirmation for unknown task"

2024-04-03 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-34994:
-

 Summary: JobIDLoggingITCase fails because of "checkpoint 
confirmation for unknown task"
 Key: FLINK-34994
 URL: https://issues.apache.org/jira/browse/FLINK-34994
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.20.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58640=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=8735]

 

[https://github.com/apache/flink/actions/runs/8502821551/job/23287730632#step:10:8131]

 

[https://github.com/apache/flink/actions/runs/8507870399/job/23300810619#step:10:8086]

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-437: Support ML Models in Flink SQL

2024-04-03 Thread Leonard Xu
+1(binding)

Best,
Leonard

> 2024年4月3日 下午3:37,Piotr Nowojski  写道:
> 
> +1 (binding)
> 
> Best,
> Piotrek
> 
> śr., 3 kwi 2024 o 04:29 Yu Chen  napisał(a):
> 
>> +1 (non-binding)
>> 
>> Looking forward to this future.
>> 
>> Thanks,
>> Yu Chen
>> 
>>> 2024年4月3日 10:23,Jark Wu  写道:
>>> 
>>> +1 (binding)
>>> 
>>> Best,
>>> Jark
>>> 
>>> On Tue, 2 Apr 2024 at 15:12, Timo Walther  wrote:
>>> 
 +1 (binding)
 
 Thanks,
 Timo
 
 On 29.03.24 17:30, Hao Li wrote:
> Hi devs,
> 
> I'd like to start a vote on the FLIP-437: Support ML Models in Flink
> SQL [1]. The discussion thread is here [2].
> 
> The vote will be open for at least 72 hours unless there is an
>> objection
 or
> insufficient votes.
> 
> [1]
> 
 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> 
> [2] https://lists.apache.org/thread/9z94m2bv4w265xb5l2mrnh4lf9m28ccn
> 
> Thanks,
> Hao
> 
 
 
>> 
>> 



Re: [VOTE] FLIP-437: Support ML Models in Flink SQL

2024-04-03 Thread Piotr Nowojski
+1 (binding)

Best,
Piotrek

śr., 3 kwi 2024 o 04:29 Yu Chen  napisał(a):

> +1 (non-binding)
>
> Looking forward to this future.
>
> Thanks,
> Yu Chen
>
> > 2024年4月3日 10:23,Jark Wu  写道:
> >
> > +1 (binding)
> >
> > Best,
> > Jark
> >
> > On Tue, 2 Apr 2024 at 15:12, Timo Walther  wrote:
> >
> >> +1 (binding)
> >>
> >> Thanks,
> >> Timo
> >>
> >> On 29.03.24 17:30, Hao Li wrote:
> >>> Hi devs,
> >>>
> >>> I'd like to start a vote on the FLIP-437: Support ML Models in Flink
> >>> SQL [1]. The discussion thread is here [2].
> >>>
> >>> The vote will be open for at least 72 hours unless there is an
> objection
> >> or
> >>> insufficient votes.
> >>>
> >>> [1]
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A+Support+ML+Models+in+Flink+SQL
> >>>
> >>> [2] https://lists.apache.org/thread/9z94m2bv4w265xb5l2mrnh4lf9m28ccn
> >>>
> >>> Thanks,
> >>> Hao
> >>>
> >>
> >>
>
>


Re: [DISCUSS] FLIP-434: Support optimizations for pre-partitioned data sources

2024-04-03 Thread Leonard Xu
Hey, Jeyhun 

Thanks for kicking off this discussion. I have two questions about streaming 
sources:

(1) The FLIP motivation section says the Kafka broker is already partitioned w.r.t. 
some key[s]. Is this the main use case in the Kafka world? Partitioning by key 
fields is not the behavior of Kafka's default partitioner [1], IIUC.

(2) Considering the FLIP's optimization scope covers both batch and streaming 
pre-partitioned sources, could you add a streaming source example to help me 
understand the FLIP better? I think the Kafka source is a good candidate for a 
streaming source example; the file source is a good one for the batch case and it 
really helped me to follow the FLIP.

Best,
Leonard
[1]https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31



> 2024年4月3日 上午5:53,Jeyhun Karimov  写道:
> 
> Hi Lincoln,
> 
> Thanks a lot for your comments. Please find my answers below.
> 
> 
> 1. Is this flip targeted only at batch scenarios or does it include
>> streaming?
>> (The flip and the discussion did not explicitly mention this, but in the
>> draft pr, I only
>> saw the implementation for batch scenarios
>> 
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
>> <
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89
>>> 
>> )
>> If we expect this also apply to streaming, then we need to consider the
>> stricter
>> shuffle restrictions of streaming compared to batch (if support is
>> considered,
>> more discussion is needed here, let’s not expand for now). If it only
>> applies to batch,
>> it is recommended to clarify in the flip.
> 
> 
> - The FLIP targets both streaming and batch scenarios.
> Could you please elaborate more on what you mean by additional
> restrictions?
> 
> 
> 2. In the current implementation, the optimized plan seems to have some
>> problems.
>> As described in the class comments:
>> 
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> 
> BatchPhysicalHashAggregate (local)
> 
>   +- BatchPhysicalLocalHashAggregate (local)
>>  +- BatchPhysicalTableSourceScan
>> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
>> one-phase
>> hashAgg, localAgg is not necessary, which is the scenario currently handled
>> by
>> `RemoveRedundantLocalHashAggRule` and other rules)
> 
> 
> - Yes, you are completely right. Note that the PR you referenced is just a
> quick PoC.
> The redundant operators you mentioned exist because
> `RemoveRedundantShuffleRule` only removes the Exchange operator,
> without modifying the upstream/downstream operators.
> As I mentioned, the implementation is just a PoC, and the final implementation
> will make sure that the existing redundancy-elimination rules remove the
> redundant operators.
> 
> 
> Also, in the draft pr,
>> the optimization of `testShouldEliminatePartitioning1` &
>> `testShouldEliminatePartitioning2`
>> seems didn't take effect?
>> 
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> 
> 
> - Note that in this example, the Exchange operator has a
> property KEEP_INPUT_AS_IS that indicates that its data distribution is the same
> as that of its input.
> Since we have redundant operators (as shown above, two aggregate operators),
> one of the rules (not in this FLIP)
> adds this Exchange operator with KEEP_INPUT_AS_IS in between.
> Similar to my comment above, the final implementation will be free of
> redundant operators.
> 
> In conjunction with question 2, I am wondering if we have a better choice
>> (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
>> `RemoveRedundantLocalXXRule`s
>> to the `PHYSICAL_REWRITE`).
>> For example, let the source actively provide some traits (including
>> `FlinkRelDistribution`
>> and `RelCollation`) to the planner. The advantage of doing this is to
>> directly reuse the
>> current shuffle remove optimization (as `FlinkExpandConversionRule`
>> implemented),
>> and according to the data distribution characteristics provided by the
>> source, the planner
>> may choose a physical operator with a cheaper costs (for example, according
>> to `RelCollation`,
>> the planner can use sortAgg, no need for a separate local sort operation).
>> WDYT?
> 
> 
> - Good point. Makes sense to me. I will check whether FlinkExpandConversionRule
> can be utilized in the implementation.
> 
> 
> Regards,
> Jeyhun
> 
> 
> 
> On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee  wrote:
> 
>> Hi Jeyhun,
>> 
>> Thank you for driving this, it would be a very useful optimization!
>> 
>> Sorry for joining the discussion now(I