Re: [VOTE] Release 1.16.3, release candidate #1

2023-11-20 Thread Rui Fan
+1 (non-binding)

Verified based on this wiki[1].

- Verified signatures and sha512
- The source archives do not contain any binaries
- Build the source with Maven 3 and java8 (Checked the license as well)
- Ran bin/start-cluster.sh with java8; it works fine with no unexpected log output
- Ran demo, it's fine:  bin/flink run
examples/streaming/StateMachineExample.jar

[1]
https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release

Best,
Rui

On Fri, Nov 17, 2023 at 11:52 AM Yun Tang  wrote:

> +1 (non-binding)
>
>
>   *   Verified signatures
>   *   Build from source code, and it looks good
>   *   Verified that jar packages are built with maven-3.2.5 and JDK8
>   *   Reviewed the flink-web PR
>   *   Start a local standalone cluster and submit examples
>
> Best
> Yun Tang
> 
> From: Rui Fan <1996fan...@gmail.com>
> Sent: Monday, November 13, 2023 18:20
> To: dev 
> Subject: [VOTE] Release 1.16.3, release candidate #1
>
> Hi everyone,
>
> Please review and vote on the release candidate #1 for the version 1.16.3,
>
> as follows:
>
> [ ] +1, Approve the release
>
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
>
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint B2D64016B940A7E0B9B72E0D7D0528B28037D8BC [3],
>
> * all artifacts to be deployed to the Maven Central Repository [4],
>
> * source code tag "release-1.16.3-rc1" [5],
>
> * website pull request listing the new release and adding announcement blog
> post [6].
>
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353259
>
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.16.3-rc1/
>
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>
> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1670/
>
> [5] https://github.com/apache/flink/releases/tag/release-1.16.3-rc1
>
> [6] https://github.com/apache/flink-web/pull/698
>
> Thanks,
> Release Manager
>


[jira] [Created] (FLINK-33603) Fix guava shading for GCS connector

2023-11-20 Thread Jayadeep Jayaraman (Jira)
Jayadeep Jayaraman created FLINK-33603:
--

 Summary: Fix guava shading for GCS connector
 Key: FLINK-33603
 URL: https://issues.apache.org/jira/browse/FLINK-33603
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.17.1
Reporter: Jayadeep Jayaraman


The GCS connector has a guava shading issue. The {{google-cloud-storage}} client requires 
guava {{31.1-jre}}, and upgrading {{google-cloud-storage}} leads to runtime failures 
because of new functionality added in {{31.1-jre}}.

This change pins the guava version to the one required by the storage client, 
specifically in {{flink-gs-fs-hadoop}}, leaving all other filesystem 
implementations untouched.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-20 Thread Hongshun Wang
Hi Becket,

> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue as
> a constructor parameter, which is not allowed now.
Sorry, that was a writing mistake on my part. What I meant is that *SplitFetcher*
requires FutureCompletingBlockingQueue as a constructor parameter. SplitFetcher
is a class rather than an interface. Therefore, I want to change SplitFetcher
to a public interface and move its implementation details to an implementing
subclass.
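
To make this concrete, here is a rough sketch of what the public side could
look like (the method set, names and annotations below are illustrative only,
not a final proposal):

import java.util.List;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;

@PublicEvolving
public interface SplitFetcher<E, SplitT extends SourceSplit> {

    /** Hand new splits to this fetcher; they are forwarded to its SplitReader. */
    void addSplits(List<SplitT> splitsToAdd);

    /** Enqueue a custom SplitFetcherTask to run in this fetcher's thread. */
    void enqueueTask(SplitFetcherTask task);

    /** Shut this fetcher down. */
    void shutdown();
}

The existing SplitFetcher class would then become an @Internal implementation
(e.g. SplitFetcherImpl) that is created and managed only by SplitFetcherManager,
so its constructor can keep taking the FutureCompletingBlockingQueue without
exposing that type publicly.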

Thanks,
Hongshun Wang

On Fri, Nov 17, 2023 at 6:21 PM Becket Qin  wrote:

> Hi Hongshun,
>
> SplitFetcher.enqueueTask() returns void, right? SplitFetcherTask is already
> an interface, and we need to make that a PublicEvolving API as well.
>
> So overall, a source developer can potentially do a few things in the
> SplitFetcherManager.
> 1. Customize logic including split-to-fetcher assignment, threading
> model, etc.
> 2. create their own SplitFetcherTask for the SplitFetcher / SplitReader to
> execute in a coordinated manner.
>
> It should be powerful enough for the vast majority of the source
> implementation, if not all.
>
>
> Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > as a
> > constructor parameter, which is not allowed
> > now.
>
> Are you referring to FetchTask which implements SplitFetcherTask? That
> class will remain internal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 17, 2023 at 5:23 PM Hongshun Wang 
> wrote:
>
> > Hi, Jiangjie(Becket) ,
> > Thank you for your advice. I have learned a lot.
> >
> > If SplitFetcherManager becomes PublicEvolving, that also means
> > > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > > protected method SplitFetcherManager.createSplitFetcher().
> >
> > I completely agree with you. However, if SplitFetcher becomes
> > PublicEvolving, SplitFetcherTask also needs to be PublicEvolving
> > because it is returned by the public method SplitFetcher#enqueueTask.
> > Additionally, SplitFetcherTask requires FutureCompletingBlockingQueue
> > as a
> > constructor parameter, which is not allowed
> > now. Therefore, I propose changing SplitFetcher to a public interface
> > and moving its implementation details to an implementing class (e.g.,
> > SplitFetcherImpl or another suitable name). SplitFetcherImpl will be
> > marked as internal, managed by SplitFetcherManager,
> > and will put data in the queue.
> > Subclasses of SplitFetcherManager can only use the SplitFetcher
> interface,
> > also ensuring that the current subclasses are not affected.
> >
> >
> >
> > The current SplitFetcherManager basically looks up
> > > the SplitT from the fetcher with the split Id, and immediately passes
> the
> > > SplitT back to the fetcher, which is unnecessary.
> >
> > I inferred that this is because SplitReader#pauseOrResumeSplits
> > requires SplitT instead of splitId. Perhaps some external source
> > requires more information to pause. However, SplitReader doesn't store
> > all its split data, while SplitFetcherManager saves them.
> > CC, @Dawid Wysakowicz
> >
> >
> >
> >  If not, SplitFetcher.pause() and
> > > SplitFetcher.resume() can be removed. In fact, they seem no longer used
> > > anywhere.
> >
> > It seems to be unused now. CC, @Arvid Heise
> >
> >
> >
> > Thanks,
> > Hongshun Wang
> >
> > On Fri, Nov 17, 2023 at 11:42 AM Becket Qin 
> wrote:
> >
> > > Hi Hongshun,
> > >
> > > Thanks for updating the FLIP. I think that makes sense. A few comments
> > > below:
> > >
> > > 1. If SplitFetcherManager becomes PublicEvolving, that also means
> > > SplitFetcher needs to be PublicEvolving, because it is returned by the
> > > protected method SplitFetcherManager.createSplitFetcher().
> > >
> > > 2. When checking the API of the classes to be marked as PublicEvolving,
> > > there might be a few methods' signatures worth some discussion.
> > >
> > > For SplitFetcherManager:
> > > a) Currently removeSplits() methods takes a list of SplitT. I am
> > wondering
> > > if it should be a list of splitIds. SplitT actually contains two parts
> of
> > > information, the static split Id and some dynamically changing state of
> > the
> > > split (e.g. Kafka consumer offset). The source of truth for the dynamic
> > > state is SourceReaderBase. Currently we are passing in the full source
> > > split with the dynamic state for split removal. But it looks like only
> > > split id is needed for the split removal.
> > > Maybe this is intentional, as sometimes when a SplitReader removes a
> > split,
> > > it also wants to know the dynamic state of the split. If so, we can
> keep
> > it
> > > as is. But then the question is why
> > > SplitFetcherManager.pauseAndResumeSplits() only takes split ids instead
> > of
> > > SplitT. Should we make them consistent?
> > >
> > > For SplitFetcher:
> > > a) The SplitFetcher.pauseOrResumeSplits() method takes collections of
> > > SplitT as arguments. We may want to adjust that according to what we do
> > to
> > > the SplitFetcherManager. The current 

Re: [DISCUSS] FLIP-393: Make QueryOperations SQL serializable

2023-11-20 Thread Benchao Li
The FLIP looks good to me now, let's start the vote.

On Mon, Nov 20, 2023 at 22:36, Dawid Wysakowicz wrote:
>
> @Benchao I added an example to the page.
>
> If there are no further comments, I'll start a vote on the FLIP tomorrow or
> the next day.
>
> Best,
> Dawid
>
> On Fri, 17 Nov 2023 at 12:20, xiangyu feng  wrote:
>
> > >After this FLIP is done, FLINK-25015 can utilize this ability to set
> > > job name for queries.
> >
> > +1 for this. Currently, when users submit sql jobs through table api, we
> > can't see the complete SQL string on flink ui. It would be easy for us to
> > finish this feature if we can get serialized sql from QueryOperation
> > directly.
> >
> > So +1 for the overall proposal.
> >
> > Regards,
> > Xiangyu
> >
> > On Fri, Nov 17, 2023 at 19:07, Benchao Li wrote:
> >
> > > That sounds good to me, I'm looking forward to it!
> > >
> > > After this FLIP is done, FLINK-25015 can utilize this ability to set
> > > job name for queries.
> > >
> > > On Thu, Nov 16, 2023 at 21:16, Dawid Wysakowicz wrote:
> > > >
> > > > Yes, the idea is to convert the QueryOperation tree into a
> > > > proper/compilable query. To be honest I didn't think it could be done
> > > > differently, sorry if I wasn't clear enough. Yes, it is very much like
> > > > SqlNode#toSqlString you mentioned. I'll add an example of a single
> > > > QueryOperation tree to the FLIP.
> > > >
> > > > I tried to focus only on the public contracts, not on the
> > implementation
> > > > details. I mentioned Expressions, because this requires changing
> > > > semi-public interfaces in BuiltinFunctionDefinitions.
> > > >
> > > > Hope this makes it clearer.
> > > >
> > > > Regards,
> > > > Dawid
> > > >
> > > > On Thu, 16 Nov 2023 at 12:12, Benchao Li  wrote:
> > > >
> > > > > Sorry that I wasn't expressing it clearly.
> > > > >
> > > > > Since the FLIP talks about two things: ResolvedExpression and
> > > > > QueryOperation, and you have illustrated how to serialize
> > > > > ResolvedExpression into SQL string. I'm wondering how you're going to
> > > > > convert QueryOperation into a SQL string.
> > > > >
> > > > > I was thinking that you proposed to convert the QueryOperation tree
> > > > > into a "complete runnable SQL statement", e.g.
> > > > >
> > > > >
> > >
> > ProjectQueryOperation(x,y)->FilterQueryOperation(z>10)->TableSourceQueryOperation(T),
> > > > > we'll get "SELECT x, y FROM T WHERE z > 10".
> > > > > Maybe I misread it, maybe you just meant to convert each
> > > > > QueryOperation into a row-level SQL string instead the whole tree
> > into
> > > > > a complete SQL statement.
> > > > >
> > > > > The idea of translating whole QueryOperation tree into SQL statement
> > > > > may come from my experience of Apache Calcite, there is a
> > > > > SqlImplementor[1] which convert a RelNode tree into SqlNode, and
> > > > > further we can use  SqlNode#toSqlString to unparse it into SQL
> > string.
> > > > > I would assume that most of our QueryOperations are much like the
> > > > > abstraction of Calcite's RelNode, with some exceptions such as
> > > > > PlannerQueryOperation.
> > > > >
> > > > > [1]
> > > > >
> > >
> > https://github.com/apache/calcite/blob/153796f8994831ad015af4b9036aa01ebf78/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java#L141
> > > > >
> > > > > On Thu, Nov 16, 2023 at 16:24, Dawid Wysakowicz wrote:
> > > > > >
> > > > > > I think the FLIP covers all public contracts that are necessary to
> > be
> > > > > > discussed at that level.
> > > > > >
> > > > > > If you meant you could not find a method that would be called to
> > > trigger
> > > > > > the translation then it is already there. It's just not implemented
> > > yet:
> > > > > > QueryOperation#asSerializableString[1]. As I mentioned this is
> > > mostly a
> > > > > > follow up to previous work.
> > > > > >
> > > > > > Regards,
> > > > > > Dawid
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > >
> > https://github.com/apache/flink/blob/d18a4bfe596fc580f8280750fa3bfa22007671d9/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/QueryOperation.java#L46
> > > > > >
> > > > > > On Wed, 15 Nov 2023 at 16:36, Benchao Li 
> > > wrote:
> > > > > >
> > > > > > > +1 for the idea of choosing SQL as the serialization format for
> > > > > > > QueryOperation, thanks for Dawid for driving this FLIP.
> > > > > > >
> > > > > > > Regarding the implementation, I didn't see the proposal for how
> > to
> > > > > > > translate QueryOperation to SQL yet, am I missing something? Or
> > the
> > > > > > > FLIP is still in preliminary state, you just want to gather ideas
> > > > > > > about whether to use SQL or something else as the serialization
> > > format
> > > > > > > for QueryOperation?
> > > > > > >
> > > > > > > On Wed, Nov 15, 2023 at 19:34, Dawid Wysakowicz wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > > I would like to propose a follow-up improvement to some of the
> > > work
> > > > > that
> > > > > > > > has been done over the years to the Table API. I 

Re: [DISCUSS] Release flink-connector-pulsar 4.1.0

2023-11-20 Thread tison
I've created a task for this [1]. But it should not be a block for
Connector / Pulsar 4.1.0.

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-33602


On Fri, Nov 10, 2023 at 19:44, tison wrote:

> > does it include support for Flink 1.18?
>
> Not yet. Tests for 1.16 and 1.17 pass, but they do not pass against the latest
> 1.18-SNAPSHOT (and ditto 1.18.0). I'm afraid it's not a trivial fix so let's mark it
> as an issue but not a blocker.
>
> Best,
> tison.
>
>
> On Thu, Nov 9, 2023 at 16:21, Martijn Visser wrote:
>
>> Hi Tison,
>>
>> I would be +1 for releasing it, but does it include support for Flink
>> 1.18? I think that the tests still failed for it, and I think that
>> support should be in place before releasing a new version of the
>> connector. What do you think?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Thu, Nov 9, 2023 at 7:09 AM Leonard Xu  wrote:
>> >
>> > Hey, Tison.
>> >
>> > +1 to release  flink-connector-pulsar 4.1.0.
>> >
>> > I’m glad to offer help for the release.
>> >
>> >
>> > Best,
>> > Leonard
>> >
>> >
>> >
>> > > On Nov 9, 2023 at 1:30 PM, tison wrote:
>> > >
>> > > Hi,
>> > >
>> > > I'd propose to cut a new release for flink-connector-pulsar 4.1.0[1].
>> > >
>> > > From the last release (4.0.0), we mainly achieved:
>> > >
>> > > 1. Implement table connector (integrated with Flink SQL)
>> > > 2. Drop the requirement for using adminURL
>> > > 3. Support JDK 11
>> > >
>> > > I can help in driving the release but perhaps we need some more PMC
>> > > members' attention and help.
>> > >
>> > > What do you think?
>> > >
>> > > Best,
>> > > tison.
>> > >
>> > > [1] https://github.com/apache/flink-connector-pulsar
>> >
>>
>


[jira] [Created] (FLINK-33602) Pulsar connector should be compatible with Flink 1.18

2023-11-20 Thread Zili Chen (Jira)
Zili Chen created FLINK-33602:
-

 Summary: Pulsar connector should be compatible with Flink 1.18
 Key: FLINK-33602
 URL: https://issues.apache.org/jira/browse/FLINK-33602
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.18.0, pulsar-4.1.0
Reporter: Zili Chen
Assignee: Zili Chen


Currently, the build and test job always fails - 
https://github.com/apache/flink-connector-pulsar/actions/runs/6937440214



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33601) Implement restore tests for Expand node

2023-11-20 Thread Jim Hughes (Jira)
Jim Hughes created FLINK-33601:
--

 Summary: Implement restore tests for Expand node
 Key: FLINK-33601
 URL: https://issues.apache.org/jira/browse/FLINK-33601
 Project: Flink
  Issue Type: Sub-task
Reporter: Jim Hughes






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-11-20 Thread Ferenc Csaky
Hello devs,

Is any active work happening on this FLIP? As far as I can see, there
are blockers regarding artifact distribution that need to be resolved
before it can be implemented.

Is this work halted completely, or is some effort going into
resolving the blockers first?

Our platform would benefit from this feature a lot; we have a kind of
working custom implementation at the moment, but it is uniquely
adapted to our app and platform.

I could help out to move this forward.

Best,
Ferenc



On Friday, June 30th, 2023 at 04:53, Paul Lam  wrote:


> 
> 
> Hi Jing,
> 
> Thanks for your input!
> 
> > Would you like to add
> > one section to describe(better with script/code example) how to use it in
> > these two scenarios from users' perspective?
> 
> 
> OK. I’ll update the FLIP with the code snippet after I get the POC branch 
> done.
> 
> > NIT: the pictures have transparent background when readers click on it. It
> > would be great if you can replace them with pictures with white background.
> 
> 
> Fixed. Thanks for pointing that out :)
> 
> Best,
> Paul Lam
> 
> > On Jun 27, 2023 at 06:51, Jing Ge j...@ververica.com.INVALID wrote:
> > 
> > Hi Paul,
> > 
> > Thanks for driving it and thank you all for the informative discussion! The
> > FLIP is in good shape now. As described in the FLIP, SQL Driver will be
> > mainly used to run Flink SQLs in two scenarios: 1. SQL client/gateway in
> > application mode and 2. external system integration. Would you like to add
> > one section to describe(better with script/code example) how to use it in
> > these two scenarios from users' perspective?
> > 
> > NIT: the pictures have transparent background when readers click on it. It
> > would be great if you can replace them with pictures with white background.
> > 
> > Best regards,
> > Jing
> > 
> > On Mon, Jun 26, 2023 at 1:31 PM Paul Lam  wrote:
> > 
> > > Hi Shengkai,
> > > 
> > > > * How can we ship the json plan to the JobManager?
> > > 
> > > The Flink K8s module should be responsible for file distribution. We could
> > > introduce
> > > an option like `kubernetes.storage.dir`. For each flink cluster, there
> > > would be a
> > > dedicated subdirectory, with the pattern like
> > > `${kubernetes.storage.dir}/${cluster-id}`.
> > > 
> > > All resources-related options (e.g. pipeline jars, json plans) that are
> > > configured with
> > > scheme `file://` would be 
> > > uploaded to the resource directory
> > > and downloaded to the
> > > jobmanager, before SQL Driver accesses the files with the original
> > > filenames.
> > > 
> > > > * Classloading strategy
> > > 
> > > We could directly specify the SQL Gateway jar as the jar file in
> > > PackagedProgram.
> > > It would be treated like a normal user jar and the SQL Driver is loaded
> > > into the user
> > > classloader. WDYT?
> > > 
> > > > * Option `$internal.sql-gateway.driver.sql-config` is string type
> > > > I think it's better to use Map type here
> > > 
> > > By Map type configuration, do you mean a nested map that contains all
> > > configurations?
> > > 
> > > I hope I've explained myself well, it’s a file that contains the extra SQL
> > > configurations, which would be shipped to the jobmanager.
> > > 
> > > > * PoC branch
> > > 
> > > Sure. I’ll let you know once I get the job done.
> > > 
> > > Best,
> > > Paul Lam
> > > 
> > > > On Jun 26, 2023 at 14:27, Shengkai Fang  wrote:
> > > > 
> > > > Hi, Paul.
> > > > 
> > > > Thanks for your update. I have a few questions about the new design:
> > > > 
> > > > * How can we ship the json plan to the JobManager?
> > > > 
> > > > The current design only exposes an option about the URL of the json
> > > > plan. It seems the gateway is responsible for uploading it to external
> > > > storage.
> > > > Can we reuse the PipelineOptions.JARS to ship to the remote filesystem?
> > > > 
> > > > * Classloading strategy
> > > > 
> > > > Currently, the Driver is in the sql-gateway package. It means the Driver
> > > > is not in the JM's classpath directly. Because the sql-gateway jar is 
> > > > now
> > > > in the opt directory rather than lib directory. It may need to add the
> > > > external dependencies as Python does[1]. BTW, I think it's better to 
> > > > move
> > > > the Driver into the flink-table-runtime package, which is much easier to
> > > > find(Sorry for the wrong opinion before).
> > > > 
> > > > * Option `$internal.sql-gateway.driver.sql-config` is string type
> > > > 
> > > > I think it's better to use Map type here
> > > > 
> > > > * PoC branch
> > > > 
> > > > Because this FLIP involves many modules, do you have a PoC branch to
> > > > verify it does work?
> > > > 
> > > > Best,
> > > > Shengkai
> > > > 
> > > > [1]
> > > > https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L940
> > > >  
> > > > 

Re: [VOTE] Apache Flink Kubernetes Operator Release 1.7.0, release candidate #1

2023-11-20 Thread Maximilian Michels
+1 (binding)

1. Downloaded the archives, checksums, and signatures
2. Verified the signatures and checksums
3. Extract and inspect the source code for binaries
4. Compiled and tested the source code via mvn verify
5. Verified license files / headers
6. Deployed helm chart to test cluster
7. Build and ran dynamic autoscaling example image
8. Tested autoscaling without rescaling API

Hit a non-fatal error collecting metrics in the stabilization phase
(this is a new feature), not a release blocker though [1].

-Max

[1] Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[org.apache.flink.runtime.rest.handler.RestHandlerException: Cannot
connect to ResourceManager right now. Please try to refresh.
at 
org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler.lambda$getResourceManagerGateway$0(AbstractResourceManagerHandler.java:91)
 at
java.base/java.util.Optional.orElseThrow(Unknown Source)
at 
org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler.getResourceManagerGateway(AbstractResourceManagerHandler.java:89)
...

On Mon, Nov 20, 2023 at 5:48 PM Márton Balassi  wrote:
>
> +1 (binding)
>
> - Verified Helm repo works as expected, points to correct image tag, build,
> version
> - Verified basic examples + checked operator logs everything looks as
> expected
> - Verified hashes, signatures and source release contains no binaries
> - Ran built-in tests, built jars + docker image from source successfully
> - Upgraded the operator and the CRD from 1.6.1 to 1.7.0
>
> Best,
> Marton
>
> On Mon, Nov 20, 2023 at 2:03 PM Gyula Fóra  wrote:
>
> > +1 (binding)
> >
> > Verified:
> >  - Release files, maven repo contents, checksums, signature
> >  - Verified and installed from Helm chart
> >  - Ran basic stateful example and verified
> >- Upgrade flow
> >- No errors in logs
> >- Autoscaler (turn on/off, verify configmap cleared correctly)
> >- In-place scaling with 1.18 and adaptive scheduler
> >  - Built from source with Java 11 & 17
> >  - Checked release notes
> >
> > Cheers,
> > Gyula
> >
> > On Fri, Nov 17, 2023 at 1:59 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > +1(non-binding)
> > >
> > > - Downloaded artifacts from dist
> > > - Verified SHA512 checksums
> > > - Verified GPG signatures
> > > - Build the source with java-11 and java-17
> > > - Verified the license header
> > > - Verified that chart and appVersion matches the target release
> > > - RC repo works as Helm rep(helm repo add flink-operator-repo-1.7.0-rc1
> > >
> > >
> > https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.7.0-rc1/
> > > )
> > > - Verified Helm chart can be installed  (helm install
> > > flink-kubernetes-operator
> > > flink-operator-repo-1.7.0-rc1/flink-kubernetes-operator --set
> > > webhook.create=false)
> > > - Submitted the autoscaling demo, the autoscaler works well with rescale
> > > api (kubectl apply -f autoscaling.yaml)
> > > - Download Autoscaler standalone: wget
> > >
> > >
> > https://repository.apache.org/content/repositories/orgapacheflink-1672/org/apache/flink/flink-autoscaler-standalone/1.7.0/flink-autoscaler-standalone-1.7.0.jar
> > > - Ran Autoscaler standalone locally, it works well with rescale api
> > >
> > > Best,
> > > Rui
> > >
> > > On Fri, Nov 17, 2023 at 2:45 AM Mate Czagany  wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > - Checked signatures, checksums
> > > > - No binaries found in the source release
> > > > - Verified all source files contain the license header
> > > > - All pom files point to the correct version
> > > > - Verified Helm chart version and appVersion
> > > > - Verified Docker image tag
> > > > - Ran flink-autoscaler-standalone JAR downloaded from the maven
> > > repository
> > > > - Tested autoscaler upscales correctly on load with Flink 1.18
> > rescaling
> > > > API
> > > >
> > > > Thanks,
> > > > Mate
> > > >
> > > > On Wed, Nov 15, 2023 at 16:37, Gyula Fóra wrote:
> > > >
> > > > > Hi Everyone,
> > > > >
> > > > > Please review and vote on the release candidate #1 for the version
> > > 1.7.0
> > > > of
> > > > > Apache Flink Kubernetes Operator,
> > > > > as follows:
> > > > > [ ] +1, Approve the release
> > > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > > >
> > > > > **Release Overview**
> > > > >
> > > > > As an overview, the release consists of the following:
> > > > > a) Kubernetes Operator canonical source distribution (including the
> > > > > Dockerfile), to be deployed to the release repository at
> > > dist.apache.org
> > > > > b) Kubernetes Operator Helm Chart to be deployed to the release
> > > > repository
> > > > > at dist.apache.org
> > > > > c) Maven artifacts to be deployed to the Maven Central Repository
> > > > > d) Docker image to be pushed to dockerhub
> > > > >
> > > > > **Staging Areas to Review**
> > > > >
> > > > > The staging areas containing the above mentioned 

Re: Proposal for Implementing Keyed Watermarks in Apache Flink

2023-11-20 Thread David Morávek
The paper looks interesting, but it might not manifest the described
benefit for practical reasons:

1. It forces you to remember all keys in the broadcasted (partitioned is
impossible without timeouts, etc.) operator state. Forever. This itself is
a blocker for a bunch of pipelines. The primary motivation for using the
state is that you can't simply recompute the WM as with the global one.
2. It's super easy to run into idleness issues (it's more than likely).
3. Thinking about multiple chained aggregations on different keys is just
...
4. This is a significant change to public APIs.

The main problem the paper needs to address is idleness and stuck per-key
watermarks (pipeline not making progress).

What do you think about these issues?

Best,
D.


On Sat, Nov 18, 2023 at 6:41 PM Tawfek Yasser Tawfek 
wrote:

> Hello Alexander,
>
> Will we continue the discussion?
>
>
>
> Thanks & BR,
>
> Tawfik
>
> 
> From: Tawfek Yasser Tawfek 
> Sent: 30 October 2023 15:32
> To: dev@flink.apache.org 
> Subject: Re: Proposal for Implementing Keyed Watermarks in Apache Flink
>
> Hi Alexander,
>
> Thank you for your reply.
>
> Yes. As you showed, the keyed-watermarks mechanism is mainly required for the
> case when we need a fine-grained calculation for each partition
> [calculation over data produced by each individual sensor]; as scalability
> factors require partitioning the calculations,
> the keyed-watermarks mechanism is designed for this type of problem.
>
> Thanks,
> Tawfik
> 
> From: Alexander Fedulov 
> Sent: 30 October 2023 13:37
> To: dev@flink.apache.org 
> Subject: Re: Proposal for Implementing Keyed Watermarks in Apache Flink
>
> [You don't often get email from alexander.fedu...@gmail.com. Learn why
> this is important at https://aka.ms/LearnAboutSenderIdentification ]
>
> Hi Tawfek,
>
> > The idea is to generate a watermark for each key (sub-stream), in order
> to avoid the fast progress of the global watermark which affects low-rate
> sources.
>
> Let's consider the sensors example from the paper. Shouldn't it be about
> the delay between the time of taking the measurement and its arrival at
> Flink, rather than the rate at which the measurements are produced? If a
> particular sensor produces no data during a specific time window, it
> doesn't make sense to wait for it—there won't be any corresponding
> measurement arriving because none was produced. Thus, I believe we should
> be talking about situations where data from certain sensors can arrive with
> significant delay compared to most other sensors.
>
> From the perspective of data aggregation, there are two main scenarios:
> 1) Calculation over data produced by multiple sensors
> 2) Calculation over data produced by an individual sensor
>
> In scenario 1), there are two subcategories:
> a) Meaningful results cannot be produced without data from those delayed
> sensors; hence, you need to wait longer.
>   => Time is propagated by the mix of all sources. You just need to set
> a bounded watermark with enough lag to accommodate the delayed results.
> This is precisely what event time processing and bounded watermarks are for
> (no keyed watermarking is required).
> b) You need to produce the results as they are and perhaps patch them later
> when the delayed data arrives.
>  => Time is propagated by the mix of all sources. You produce the
> results as they are but utilize allowedLateness to patch the aggregates if
> needed (no keyed watermarking is required).
>
> So, is it correct to say that keyed watermarking is applicable only in
> scenario 2)?
>
> Best,
> Alexander Fedulov
>
> On Sat, 28 Oct 2023 at 14:33, Tawfek Yasser Tawfek 
> wrote:
>
> > Thanks, Alexander for your reply.
> >
> > Our solution originated from this inquiry on Stack Overflow:
> >
> >
> https://stackoverflow.com/questions/52179898/does-flink-support-keyed-watermarks-if-not-is-there-any-plan-of-implementing-i
> >
> > The idea is to generate a watermark for each key (sub-stream), in order
> to
> > avoid the fast progress of the global watermark which affects low-rate
> > sources.
> >
> > Instead of using only one watermark (vanilla/global watermark), we
> changed
> > the API to allow moving the keyBy() before the
> > assignTimestampsAndWatermarks() so the stream will be partitioned then
> the
> > TimestampsAndWatermarkOperator will handle the generation of each
> watermark
> > for each key (source/sub-stream/partition).
> >
> > *Let's discuss more if you want; I have a presentation at a conference, we
> > can meet, or whatever is suitable.*
> >
> > Also, I contacted David Anderson one year ago and he followed me step by
> > step and helped me a lot.
> >
> > I attached some messages with David.
> >
> >
> > *Thanks & BR,*
> >
> >
> > 
> >
> >
> >
> >
> >  Tawfik Yasser Tawfik
> >
> > * Teaching Assistant | AI-ITCS-NU*
> >
> >  Office: UB1-B, Room 229
> >
> >  26th of July Corridor, Sheikh Zayed 

Re: [DISCUSS] REST API behaviour when user main method throws error

2023-11-20 Thread David Morávek
Hi Danny,

> My current proposal is that the REST API should not leave the Flink
> cluster in an inconsistent state.

Regarding consistency, Flink only cares about individual jobs, but I can
see your point.

For streaming, this is probably something we could address by book-keeping
jobs submitted by the jar and canceling them on exception. This is not
resilient to JM failure and would be subject to bug reports because there are
no guarantees (it could be addressed; it's not straightforward; we've spent
years getting this correct for Application mode).
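
As a rough illustration only (this sketch assumes the jar-run handler had a
ClusterClient for the session cluster at hand, it ignores JM failover, and it
is not how the handler works today):

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.runtime.client.JobStatusMessage;

public final class RunJarWithRollback {

    /** Run the user's main() and cancel any job it started if main() throws. */
    static void runAndRollback(ClusterClient<?> client, PackagedProgram program)
            throws Exception {
        Set<JobID> preExisting = new HashSet<>();
        for (JobStatusMessage job : client.listJobs().get()) {
            preExisting.add(job.getJobId());
        }
        try {
            program.invokeInteractiveModeForExecution();
        } catch (Exception e) {
            // Best-effort cleanup: cancel the jobs that appeared while main() ran.
            for (JobStatusMessage job : client.listJobs().get()) {
                if (!preExisting.contains(job.getJobId())) {
                    client.cancel(job.getJobId()).get();
                }
            }
            throw e;
        }
    }
}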

A bigger problem is that the main method could have side effects that
you don't know how to roll back. For example, creating directories for
batch outputs and moving files. To make this 100% correct, we'd need to
introduce a new set of client-facing APIs.

I'm unaware of any framework that did this right (MR, Spark, etc had
the same issue); you must solve HA for drivers/client programs first.

These are quick thoughts; something simple that works for 90% might be
worth pursuing, ignoring the corner cases.

Best,
D.

On Tue, Nov 14, 2023 at 10:00 AM Danny Cranmer 
wrote:

> Hey all,
>
> We run Flink clusters in session mode; we upload the user jar and then
> invoke "/jars/:jarid/run" [1] REST API endpoint. We have noticed a
> discrepancy in the run endpoint and were hoping to get some feedback from
> the community before proposing a FLIP or Jira to fix it.
>
> Some problem context: The "/jars/:jarid/run" endpoint runs the main()
> method in the user jar. When the call is successful the API will return the
> job ID (case 1). When the call throws an error, the API will return the
> error message (case 2). If a job is submitted successfully AND it throws an
> error, the result is a running job but the API returns the error message
> (case 3). There are two common cases that can result in this failure mode:
> 1/ the user code attempts to run multiple jobs, the first is successful and
> the second is rejected [2]. 2/ the user code throws an arbitrary exception
> after successfully submitting a job. Reproduction code for both is included
> below. For case 3 the client must 1/ run a jar, 2/ query for running jobs
> and 3/ decide how to proceed, either cleaning up or marking it as
> successful. This is overloading the responsibility of the client.
>
> My current proposal is that the REST API should not leave the Flink cluster
> in an inconsistent state. If the run command is successful we should have a
> running job, if the run command fails, we should not have any running jobs.
> There are a few ways to achieve this, but I would like to leave that
> discussion to the FLIP. Right now I am looking for feedback on the desired
> API behaviour.
>
> 1/ The user code attempts to run multiple jobs (Flink 1.15)
>
> public class MultipleJobs {
> public static final long ROWS_PER_SECOND = 1;
> public static final long TOTAL_ROWS = 1_000_000;
>
> public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
> env.addSource(new DataGeneratorSource<>(stringGenerator(32),
> ROWS_PER_SECOND,
> TOTAL_ROWS))
> .returns(String.class)
> .print();
>
> env.execute("Job #1");
>
> env.addSource(new DataGeneratorSource<>(stringGenerator(32),
> ROWS_PER_SECOND,
> TOTAL_ROWS))
> .returns(String.class)
> .print();
>
> env.execute("Job #2");
> }
> }
>
>
> 2/ The user code throws an arbitrary exception after successfully
> submitting a job (Flink 1.15)
>
> public class CustomerErrorJobSubmission {
> public static final long ROWS_PER_SECOND = 1;
> public static final long TOTAL_ROWS = 1_000_000;
>
> public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.addSource(new DataGeneratorSource<>(stringGenerator(32),
> ROWS_PER_SECOND, TOTAL_ROWS))
> .returns(String.class)
> .print();
>
> env.execute("Job #1");
>
> throw new RuntimeException("REST API call will fail, but there
> will still be a job running");
> }
> }
>
>
> --
>
>
> Thanks,
> Danny
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jars-jarid-run
> [2]
>
> https://github.com/apache/flink/blob/release-1.15/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java#L198
>


Re: [VOTE] Apache Flink Kubernetes Operator Release 1.7.0, release candidate #1

2023-11-20 Thread Márton Balassi
+1 (binding)

- Verified Helm repo works as expected, points to correct image tag, build,
version
- Verified basic examples + checked operator logs everything looks as
expected
- Verified hashes, signatures and source release contains no binaries
- Ran built-in tests, built jars + docker image from source successfully
- Upgraded the operator and the CRD from 1.6.1 to 1.7.0

Best,
Marton

On Mon, Nov 20, 2023 at 2:03 PM Gyula Fóra  wrote:

> +1 (binding)
>
> Verified:
>  - Release files, maven repo contents, checksums, signature
>  - Verified and installed from Helm chart
>  - Ran basic stateful example and verified
>- Upgrade flow
>- No errors in logs
>- Autoscaler (turn on/off, verify configmap cleared correctly)
>- In-place scaling with 1.18 and adaptive scheduler
>  - Built from source with Java 11 & 17
>  - Checked release notes
>
> Cheers,
> Gyula
>
> On Fri, Nov 17, 2023 at 1:59 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > +1(non-binding)
> >
> > - Downloaded artifacts from dist
> > - Verified SHA512 checksums
> > - Verified GPG signatures
> > - Build the source with java-11 and java-17
> > - Verified the license header
> > - Verified that chart and appVersion matches the target release
> > - RC repo works as Helm rep(helm repo add flink-operator-repo-1.7.0-rc1
> >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.7.0-rc1/
> > )
> > - Verified Helm chart can be installed  (helm install
> > flink-kubernetes-operator
> > flink-operator-repo-1.7.0-rc1/flink-kubernetes-operator --set
> > webhook.create=false)
> > - Submitted the autoscaling demo, the autoscaler works well with rescale
> > api (kubectl apply -f autoscaling.yaml)
> > - Download Autoscaler standalone: wget
> >
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1672/org/apache/flink/flink-autoscaler-standalone/1.7.0/flink-autoscaler-standalone-1.7.0.jar
> > - Ran Autoscaler standalone locally, it works well with rescale api
> >
> > Best,
> > Rui
> >
> > On Fri, Nov 17, 2023 at 2:45 AM Mate Czagany  wrote:
> >
> > > +1 (non-binding)
> > >
> > > - Checked signatures, checksums
> > > - No binaries found in the source release
> > > - Verified all source files contain the license header
> > > - All pom files point to the correct version
> > > - Verified Helm chart version and appVersion
> > > - Verified Docker image tag
> > > - Ran flink-autoscaler-standalone JAR downloaded from the maven
> > repository
> > > - Tested autoscaler upscales correctly on load with Flink 1.18
> rescaling
> > > API
> > >
> > > Thanks,
> > > Mate
> > >
> > > On Wed, Nov 15, 2023 at 16:37, Gyula Fóra wrote:
> > >
> > > > Hi Everyone,
> > > >
> > > > Please review and vote on the release candidate #1 for the version
> > 1.7.0
> > > of
> > > > Apache Flink Kubernetes Operator,
> > > > as follows:
> > > > [ ] +1, Approve the release
> > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > >
> > > > **Release Overview**
> > > >
> > > > As an overview, the release consists of the following:
> > > > a) Kubernetes Operator canonical source distribution (including the
> > > > Dockerfile), to be deployed to the release repository at
> > dist.apache.org
> > > > b) Kubernetes Operator Helm Chart to be deployed to the release
> > > repository
> > > > at dist.apache.org
> > > > c) Maven artifacts to be deployed to the Maven Central Repository
> > > > d) Docker image to be pushed to dockerhub
> > > >
> > > > **Staging Areas to Review**
> > > >
> > > > The staging areas containing the above mentioned artifacts are as
> > > follows,
> > > > for your review:
> > > > * All artifacts for a,b) can be found in the corresponding dev
> > repository
> > > > at dist.apache.org [1]
> > > > * All artifacts for c) can be found at the Apache Nexus Repository
> [2]
> > > > * The docker image for d) is staged on github [3]
> > > >
> > > > All artifacts are signed with the key 21F06303B87DAFF1 [4]
> > > >
> > > > Other links for your review:
> > > > * JIRA release notes [5]
> > > > * source code tag "release-1.7.0-rc1" [6]
> > > > * PR to update the website Downloads page to
> > > > include Kubernetes Operator links [7]
> > > >
> > > > **Vote Duration**
> > > >
> > > > The voting time will run for at least 72 hours.
> > > > It is adopted by majority approval, with at least 3 PMC affirmative
> > > votes.
> > > >
> > > > **Note on Verification**
> > > >
> > > > You can follow the basic verification guide here[8].
> > > > Note that you don't need to verify everything yourself, but please
> make
> > > > note of what you have tested together with your +- vote.
> > > >
> > > > Cheers!
> > > > Gyula Fora
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.7.0-rc1/
> > > > [2]
> > > >
> > https://repository.apache.org/content/repositories/orgapacheflink-1672/
> > > > [3] ghcr.io/apache/flink-kubernetes-operator:ccb10b8
> > > > [4] 

[jira] [Created] (FLINK-33600) Print cost time for batch queries in SQL Client

2023-11-20 Thread Jark Wu (Jira)
Jark Wu created FLINK-33600:
---

 Summary: Print cost time for batch queries in SQL Client
 Key: FLINK-33600
 URL: https://issues.apache.org/jira/browse/FLINK-33600
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Reporter: Jark Wu
 Fix For: 1.19.0


Currently, there is no cost time information when executing batch queries in 
SQL CLI. But this is very helpful in OLAP/ad-hoc scenarios. 

For example: 
{code}
Flink SQL> select * from (values ('abc', 123));
+++
| EXPR$0 | EXPR$1 |
+++
|abc |123 |
+++
1 row in set  (0.22 seconds)
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33599) Run restore tests with RocksDB state backend

2023-11-20 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-33599:


 Summary: Run restore tests with RocksDB state backend
 Key: FLINK-33599
 URL: https://issues.apache.org/jira/browse/FLINK-33599
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-393: Make QueryOperations SQL serializable

2023-11-20 Thread Dawid Wysakowicz
@Benchao I added an example to the page.

If there are no further comments, I'll start a vote on the FLIP tomorrow or
the next day.

Best,
Dawid

On Fri, 17 Nov 2023 at 12:20, xiangyu feng  wrote:

> >After this FLIP is done, FLINK-25015 can utilize this ability to set
> > job name for queries.
>
> +1 for this. Currently, when users submit sql jobs through table api, we
> can't see the complete SQL string on flink ui. It would be easy for us to
> finish this feature if we can get serialized sql from QueryOperation
> directly.
>
> So +1 for the overall proposal.
>
> Regards,
> Xiangyu
>
> On Fri, Nov 17, 2023 at 19:07, Benchao Li wrote:
>
> > That sounds good to me, I'm looking forward to it!
> >
> > After this FLIP is done, FLINK-25015 can utilize this ability to set
> > job name for queries.
> >
> > On Thu, Nov 16, 2023 at 21:16, Dawid Wysakowicz wrote:
> > >
> > > Yes, the idea is to convert the QueryOperation tree into a
> > > proper/compilable query. To be honest I didn't think it could be done
> > > differently, sorry if I wasn't clear enough. Yes, it is very much like
> > > SqlNode#toSqlString you mentioned. I'll add an example of a single
> > > QueryOperation tree to the FLIP.
> > >
> > > I tried to focus only on the public contracts, not on the
> implementation
> > > details. I mentioned Expressions, because this requires changing
> > > semi-public interfaces in BuiltinFunctionDefinitions.
> > >
> > > Hope this makes it clearer.
> > >
> > > Regards,
> > > Dawid
> > >
> > > On Thu, 16 Nov 2023 at 12:12, Benchao Li  wrote:
> > >
> > > > Sorry that I wasn't expressing it clearly.
> > > >
> > > > Since the FLIP talks about two things: ResolvedExpression and
> > > > QueryOperation, and you have illustrated how to serialize
> > > > ResolvedExpression into SQL string. I'm wondering how you're going to
> > > > convert QueryOperation into a SQL string.
> > > >
> > > > I was thinking that you proposed to convert the QueryOperation tree
> > > > into a "complete runnable SQL statement", e.g.
> > > >
> > > >
> >
> ProjectQueryOperation(x,y)->FilterQueryOperation(z>10)->TableSourceQueryOperation(T),
> > > > we'll get "SELECT x, y FROM T WHERE z > 10".
> > > > Maybe I misread it, maybe you just meant to convert each
> > > > QueryOperation into a row-level SQL string instead the whole tree
> into
> > > > a complete SQL statement.
> > > >
> > > > The idea of translating whole QueryOperation tree into SQL statement
> > > > may come from my experience of Apache Calcite, there is a
> > > > SqlImplementor[1] which convert a RelNode tree into SqlNode, and
> > > > further we can use  SqlNode#toSqlString to unparse it into SQL
> string.
> > > > I would assume that most of our QueryOperations are much like the
> > > > abstraction of Calcite's RelNode, with some exceptions such as
> > > > PlannerQueryOperation.
> > > >
> > > > [1]
> > > >
> >
> https://github.com/apache/calcite/blob/153796f8994831ad015af4b9036aa01ebf78/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java#L141
> > > >
> > > > On Thu, Nov 16, 2023 at 16:24, Dawid Wysakowicz wrote:
> > > > >
> > > > > I think the FLIP covers all public contracts that are necessary to
> be
> > > > > discussed at that level.
> > > > >
> > > > > If you meant you could not find a method that would be called to
> > trigger
> > > > > the translation then it is already there. It's just not implemented
> > yet:
> > > > > QueryOperation#asSerializableString[1]. As I mentioned this is
> > mostly a
> > > > > follow up to previous work.
> > > > >
> > > > > Regards,
> > > > > Dawid
> > > > >
> > > > > [1]
> > > > >
> > > >
> >
> https://github.com/apache/flink/blob/d18a4bfe596fc580f8280750fa3bfa22007671d9/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/QueryOperation.java#L46
> > > > >
> > > > > On Wed, 15 Nov 2023 at 16:36, Benchao Li 
> > wrote:
> > > > >
> > > > > > +1 for the idea of choosing SQL as the serialization format for
> > > > > > QueryOperation, thanks for Dawid for driving this FLIP.
> > > > > >
> > > > > > Regarding the implementation, I didn't see the proposal for how
> to
> > > > > > translate QueryOperation to SQL yet, am I missing something? Or
> the
> > > > > > FLIP is still in preliminary state, you just want to gather ideas
> > > > > > about whether to use SQL or something else as the serialization
> > format
> > > > > > for QueryOperation?
> > > > > >
> > > > > > On Wed, Nov 15, 2023 at 19:34, Dawid Wysakowicz wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > > I would like to propose a follow-up improvement to some of the
> > work
> > > > that
> > > > > > > has been done over the years to the Table API. I posted the
> > proposed
> > > > > > > changes in the FLIP-393. I'd like to get to know what others
> > think of
> > > > > > > choosing SQL as the serialization format for QueryOperations.
> > > > > > > Regards,
> > > > > > > Dawid Wysakowicz
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/x/vQ2ZE
> > > > > >
> > > > > >
> > > > > >
> > 

[jira] [Created] (FLINK-33598) Watch HA configmap via name instead of labels to reduce pressure on API server

2023-11-20 Thread Yun Tang (Jira)
Yun Tang created FLINK-33598:


 Summary: Watch HA configmap via name instead of labels to reduce 
pressure on API server 
 Key: FLINK-33598
 URL: https://issues.apache.org/jira/browse/FLINK-33598
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.17.1, 1.18.0
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.19.0, 1.18.1, 1.17.3


As FLINK-24819 described, the k8s API server would receive more pressure when 
HA is enabled, due to the configmap watching being achieved via filter with 
labels instead of just querying the configmap name. This could be done after 
FLINK-24038, which reduced the number of configmaps to only one.
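
For illustration only, the difference boils down to something like the following 
fabric8 sketch (the variable names are made up and this is not the actual Flink 
code):

{code:java}
// Current approach: label-selector based watch, the API server has to filter
// across all configmaps in the namespace.
client.configMaps()
    .inNamespace(namespace)
    .withLabels(haConfigMapLabels)
    .watch(watcher);

// Proposed approach: watch the single HA configmap directly by its name.
client.configMaps()
    .inNamespace(namespace)
    .withName(haConfigMapName)
    .watch(watcher);
{code}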



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Apache Flink Kubernetes Operator Release 1.7.0, release candidate #1

2023-11-20 Thread Gyula Fóra
+1 (binding)

Verified:
 - Release files, maven repo contents, checksums, signature
 - Verified and installed from Helm chart
 - Ran basic stateful example and verified
   - Upgrade flow
   - No errors in logs
   - Autoscaler (turn on/off, verify configmap cleared correctly)
   - In-place scaling with 1.18 and adaptive scheduler
 - Built from source with Java 11 & 17
 - Checked release notes

Cheers,
Gyula

On Fri, Nov 17, 2023 at 1:59 PM Rui Fan <1996fan...@gmail.com> wrote:

> +1(non-binding)
>
> - Downloaded artifacts from dist
> - Verified SHA512 checksums
> - Verified GPG signatures
> - Build the source with java-11 and java-17
> - Verified the license header
> - Verified that chart and appVersion matches the target release
> - RC repo works as Helm rep(helm repo add flink-operator-repo-1.7.0-rc1
>
> https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.7.0-rc1/
> )
> - Verified Helm chart can be installed  (helm install
> flink-kubernetes-operator
> flink-operator-repo-1.7.0-rc1/flink-kubernetes-operator --set
> webhook.create=false)
> - Submitted the autoscaling demo, the autoscaler works well with rescale
> api (kubectl apply -f autoscaling.yaml)
> - Download Autoscaler standalone: wget
>
> https://repository.apache.org/content/repositories/orgapacheflink-1672/org/apache/flink/flink-autoscaler-standalone/1.7.0/flink-autoscaler-standalone-1.7.0.jar
> - Ran Autoscaler standalone locally, it works well with rescale api
>
> Best,
> Rui
>
> On Fri, Nov 17, 2023 at 2:45 AM Mate Czagany  wrote:
>
> > +1 (non-binding)
> >
> > - Checked signatures, checksums
> > - No binaries found in the source release
> > - Verified all source files contain the license header
> > - All pom files point to the correct version
> > - Verified Helm chart version and appVersion
> > - Verified Docker image tag
> > - Ran flink-autoscaler-standalone JAR downloaded from the maven
> repository
> > - Tested autoscaler upscales correctly on load with Flink 1.18 rescaling
> > API
> >
> > Thanks,
> > Mate
> >
> > On Wed, Nov 15, 2023 at 16:37, Gyula Fóra wrote:
> >
> > > Hi Everyone,
> > >
> > > Please review and vote on the release candidate #1 for the version
> 1.7.0
> > of
> > > Apache Flink Kubernetes Operator,
> > > as follows:
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > > **Release Overview**
> > >
> > > As an overview, the release consists of the following:
> > > a) Kubernetes Operator canonical source distribution (including the
> > > Dockerfile), to be deployed to the release repository at
> dist.apache.org
> > > b) Kubernetes Operator Helm Chart to be deployed to the release
> > repository
> > > at dist.apache.org
> > > c) Maven artifacts to be deployed to the Maven Central Repository
> > > d) Docker image to be pushed to dockerhub
> > >
> > > **Staging Areas to Review**
> > >
> > > The staging areas containing the above mentioned artifacts are as
> > follows,
> > > for your review:
> > > * All artifacts for a,b) can be found in the corresponding dev
> repository
> > > at dist.apache.org [1]
> > > * All artifacts for c) can be found at the Apache Nexus Repository [2]
> > > * The docker image for d) is staged on github [3]
> > >
> > > All artifacts are signed with the key 21F06303B87DAFF1 [4]
> > >
> > > Other links for your review:
> > > * JIRA release notes [5]
> > > * source code tag "release-1.7.0-rc1" [6]
> > > * PR to update the website Downloads page to
> > > include Kubernetes Operator links [7]
> > >
> > > **Vote Duration**
> > >
> > > The voting time will run for at least 72 hours.
> > > It is adopted by majority approval, with at least 3 PMC affirmative
> > votes.
> > >
> > > **Note on Verification**
> > >
> > > You can follow the basic verification guide here[8].
> > > Note that you don't need to verify everything yourself, but please make
> > > note of what you have tested together with your +- vote.
> > >
> > > Cheers!
> > > Gyula Fora
> > >
> > > [1]
> > >
> > >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-1.7.0-rc1/
> > > [2]
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1672/
> > > [3] ghcr.io/apache/flink-kubernetes-operator:ccb10b8
> > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [5]
> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353462
> > > [6]
> > >
> >
> https://github.com/apache/flink-kubernetes-operator/tree/release-1.7.0-rc1
> > > [7] https://github.com/apache/flink-web/pull/699
> > > [8]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Kubernetes+Operator+Release
> > >
> >
>


[DISCUSS][FLINK-32993] Datagen connector handles length-constrained fields according to the schema definition by default

2023-11-20 Thread 李宇彬
Hi everyone,


Currently, the Datagen connector generates data that doesn't match the schema 
definition 
when dealing with fixed-length and variable-length fields. It defaults to a 
unified length of 100 
and requires manual configuration by the user. This violates the correctness of 
schema constraints 
and hampers ease of use.


Jane Chan and I have discussed offline and I will summarize our discussion 
below.


To enhance the datagen connector to automatically generate data that conforms
to the schema definition without additional manual configuration, we propose
handling the following data types appropriately [1] (see the sketch below):
  1. For fixed-length data types (char, binary), the length should be defined
by the schema definition and not be user-defined.
  2. For variable-length data types (varchar, varbinary), the length should
default to the schema definition, but user-defined lengths smaller than the
schema definition should still be allowed.
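
For instance, a minimal sketch of the intended user experience (the table and
field names are made up; 'fields.#.length' is the existing datagen option):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DatagenLengthSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // c1 CHAR(5): generated values are exactly 5 characters, taken from the
        //             schema, with no manual 'fields.c1.length' needed.
        // c2 VARCHAR(10): defaults to the schema-defined maximum of 10, but a
        //             smaller user-defined length (here 5) is still accepted.
        tEnv.executeSql(
                "CREATE TABLE gen ("
                        + "  c1 CHAR(5),"
                        + "  c2 VARCHAR(10)"
                        + ") WITH ("
                        + "  'connector' = 'datagen',"
                        + "  'fields.c2.length' = '5'"
                        + ")");

        tEnv.executeSql("SELECT * FROM gen").print();
    }
}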



Looking forward to your feedback :)


[1] https://issues.apache.org/jira/browse/FLINK-32993


Best,
Yubin



Re: [VOTE] Release flink-connector-aws, v4.2.0 release candidate #1

2023-11-20 Thread Jiabao Sun
Thanks Danny for driving the release,

+1 (non-binding)

- built from source code succeeded
- verified signatures
- verified hashsums 
- checked release notes

Best,
Jiabao


> On Nov 20, 2023 at 19:11, Danny Cranmer wrote:
> 
> Hello all,
> 
> +1 (binding).
> 
> - Release notes look good
> - Signatures and checksums match
> - There are no binaries in the source archive
> - pom versions are correct
> - Tag is present in Github
> - CI passes against FLink 1.17 and 1.18
> - Source build and tests pass
> 
> Thanks,
> Danny
> 
> On Wed, Nov 1, 2023 at 1:15 AM mystic lama  wrote:
> 
>> +1 (non-binding)
>> 
>> - validated shasum
>> - verified build
>>   - Java 8   - build good and all test cases pass
>>   - Java 11 - build good and all test cases pass
>> 
>> Observations: got test failures with Java 17, something to look for in
>> future
>> 
>> On Tue, 31 Oct 2023 at 08:42, Danny Cranmer 
>> wrote:
>> 
>>> Hi everyone,
>>> 
>>> Please review and vote on release candidate #1 for the version 4.2.0, as
>>> follows:
>>> [ ] +1, Approve the release
>>> [ ] -1, Do not approve the release (please provide specific comments)
>>> 
>>> The complete staging area is available for your review, which includes:
>>> * JIRA release notes [1],
>>> * the official Apache source release to be deployed to dist.apache.org
>>> [2],
>>> which are signed with the key with fingerprint
>>> 0F79F2AFB2351BC29678544591F9C1EC125FD8DB [3],
>>> * all artifacts to be deployed to the Maven Central Repository [4],
>>> * source code tag v4.2.0-rc1 [5],
>>> * website pull request listing the new release [6].
>>> * A link to the CI run on the release tag [7]
>>> 
>>> The vote will be open for at least 72 hours. It is adopted by majority
>>> approval, with at least 3 PMC affirmative votes.
>>> 
>>> Thanks,
>>> Danny
>>> 
>>> [1]
>>> 
>>> 
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353011
>>> [2]
>>> 
>> https://dist.apache.org/repos/dist/dev/flink/flink-connector-aws-4.2.0-rc1
>>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>>> [4]
>>> https://repository.apache.org/content/repositories/orgapacheflink-1665/
>>> [5]
>> https://github.com/apache/flink-connector-aws/releases/tag/v4.2.0-rc1
>>> [6] https://github.com/apache/flink-web/pull/693
>>> [7]
>> https://github.com/apache/flink-connector-aws/actions/runs/6707962074
>>> 
>> 



Re: [VOTE] Release flink-connector-aws, v4.2.0 release candidate #1

2023-11-20 Thread Danny Cranmer
Hello all,

+1 (binding).

- Release notes look good
- Signatures and checksums match
- There are no binaries in the source archive
- pom versions are correct
- Tag is present in Github
- CI passes against Flink 1.17 and 1.18
- Source build and tests pass

Thanks,
Danny

On Wed, Nov 1, 2023 at 1:15 AM mystic lama  wrote:

> +1 (non-binding)
>
> - validated shasum
> - verified build
>- Java 8   - build good and all test cases pass
>- Java 11 - build good and all test cases pass
>
> Observations: got test failures with Java 17, something to look for in
> future
>
> On Tue, 31 Oct 2023 at 08:42, Danny Cranmer 
> wrote:
>
> > Hi everyone,
> >
> > Please review and vote on release candidate #1 for the version 4.2.0, as
> > follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > The complete staging area is available for your review, which includes:
> > * JIRA release notes [1],
> > * the official Apache source release to be deployed to dist.apache.org
> > [2],
> > which are signed with the key with fingerprint
> > 0F79F2AFB2351BC29678544591F9C1EC125FD8DB [3],
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag v4.2.0-rc1 [5],
> > * website pull request listing the new release [6].
> > * A link to the CI run on the release tag [7]
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Danny
> >
> > [1]
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353011
> > [2]
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-aws-4.2.0-rc1
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1665/
> > [5]
> https://github.com/apache/flink-connector-aws/releases/tag/v4.2.0-rc1
> > [6] https://github.com/apache/flink-web/pull/693
> > [7]
> https://github.com/apache/flink-connector-aws/actions/runs/6707962074
> >
>


[jira] [Created] (FLINK-33597) Can not use a nested column for a join condition

2023-11-20 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-33597:


 Summary: Can not use a nested column for a join condition
 Key: FLINK-33597
 URL: https://issues.apache.org/jira/browse/FLINK-33597
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.19.0


Query:
{code}
SELECT A.after.CUSTOMER_ID FROM `CUSTOMERS` A INNER JOIN `PRODUCTS` B ON 
A.after.CUSTOMER_ID = B.after.PURCHASER;
{code}

fails with:

{code}
java.lang.RuntimeException: Error while applying rule 
FlinkProjectWatermarkAssignerTransposeRule, args 
[rel#411017:LogicalProject.NONE.any.None: 
0.[NONE].[NONE](input=RelSubset#411016,exprs=[$2, $2.CUSTOMER_ID]), 
rel#411015:LogicalWatermarkAssigner.NONE.any.None: 
0.[NONE].[NONE](input=RelSubset#411014,rowtime=$rowtime,watermark=SOURCE_WATERMARK())]
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
...
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(RecordType:peek_no_expand(INTEGER NOT NULL CUSTOMER_ID, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CUSTOMER_NAME, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" TITLE, INTEGER DOB) after, INTEGER $f8) NOT NULL
equiv rowtype: RecordType(RecordType:peek_no_expand(INTEGER NOT NULL 
CUSTOMER_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CUSTOMER_NAME, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" TITLE, INTEGER DOB) after, INTEGER 
NOT NULL $f8) NOT NULL
Difference:
$f8: INTEGER -> INTEGER NOT NULL

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 50 more
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33596) Support fold expression before transfer to RexNode

2023-11-20 Thread yunfan (Jira)
yunfan created FLINK-33596:
--

 Summary: Support fold expression before transfer to RexNode
 Key: FLINK-33596
 URL: https://issues.apache.org/jira/browse/FLINK-33596
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: yunfan


Hive folds expressions during its own optimization stage, but the Flink Hive parser
uses Flink's optimizer.

As a result, Flink cannot know that some Hive functions can be folded into constant values.

It can be reproduced with the following SQL:
{code:java}
select distinct(dep), UNIX_TIMESTAMP()  from employee {code}
Some hive code reference:

https://github.com/apache/hive/blob/rel/release-2.3.9/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRexExecutorImpl.java#L62

https://github.com/apache/hive/blob/rel/release-2.3.9/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java#L1776

https://github.com/apache/hive/blob/rel/release-2.3.9/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java#L1069



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] FLIP-395: Deprecate Global Aggregator Manager

2023-11-20 Thread Zhanghao Chen
Hi all,

I'd like to start a discussion of FLIP-395: Deprecate Global Aggregator Manager 
[1].

Global Aggregate Manager was introduced in [2] to support event time 
synchronization across sources and more generally, coordination of parallel 
tasks. AFAIK, this was only used in the Kinesis source for an early version of 
watermark alignment. Operator Coordinator, introduced in FLIP-27, provides a 
more powerful and elegant solution for that need and is part of the new source 
API standard. FLIP-217 further provides a complete solution for watermark 
alignment of source splits on top of the Operator Coordinator mechanism. 
Furthermore, the Global Aggregate Manager manages state in the JobMaster object,
causing problems for adaptive parallelism changes [3].

Therefore, I propose to deprecate the use of Global Aggregate Manager, which 
can improve the maintainability of the Flink codebase without compromising its 
functionality.

Looking forward to your feedback, thanks.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-395%3A+Deprecate+Global+Aggregator+Manager
[2] https://issues.apache.org/jira/browse/FLINK-10886
[3] https://issues.apache.org/jira/browse/FLINK-31245

Best,
Zhanghao Chen


Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-20 Thread Xia Sun
Thanks Leonard for the detailed feedback and input.

> The 'Max source parallelism’ is the information that runtime offered to
Source as a hint to infer the actual parallelism, a name with max prefix
but calculated > with minimum value confusing me a lot, especially when I
read the HiveSource pseudocode:

> fileEnumerator.setMinNumSplits(maxSourceParallelism);

> Although I understand that naming is a complex topic in CS, could we
improve this method name a little?

Thanks for pointing out the issue; the current wording does indeed seem
confusing. It relates to the existing implementation of the
AdaptiveBatchScheduler, where the dynamically inferred parallelism cannot
exceed the JobVertex's maxParallelism (which is typically set to either the
global default max parallelism or the user-specified JobVertex max
parallelism), so the FLIP keeps that logic. I have modified the FLIP to
avoid the confusion as much as possible.

> And,
`execution.batch.adaptive.auto-parallelism.default-source-parallelism`
config has a default value > with 1, how we distinguish user set it or not
for

> example if user happen to set value 1 ?

We can use Configuration::getOptional to check if the user has configured
the `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
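
A minimal sketch of that check (the ConfigOption is declared inline here purely for
illustration; I'm not referring to the actual option constant in the codebase):

import java.util.Optional;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class DefaultSourceParallelismCheck {

    // Declared inline for illustration only.
    private static final ConfigOption<Integer> DEFAULT_SOURCE_PARALLELISM =
            ConfigOptions.key("execution.batch.adaptive.auto-parallelism.default-source-parallelism")
                    .intType()
                    .defaultValue(1);

    // Returns the user-set value, or empty if the user did not configure the option.
    static Optional<Integer> userConfiguredDefaultSourceParallelism(Configuration conf) {
        // getOptional() only yields a value when the option was explicitly set, so an
        // explicit "1" can be distinguished from the implicit default value of 1.
        return conf.getOptional(DEFAULT_SOURCE_PARALLELISM);
    }
}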

> No doubt that it’s a API breaking change, for existing hive users, the
migration path is not clear in this FLIP, for example, current users used
splits number

> to infer the source parallelism, after this FLIP, could we give the
recommended value of `execution.batch.adaptive.auto-parallelism.

> default-source-parallelism` or how to set it or event users do not need
to set anythins? And the replacement for migration replacement should add
to 'table.

> exec.hive.infer-source-parallelism’s description when we propose to
change its default value, right?

As a follow-up task, we may have a dedicated discussion in the future to
see if we need to change the default value of
`table.exec.hive.infer-source-parallelism` to false. Until then, users can
manually set `table.exec.hive.infer-source-parallelism` to false so that
dynamic parallelism inference takes effect, and use
`execution.batch.adaptive.auto-parallelism.default-source-parallelism` to
replace `table.exec.hive.infer-source-parallelism.max` as the parallelism
inference upper bound (see the configuration sketch below). I have updated
both the FLIP's DynamicParallelismInference interface implementation and
Migration Plan sections to illustrate this.
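
For example, the migration could look roughly like this (a sketch only; the
parallelism value 128 is an arbitrary placeholder, not a recommendation):

import org.apache.flink.configuration.Configuration;

public class HiveSourceParallelismMigrationSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Turn off the split-count based inference in the Hive connector so that
        // the runtime's dynamic parallelism inference takes effect ...
        conf.setString("table.exec.hive.infer-source-parallelism", "false");

        // ... and use the adaptive scheduler option as the inference upper bound,
        // replacing table.exec.hive.infer-source-parallelism.max.
        conf.setString(
                "execution.batch.adaptive.auto-parallelism.default-source-parallelism", "128");

        // The Configuration is then passed to the cluster / TableEnvironment as usual.
    }
}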

> The pseudocode code shows:

> fileEnumerator.getInferredSourceParallelsim();

> IIRC, our public API FileEnumerator never offers such method, introducing
getInferredSourceParallelsim() is also one part of our FLIP ?

The intent was to make the pseudocode easier to understand, but it did
introduce some confusion. There are no plans to introduce
getInferredSourceParallelism() in HiveSource, and I've modified the
HiveSource pseudocode in the FLIP.

Best regards,

Xia

Leonard Xu  于2023年11月16日周四 12:36写道:

> Thanks Xia for the detailed reply.
>
> >> `How user disable the parallelism inference if they want to use fixed
> source parallelism?`
> >> `Could you explain the priority the static parallelism set from table
> layer and the proposed dynamic source parallelism?`
> >
> > From the user's perspective, if the user specifies a fixed parallelism
> for
> > the source, dynamic source parallelism inference will be automatically
> > disabled. From the perspective of priority, the user’s specified
> > parallelism > the static parallelism inference > dynamic parallelism
> > inference. Because the dynamic source parallelism inference will take
> > effect at the runtime stage and the validity conditions are: (1) the
> > current ExecutionGraph is a dynamic graph, and (2) the parallelism of the
> > source vertex is not specified (that is, the parallelism is -1).
>
> The priority explanation make sense to me, could you also add this
> explanation to FLIP?
>
> >> `So, could we consider the boundness info when design the interface?
> Both
> > FileSource and Hive Source offer streaming read ability, imaging this
> case:
> > Flink Streaming Hive Source should not apply the dynamic source
> parallelism
> > even it implemented the feature as it severing a streaming job.`
> >
> > Thanks for your feedback, it is reallly a good input. Currently, the
> > dynamic parallelism inference logic is only triggered in batch jobs.
> > Therefore, the logic will not be called in the streaming jobs.
> > In the future, if streaming jobs also support runtime parallelism
> > inference, then theoretically, the source can no longer be distinguished
> > between streaming jobs and batch jobs at the runtime stage. In addition,
> > since the new interface is implemented together with the Source
> interface,
> > the Source::getBoundedness() method can also be obtained when inferring
> > parallelism.
>
> +1 about the boundedness info part.
>
> Okay, let’s come back the batch world and discuss some details about
> current design:
>
> (1) About  

Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-20 Thread Piotr Nowojski
Hi Jing!

>  the upcoming OpenTelemetry based TraceReporter will use the same Span
> implementation and will not support trace_id and span_id. Does it make
> sense to at least add the span_id into the current Span design? The
default
> implementation could follow your suggestion: jobId#attemptId#checkpointId.

Those IDs (jobId, checkpointId) will be accessible to humans via attributes,
so there is no need to encode them in the span/trace ids at the moment. At the
same time, I don't know for sure how the concept of span parent ids should be
exposed to the user of this API: whether it should be plain text, or some POJO
generating the trace id/span id. I'm also not sure how this would have to work
for reporting systems other than OTEL. For those reasons I thought that keeping
the API as simple as possible would be the best option.

> 1. The sample code shows that the scope of Span will be the CanonicalName
> of a class. If there are other cases that could be used as the scope too,
a
> javadoc about Span scope would be helpful. If the CanonicalName of a class
> is the best practice, removing the scope from the builder constructor and
> adding setScope(Class) might ease the API usage. The Span.getScope() can
> still return String.

I like the idea with `#setScope(Class)`. I will adjust the FLIP :)
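
A rough sketch of how the adjusted builder could look; this is only my reading of the
suggestion (names and shapes are assumptions), not the final FLIP-384 API:

public final class Span {

    private final String scope;
    private final String name;

    private Span(String scope, String name) {
        this.scope = scope;
        this.name = name;
    }

    // The scope is still exposed as a plain String on the Span itself.
    public String getScope() {
        return scope;
    }

    public static SpanBuilder builder(String name) {
        return new SpanBuilder(name);
    }

    public static final class SpanBuilder {

        private String scope = "";
        private final String name;

        private SpanBuilder(String name) {
            this.name = name;
        }

        // Type-safe scope setter instead of a String argument in the builder
        // constructor; the canonical class name is stored as the scope.
        public SpanBuilder setScope(Class<?> scopeClass) {
            this.scope = scopeClass.getCanonicalName();
            return this;
        }

        public Span build() {
            return new Span(scope, name);
        }
    }
}

Usage would then read e.g. Span.builder("Checkpoint").setScope(CheckpointCoordinator.class).build().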

> 2. The sample code in the FLIP is not consistent. The first example used
> Span.builder(..) and the second example used new Span() with setters.

I will fix that, I've forgotten to upgrade the second `new Span()` usage to
the
builder.

> 3. I guess the constructor of SpanBuilder class is a typo.

Yes! Thanks for noticing.

Best,
Piotrek


czw., 16 lis 2023 o 15:12 Roman Khachatryan  napisał(a):

> Thanks for the proposal,
>
> Starting with the minimal functionality and expanding if necessary as the
> FLIP describes makes a lot of sense to me.
>
> Regards,
> Roman
>
> On Wed, Nov 15, 2023, 9:31 PM Jing Ge  wrote:
>
> > Hi Piotr,
> >
> > Sorry for the late reply and thanks for the proposal, it looks awesome!
> >
> > In the discussion, you pointed out that it is difficult to build true
> > distributed traces. afaiu from FLIP-384 and FLIP-385, the
> > upcoming OpenTelemetry based TraceReporter will use the same Span
> > implementation and will not support trace_id and span_id. Does it make
> > sense to at least add the span_id into the current Span design? The
> default
> > implementation could follow your suggestion:
> jobId#attemptId#checkpointId.
> >
> > Some other NIT questions:
> > 1. The sample code shows that the scope of Span will be the CanonicalName
> > of a class. If there are other cases that could be used as the scope
> too, a
> > javadoc about Span scope would be helpful. If the CanonicalName of a
> class
> > is the best practice, removing the scope from the builder constructor and
> > adding setScope(Class) might ease the API usage. The Span.getScope() can
> > still return String.
> > 2. The sample code in the FLIP is not consistent. The first example used
> > Span.builder(..) and the second example used new Span() with setters.
> > 3. I guess the constructor of SpanBuilder class is a typo.
> >
> > Really a nice idea to introduce the trace report! Thanks again!
> >
> > Best regards,
> > Jing
> >
> > On Tue, Nov 14, 2023 at 3:16 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi All,
> > >
> > > Thanks for the answers!
> > >
> > > Unless there are some objections or suggestions, I will open a voting
> > > thread later this
> > > week.
> > >
> > > > My original thought was to show how much time a sampled record is
> > > processed
> > > > within each operator in stream processing. By saying 'sampled', I
> mean
> > we
> > > > won't generate a trace for every record due to the high cost
> involved.
> > > > Instead, we could only trace ONE record from source when the user
> > > requests
> > > > it (via REST API or Web UI) or when triggered periodically at a very
> > low
> > > > frequency.
> > >
> > > That would be useful, but another issue is that we can not measure time
> > > reliably at the
> > > granularity of a single record. Time to process a single record by the
> > > whole operator
> > > chain is usually faster compared to the syscalls to measure time.
> > >
> > > So I think we are stuck with sample based profilers, like Flame Graphs
> > > generated by
> > > the Flink WebUI.
> > >
> > > Best, Piotrek
> > >
> > > czw., 9 lis 2023 o 05:32 Rui Fan <1996fan...@gmail.com> napisał(a):
> > >
> > > > Hi Piotr:
> > > >
> > > > Thanks for your reply!
> > > >
> > > > > About structured logging (basically events?) I vaguely remember
> some
> > > > > discussions about that. It might be a much larger topic, so I would
> > > > prefer
> > > > > to leave it out of the scope of this FLIP.
> > > >
> > > > Sounds make sense to me!
> > > >
> > > > > I think those could be indeed useful. If you would like to
> contribute
> > > to
> > > > them
> > > > > in the future, I would be happy to review the FLIP for 

Re: [DISCUSS] FLIP-386: Support adding custom metrics in Recovery Spans

2023-11-20 Thread Piotr Nowojski
Hi Roman!

> 1. why the existing MetricGroup interface can't be used? It already had
> methods to add metrics and spans ...

That's because of the need to:
a) associate the spans specifically with the job's initialisation
b) logically aggregate the spans' attributes across subtasks.

`MetricGroup` doesn't have such capabilities and it's too generic an
interface to introduce things like that IMO.

Additionally for metrics:
c) reporting initialization measurements as metrics is a flawed concept as
described in the FLIP's-384 motivation
Additionally for spans:
d) as discussed in the FLIP's-384 thread, we don't want to report separate
spans on the TMs. At least not right now

Also, having a specialized class dedicated to initialization metrics to
collect those numbers makes the interfaces leaner and more focused.

> 2. IIUC, based on these numbers, we're going to report span(s). Shouldn't
> the backend report them as spans?

As discussed in the FLIP-384 thread, initially we don't want to report spans on
the TMs. Later, optionally reporting individual subtasks' checkpoint/recovery
spans on the JM looks like a logical follow-up.

> 3. How is the implementation supposed to infer that some metric is a part
> of initialization (and make the corresponding RPC to JM?). Should the
> interfaces be more explicit about that?

This FLIP proposes [1] adding a
`CustomInitializationMetrics
KeyedStateBackendParameters#getCustomInitializationMetrics()`
accessor to the `KeyedStateBackendParameters` argument that is passed to the
`createKeyedStateBackend(...)`
method. That's pretty explicit, I would say :)
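
For example, inside a state backend's restore path the usage could look roughly like
the sketch below. The `addMetric(...)` signature and the simplified interfaces are
assumptions made for illustration, not quoted from the FLIP:

public class RecoveryMetricsSketch {

    // Assumed shape of the collector handed to the state backend.
    interface CustomInitializationMetrics {
        void addMetric(String name, long value);
    }

    // Only the part of KeyedStateBackendParameters relevant to this example.
    interface KeyedStateBackendParameters {
        CustomInitializationMetrics getCustomInitializationMetrics();
    }

    // Inside createKeyedStateBackend(...), a backend could record durations like this.
    static void restore(KeyedStateBackendParameters parameters) {
        CustomInitializationMetrics metrics = parameters.getCustomInitializationMetrics();

        long start = System.nanoTime();
        downloadRemoteStateFiles(); // placeholder for the actual restore work
        metrics.addMetric("DownloadStateDurationMs", (System.nanoTime() - start) / 1_000_000);
    }

    private static void downloadRemoteStateFiles() {
        // placeholder
    }
}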

> 4. What do you think about using histogram or percentiles instead of
> min/max/sum/avg? That would be more informative

I would prefer to start with the simplest min/max/sum/avg and see in which
direction (if any) we need to evolve that. An alternative to percentiles is the
previously mentioned option of reporting each subtask's
initialisation/checkpointing span separately.

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans#FLIP386:SupportaddingcustommetricsinRecoverySpans-PublicInterfaces

czw., 16 lis 2023 o 15:45 Roman Khachatryan  napisał(a):

> Thanks for the proposal,
>
> Can you please explain:
> 1. why the existing MetricGroup interface can't be used? It already had
> methods to add metrics and spans ...
>
> 2. IIUC, based on these numbers, we're going to report span(s). Shouldn't
> the backend report them as spans?
>
> 3. How is the implementation supposed to infer that some metric is a part
> of initialization (and make the corresponding RPC to JM?). Should the
> interfaces be more explicit about that?
>
> 4. What do you think about using histogram or percentiles instead of
> min/max/sum/avg? That would be more informative
>
> I like the idea of introducing parameter objects for backend creation.
>
> Regards,
> Roman
>
> On Tue, Nov 7, 2023, 1:20 PM Piotr Nowojski  wrote:
>
> > (Fixing topic)
> >
> > wt., 7 lis 2023 o 09:40 Piotr Nowojski 
> napisał(a):
> >
> > > Hi all!
> > >
> > > I would like to start a discussion on a follow up of FLIP-384:
> Introduce
> > > TraceReporter and use it to create checkpointing and recovery traces
> [1]:
> > >
> > > *FLIP-386: Support adding custom metrics in Recovery Spans [2]*
> > >
> > > This FLIP adds a functionality that will allow state backends to attach
> > > custom metrics to the recovery/initialization traces. This requires
> > changes
> > > to the `@PublicEvolving` `StateBackend` API, and it will be initially
> > used
> > > in `RocksDBIncrementalRestoreOperation` to measure how long does it
> take
> > to
> > > download remote files and separately how long does it take to load
> those
> > > files into the local RocksDB instance.
> > >
> > > Please let me know what you think!
> > >
> > > Best,
> > > Piotr Nowojski
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-391: Deprecate RuntimeContext#getExecutionConfig

2023-11-20 Thread Jing Ge
Hi Junrui,

Thanks for the clarification. On one hand, adding more methods directly to a flat
RuntimeContext will increase the effort for users of RuntimeContext, but that
impact is limited and acceptable. On the other hand, the bigger impact is on users
who want to focus on the execution config: they will need to find the needle in
the haystack.

I just shared my thoughts and tried to help you look at the issue from many
different angles and I am open to learning opinions from other
contributors. Please feel free to proceed if there are no other objections.

Best regards,
Jing

On Mon, Nov 20, 2023 at 6:50 AM Junrui Lee  wrote:

> Hi Jing,
>
> Thank you for your feedback. I understand your concerns regarding putting
> all methods into the RuntimeContext flat.
>
> I would like to share some of my thoughts on this matter.
> Firstly, this FLIP only proposes the addition of three additional methods,
> which should not impose too much extra burden on users. Secondly, I agree
> that it is important to make it clearer for users to use the
> RuntimeContext. However, reorganizing the RuntimeContext to achieve this
> requires further discussion. We should focus on a more specific and unified
> reorganization of the RuntimeContext interface in future work, rather than
> implementing a temporary solution now. Therefore, I prefer not to add a
> separate abstraction layer for these three methods in this FLIP.
>
> Please feel free to share any further thoughts.
>
> Best regards,
> Junrui
>
> Jing Ge  于2023年11月20日周一 05:46写道:
>
>> Hi Junrui,
>>
>> Thanks for bringing this to our attention. First of all, it makes sense
>> to deprecate RuntimeContext#getExecutionConfig.
>>
>> Afaic, this is an issue of how we design API with clean concepts/aspects.
>> There are two issues mentioned in the FLIP:
>>
>> 1. short of user-facing abstraction - we just exposed ExecutionConfig
>> which mixed methods for users with methods that should only be used
>> internally.
>> 2. mutable vs immutable - do we want users to be able to modify configs
>> during job execution?
>>
>> An immutable user-facing abstraction design can solve both issues. All
>> execution related configs are still consolidated into the abstraction class
>> and easy to access. This is another design decision: flat vs. hierarchical.
>> Current FLIP removed the execution config abstraction and put all methods
>> into RuntimeContext flat, which will end up with more than 30 methods
>> offered flat by the RuntimeContext. I am not sure if this could help users
>> find the right method in the context of execution config better than
>> before.
>>
>> I might miss something and look forward to your thoughts. Thanks!
>>
>> Best regards,
>> Jing
>>
>> On Sat, Nov 18, 2023 at 11:21 AM Junrui Lee  wrote:
>>
>>> Hello Wencong,
>>>
>>> Thank you for your valuable feedback and suggestions. I want to clarify
>>> that reviewing existing methods in the ExecutionConfig is not directly
>>> related to the proposal in this FLIP. The main focus of this FLIP is to
>>> deprecate the specific method RuntimeContext#getExecutionConfig(). I
>>> believe it is important to keep the scope of this FLIP limited. However,
>>> your suggestion can certainly be considered as a separate FLIP in the
>>> future.
>>>
>>> Best regards,
>>> Junrui
>>>
>>> Wencong Liu  于2023年11月17日周五 22:08写道:
>>>
 Hello Junrui,


 Thanks for the effort. I agree with the proposal to deprecate the
 getExecutionConfig() method in the RuntimeContext class. Exposing
 the complex ExecutionConfig to user-defined functions can lead to
 unnecessary complexity and risks.


 I also have a suggestion. We could consider reviewing the existing
  methods in ExecutionConfig. If there are methods that are defined
  in ExecutionConfig but currently have no callers, we could consider
  annotating  them as @Internal or directly removing them. Since
 users are no longer able to access and invoke these methods,
 it would be beneficial to clean up the codebase.


 +1 (non-binding).


 Best,
 Wencong

 At 2023-11-15 16:51:15, "Junrui Lee"  wrote:
 >Hi all,
 >
 >I'd like to start a discussion of FLIP-391: Deprecate
 >RuntimeContext#getExecutionConfig[1].
 >
 >Currently, the FLINK RuntimeContext is important for connecting user
 >functions to the underlying runtime details. It provides users with
 >necessary runtime information during job execution.
 >However, he current implementation of the FLINK RuntimeContext exposes
 the
 >ExecutionConfig to users, resulting in two issues:
 >Firstly, the ExecutionConfig contains much unrelated information that
 can
 >confuse users and complicate management.
 >Secondly, exposing the ExecutionConfig allows users to modify it
 during job
 >execution, which can cause