Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink

2023-03-23 Thread Matthias Pohl
Sounds good. Two points I want to add:

   - Listener execution should be independent — however we need a way to
> enforce a Label key/key-prefix is only assigned to a single Listener,
> thinking of a validation step both at Listener init and runtime stages
>
What about adding an extra method getNamespace() to the Listener interface
which returns an Optional. Therefore, the implementation/the user
can decide depending on the use case whether it's necessary to have
separate namespaces for the key/value pairs or not. On the Flink side, we
would just merge the different maps considering their namespaces.

A flaw of this approach is that if a user decides to use the same namespace
for multiple listeners, how is an error in one of the listeners represented
in the outcome? We would have to overwrite either the successful listener's
result or the failed ones. I wanted to share it, anyway.
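
To make this concrete, here is a rough sketch (purely hypothetical; the
interface and method names are not the FLIP's actual API) of what such a
namespace-aware listener and the merge step on the Flink side could look like:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch only; names are illustrative, not the proposed API.
public interface FailureListener {

    /** Optional namespace used to qualify this listener's label keys. */
    default Optional<String> getNamespace() {
        return Optional.empty();
    }

    /** Labels derived from the given failure. */
    Map<String, String> onFailure(Throwable cause);
}

class LabelMerger {
    // On the Flink side: merge the per-listener results, prefixing keys with the namespace.
    static Map<String, String> merge(Map<FailureListener, Map<String, String>> results) {
        Map<String, String> merged = new HashMap<>();
        results.forEach((listener, labels) -> {
            String prefix = listener.getNamespace().map(ns -> ns + ".").orElse("");
            labels.forEach((key, value) -> merged.put(prefix + key, value));
        });
        return merged;
    }
}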

One additional remark on introducing it as an async operation: We would
need a new configuration parameter to define the timeout for such a
listener call, wouldn't we?
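
For illustration, a minimal sketch of how such a timeout could be enforced
around an asynchronous listener call (the listener invocation, the ioExecutor
and the timeout value are assumptions; the timeout would be the new
configuration parameter discussed above):

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class AsyncListenerCall {
    // Run the listener invocation on the ioExecutor, bound it by the configured
    // timeout, and degrade to an empty label map instead of failing the job.
    static CompletableFuture<Map<String, String>> call(
            Supplier<Map<String, String>> listenerInvocation,
            Executor ioExecutor,
            Duration timeout) {
        return CompletableFuture
                .supplyAsync(listenerInvocation, ioExecutor)
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .exceptionally(t -> Collections.emptyMap()); // in practice: log a WARN here
    }
}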

Matthias

On Wed, Mar 22, 2023 at 4:56 PM Panagiotis Garefalakis 
wrote:

> Hi everyone,
>
>
> Thanks for the valuable comments!
> Excited to see this is an area of interest for the community!
>
> Summarizing some of the main points raised along with my thoughts:
>
>- Labels (Key/Value) pairs are more expressive than Tags (Strings) so
>using the former is a good idea — I am also debating if we want to
> return
>multiple KV pairs per Listener (one could argue that we could split the
>logic in multiple Listeners to support that)
>- An immutable context along with data returned using the interface
>method implementations is a better approach than a mutable Collection
>- Listener execution should be independent — however we need a way to
>enforce a Label key/key-prefix is only assigned to a single Listener,
>thinking of a validation step both at Listener init and runtime stages
>- We want to perform async Listener operations as sync could block the
>main thread — exposing an ioExecutor pool through the context could be
> an
>elegant solution here
>- Make sure Listener errors are not failing jobs — make sure to log and
>keep the job alive
>- We need better naming / public interface separation/description
>
> -  Even though custom restart strategies share some properties with
> Listeners, they would probably need a separate interface with a different
> return type anyway (restart strategy not labels) and in general they are
> different and complex enough to justify their own FLIP (that can also be a
> follow-up).
>
>
> What do people think? I am planning to modify the FLIP to reflect these
> changes if they make sense to everyone.
>
> Cheers,
> Panagiotis
>
> On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh  wrote:
>
> > Hi all,
> >
> > Thank you Panagiotis for proposing this. From the size of the thread,
> this
> > is a much needed feature in Flink!
> > Some thoughts, to extend those already adeptly summarised by Piotr,
> > Matthias and Jing.
> >
> > - scope of FLIP: +1 to scoping this FLIP to observability around a
> > restart. That would include adding metadata + exposing metadata to
> external
> > systems. IMO, introducing a new restart strategy solves different
> problems,
> > is much larger scope and should be covered in a separate FLIP.
> >
> > - failure handling: At the moment, we propose transitioning the Flink job
> > to a terminal FAILED state when JobListener fails, when the job could
> have
> > transitioned to RESTARTING->RUNNING. If we are keeping in line with the
> > scope to add metadata/observability around job restarts, we should not be
> > affecting the running of the Flink job itself. Could I propose we instead
> > log WARN/ERROR.
> >
> > - immutable context: +1 to keeping the contract clear via return types.
> > - async operation: +1 to adding ioexecutor to context, however, given we
> > don’t want to block the actual job restart on adding metadata / calling
> > external services, should we consider returning and letting futures
> > complete independently?
> >
> > - independent vs ordered execution: Should we consider making the order
> of
> > execution deterministic (use a List instead of Set)?
> >
> >
> > Once again, thank you for working on this.
> >
> > Regards,
> > Hong
> >
> >
> > > On 21 Mar 2023, at 21:07, Jing Ge  wrote:
> > >
> > > Hi,
> > >
> > > Thanks Panagiotis for this FLIP and thanks for all valuable
> discussions.
> > > I'd like to share my two cents:
> > >
> > > - FailureListenerContext#addTag and FailureListenerContext#getTags. It
> > > seems that we have to call getTags() and then do remove activities if
> we
> > > want to delete any tags (according to the javadoc in the FLIP).  It is
> > > inconsistent for me too. Either offer addTag(), deleteTag(), and let
> > > getTags() return immutable collection, or offer getTags() only to
> return
> > > mutable collection.
> > 

Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink

2023-03-23 Thread David Morávek
>
> One additional remark on introducing it as an async operation: We would
> need a new configuration parameter to define the timeout for such a
> listener call, wouldn't we?
>

This could be left up to the implementor to handle.

What about adding an extra method getNamespace() to the Listener interface
> which returns an Optional.
>

I'd avoid mixing an additional concept into this. We can simply have a new
method that returns a set of keys the listener can output. We can validate
this at the JM startup time and fail fast (since it's a configuration
error) if there is an overlap. If a listener outputs a key it is not
allowed to, I wouldn't be afraid to call into the fatal error handler, since
that is an invalid implementation.
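
A small sketch of that startup-time check (getOutputKeys() is a hypothetical
name for the new method; this is illustrative only):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical: getOutputKeys() stands in for the new method described above.
interface FailureListener {
    Set<String> getOutputKeys();
}

class ListenerKeyValidation {
    // Fail fast at JobManager startup (a configuration error) if two listeners claim the same key.
    static void validateKeyOwnership(Collection<FailureListener> listeners) {
        Map<String, String> ownerByKey = new HashMap<>();
        for (FailureListener listener : listeners) {
            for (String key : listener.getOutputKeys()) {
                String previous = ownerByKey.putIfAbsent(key, listener.getClass().getName());
                if (previous != null) {
                    throw new IllegalStateException(
                            "Key '" + key + "' is declared by both " + previous
                                    + " and " + listener.getClass().getName());
                }
            }
        }
    }
}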

Best,
D.

On Thu, Mar 23, 2023 at 8:34 AM Matthias Pohl
 wrote:

> Sounds good. Two points I want to add:
>
>- Listener execution should be independent — however we need a way to
> > enforce a Label key/key-prefix is only assigned to a single Listener,
> > thinking of a validation step both at Listener init and runtime stages
> >
> What about adding an extra method getNamespace() to the Listener interface
> which returns an Optional. Therefore, the implementation/the user
> can decide depending on the use case whether it's necessary to have
> separate namespaces for the key/value pairs or not. On the Flink side, we
> would just merge the different maps considering their namespaces.
>
> A flaw of this approach is that if a user decides to use the same namespace
> for multiple listeners, how is an error in one of the listeners represented
> in the outcome? We would have to overwrite either the successful listener's
> result or the failed ones. I wanted to share it, anyway.
>
> One additional remark on introducing it as an async operation: We would
> need a new configuration parameter to define the timeout for such a
> listener call, wouldn't we?
>
> Matthias
>
> On Wed, Mar 22, 2023 at 4:56 PM Panagiotis Garefalakis 
> wrote:
>
> > Hi everyone,
> >
> >
> > Thanks for the valuable comments!
> > Excited to see this is an area of interest for the community!
> >
> > Summarizing some of the main points raised along with my thoughts:
> >
> >- Labels (Key/Value) pairs are more expressive than Tags (Strings) so
> >using the former is a good idea — I am also debating if we want to
> > return
> >multiple KV pairs per Listener (one could argue that we could split
> the
> >logic in multiple Listeners to support that)
> >- An immutable context along with data returned using the interface
> >method implementations is a better approach than a mutable Collection
> >- Listener execution should be independent — however we need a way to
> >enforce a Label key/key-prefix is only assigned to a single Listener,
> >thinking of a validation step both at Listener init and runtime stages
> >- We want to perform async Listener operations as sync could block the
> >main thread — exposing an ioExecutor pool through the context could be
> > an
> >elegant solution here
> >- Make sure Listener errors are not failing jobs — make sure to log
> and
> >keep the job alive
> >- We need better naming / public interface separation/description
> >
> > -  Even though custom restart strategies share some properties
> with
> > Listeners, they would probably need a separate interface with a different
> > return type anyway (restart strategy not labels) and in general they are
> > different and complex enough to justify their own FLIP (that can also be
> a
> > follow-up).
> >
> >
> > What do people think? I am planning to modify the FLIP to reflect these
> > changes if they make sense to everyone.
> >
> > Cheers,
> > Panagiotis
> >
> > On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh  wrote:
> >
> > > Hi all,
> > >
> > > Thank you Panagiotis for proposing this. From the size of the thread,
> > this
> > > is a much needed feature in Flink!
> > > Some thoughts, to extend those already adeptly summarised by Piotr,
> > > Matthias and Jing.
> > >
> > > - scope of FLIP: +1 to scoping this FLIP to observability around a
> > > restart. That would include adding metadata + exposing metadata to
> > external
> > > systems. IMO, introducing a new restart strategy solves different
> > problems,
> > > is much larger scope and should be covered in a separate FLIP.
> > >
> > > - failure handling: At the moment, we propose transitioning the Flink
> job
> > > to a terminal FAILED state when JobListener fails, when the job could
> > have
> > > transitioned to RESTARTING->RUNNING. If we are keeping in line with the
> > > scope to add metadata/observability around job restarts, we should not
> be
> > > affecting the running of the Flink job itself. Could I propose we
> instead
> > > log WARN/ERROR.
> > >
> > > - immutable context: +1 to keeping the contract clear via return types.
> > > - async operation: +1 to adding ioexecutor to context, however, given
> we
> > > don’t want to blo

Re: [VOTE] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-23 Thread Yuxin Tan
Hi, everyone

I'm closing this vote now. I will follow up with the result in another
email. Thanks

Best,
Yuxin


Yun Tang wrote on Tue, Mar 21, 2023 at 10:58:

> +1 (binding)
>
> Best
> Yun Tang
> 
> From: Zhu Zhu 
> Sent: Tuesday, March 21, 2023 10:07
> To: dev@flink.apache.org 
> Subject: Re: [VOTE] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> +1 (binding)
>
> Thanks,
> Zhu
>
> Xintong Song wrote on Mon, Mar 20, 2023 at 16:26:
> >
> > +1 (binding)
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Mon, Mar 20, 2023 at 4:07 PM weijie guo 
> > wrote:
> >
> > > Thanks Yuxin for driving this.
> > >
> > > +1 (binding)
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Junrui Lee wrote on Mon, Mar 20, 2023 at 14:57:
> > >
> > > > +1 (non-binding)
> > > >
> > > > Best regards,
> > > > Junrui
> > > >
> > > > Weihua Hu wrote on Mon, Mar 20, 2023 at 14:24:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Best,
> > > > > Weihua
> > > > >
> > > > >
> > > > > On Mon, Mar 20, 2023 at 12:39 PM Wencong Liu  >
> > > > wrote:
> > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Wencong Liu
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > At 2023-03-20 12:05:47, "Yuxin Tan" 
> wrote:
> > > > > > >Hi, everyone,
> > > > > > >
> > > > > > >Thanks for all your feedback for FLIP-301: Hybrid Shuffle
> > > > > > >supports Remote Storage[1] on the discussion thread[2].
> > > > > > >
> > > > > > >I'd like to start a vote for it. The vote will be open for at
> > > > > > >least 72 hours (03/23, 13:00 UTC+8) unless there is an
> > > > > > >objection or not enough votes.
> > > > > > >
> > > > > > >[1]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
> > > > > > >[2]
> > > https://lists.apache.org/thread/nwrqd5jtqwks89tbxpcrgto6r2bhdhno
> > > > > > >
> > > > > > >Best,
> > > > > > >Yuxin
> > > > > >
> > > > >
> > > >
> > >
>


Re: [DISCUSS] FLIP-304: Pluggable failure handling for Apache Flink

2023-03-23 Thread Zhu Zhu
+1 to support custom restart strategies in a different FLIP.

It's fine to have a different plugin for custom restart strategies.
In that case, since we do not treat the FLIP-304 plugin as a common failure
handler but rather as something that mainly targets adding labels to errors,
I would be +1 for the name `FailureEnricher`.

Thanks,
Zhu

David Morávek wrote on Thu, Mar 23, 2023 at 15:51:
>
> >
> > One additional remark on introducing it as an async operation: We would
> > need a new configuration parameter to define the timeout for such a
> > listener call, wouldn't we?
> >
>
> This could be left up to the implementor to handle.
>
> What about adding an extra method getNamespace() to the Listener interface
> > which returns an Optional.
> >
>
> I'd avoid mixing an additional concept into this. We can simply have a new
> method that returns a set of keys the listener can output. We can validate
> this at the JM startup time and fail fast (since it's a configuration
> error) if there is an overlap. If the listener outputs the key that is not
> allowed to, I wouldn't be afraid to call into a fatal error handler since
> it's an invalid implementation.
>
> Best,
> D.
>
> On Thu, Mar 23, 2023 at 8:34 AM Matthias Pohl
>  wrote:
>
> > Sounds good. Two points I want to add:
> >
> >- Listener execution should be independent — however we need a way to
> > > enforce a Label key/key-prefix is only assigned to a single Listener,
> > > thinking of a validation step both at Listener init and runtime stages
> > >
> > What about adding an extra method getNamespace() to the Listener interface
> > which returns an Optional. Therefore, the implementation/the user
> > can decide depending on the use case whether it's necessary to have
> > separate namespaces for the key/value pairs or not. On the Flink side, we
> > would just merge the different maps considering their namespaces.
> >
> > A flaw of this approach is that if a user decides to use the same namespace
> > for multiple listeners, how is an error in one of the listeners represented
> > in the outcome? We would have to overwrite either the successful listener's
> > result or the failed ones. I wanted to share it, anyway.
> >
> > One additional remark on introducing it as an async operation: We would
> > need a new configuration parameter to define the timeout for such a
> > listener call, wouldn't we?
> >
> > Matthias
> >
> > On Wed, Mar 22, 2023 at 4:56 PM Panagiotis Garefalakis 
> > wrote:
> >
> > > Hi everyone,
> > >
> > >
> > > Thanks for the valuable comments!
> > > Excited to see this is an area of interest for the community!
> > >
> > > Summarizing some of the main points raised along with my thoughts:
> > >
> > >- Labels (Key/Value) pairs are more expressive than Tags (Strings) so
> > >using the former is a good idea — I am also debating if we want to
> > > return
> > >multiple KV pairs per Listener (one could argue that we could split
> > the
> > >logic in multiple Listeners to support that)
> > >- An immutable context along with data returned using the interface
> > >method implementations is a better approach than a mutable Collection
> > >- Listener execution should be independent — however we need a way to
> > >enforce a Label key/key-prefix is only assigned to a single Listener,
> > >thinking of a validation step both at Listener init and runtime stages
> > >- We want to perform async Listener operations as sync could block the
> > >main thread — exposing an ioExecutor pool through the context could be
> > > an
> > >elegant solution here
> > >- Make sure Listener errors are not failing jobs — make sure to log
> > and
> > >keep the job alive
> > >- We need better naming / public interface separation/description
> > >
> > > -  Even though custom restart strategies share some properties
> > with
> > > Listeners, they would probably need a separate interface with a different
> > > return type anyway (restart strategy not labels) and in general they are
> > > different and complex enough to justify their own FLIP (that can also be
> > a
> > > follow-up).
> > >
> > >
> > > What do people think? I am planning to modify the FLIP to reflect these
> > > changes if they make sense to everyone.
> > >
> > > Cheers,
> > > Panagiotis
> > >
> > > On Wed, Mar 22, 2023 at 6:28 AM Hong Teoh  wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thank you Panagiotis for proposing this. From the size of the thread,
> > > this
> > > > is a much needed feature in Flink!
> > > > Some thoughts, to extend those already adeptly summarised by Piotr,
> > > > Matthias and Jing.
> > > >
> > > > - scope of FLIP: +1 to scoping this FLIP to observability around a
> > > > restart. That would include adding metadata + exposing metadata to
> > > external
> > > > systems. IMO, introducing a new restart strategy solves different
> > > problems,
> > > > is much larger scope and should be covered in a separate FLIP.
> > > >
> > > > - failure handling: At the moment, we pr

[ANNOUNCE] Apache Flink 1.17.0 released

2023-03-23 Thread Leonard Xu
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.17.0, which is the first release for the Apache Flink 1.17 series.

Apache Flink® is an open-source unified stream and batch data processing 
framework for distributed, high-performing, always-available, and accurate data 
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this release:
https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
 

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Best regards,
Qingsheng, Martijn, Matthias and Leonard

[jira] [Created] (FLINK-31577) Upgrade documentation to improve Operator

2023-03-23 Thread Jeff Yang (Jira)
Jeff Yang created FLINK-31577:
-

 Summary: Upgrade documentation to improve Operator
 Key: FLINK-31577
 URL: https://issues.apache.org/jira/browse/FLINK-31577
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.4.0, kubernetes-operator-1.3.0, 
kubernetes-operator-1.5.0
Reporter: Jeff Yang
 Fix For: kubernetes-operator-1.5.0


Currently, when referring to the documentation for operator upgrades, I found that 
there are some descriptions in the documentation that need to be improved; 
otherwise, the upgrade may fail due to misuse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31578) Build Release Candidate: 1.17.0-rc2

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31578:
-

 Summary: Build Release Candidate: 1.17.0-rc2
 Key: FLINK-31578
 URL: https://issues.apache.org/jira/browse/FLINK-31578
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.17.0
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren
 Fix For: 1.17.0


The core of the release process is the build-vote-fix cycle. Each cycle 
produces one release candidate. The Release Manager repeats this cycle until 
the community approves one release candidate, which is then finalized.

h4. Prerequisites
Set up a few environment variables to simplify Maven commands that follow. This 
identifies the release candidate being built. Start with {{RC_NUM}} equal to 1 
and increment it for each candidate:
{code}
RC_NUM="1"
TAG="release-${RELEASE_VERSION}-rc${RC_NUM}"
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31579) CLONE - Build and stage Java and Python artifacts

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31579:
-

 Summary: CLONE - Build and stage Java and Python artifacts
 Key: FLINK-31579
 URL: https://issues.apache.org/jira/browse/FLINK-31579
 Project: Flink
  Issue Type: Sub-task
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


# Create a local release branch ((!) this step can not be skipped for minor 
releases):
{code:bash}
$ cd ./tools
tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION 
RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh
{code}
 # Tag the release commit:
{code:bash}
$ git tag -s ${TAG} -m "${TAG}"
{code}
 # We now need to do several things:
 ## Create the source release archive
 ## Deploy jar artefacts to the [Apache Nexus 
Repository|https://repository.apache.org/], which is the staging area for 
deploying the jars to Maven Central
 ## Build PyFlink wheel packages
You might want to create a directory on your local machine for collecting the 
various source and binary releases before uploading them. Creating the binary 
releases is a lengthy process but you can do this on another machine (for 
example, in the "cloud"). When doing this, you can skip signing the release 
files on the remote machine, download them to your local machine and sign them 
there.
 # Build the source release:
{code:bash}
tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh
{code}
 # Stage the maven artifacts:
{code:bash}
tools $ releasing/deploy_staging_jars.sh
{code}
Review all staged artifacts ([https://repository.apache.org/]). They should 
contain all relevant parts for each module, including pom.xml, jar, test jar, 
source, test source, javadoc, etc. Carefully review any new artifacts.
 # Close the staging repository on Apache Nexus. When prompted for a 
description, enter “Apache Flink, version X, release candidate Y”.
Then, you need to build the PyFlink wheel packages (since 1.11):
 # Set up an azure pipeline in your own Azure account. You can refer to [Azure 
Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository]
 for more details on how to set up azure pipeline for a fork of the Flink 
repository. Note that a google cloud mirror in Europe is used for downloading 
maven artifacts, therefore it is recommended to set your [Azure organization 
region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location]
 to Europe to speed up the downloads.
 # Push the release candidate branch to your forked personal Flink repository, 
e.g.
{code:bash}
tools $ git push  
refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM}
{code}
 # Trigger the Azure Pipelines manually to build the PyFlink wheel packages
 ## Go to your Azure Pipelines Flink project → Pipelines
 ## Click the "New pipeline" button on the top right
 ## Select "GitHub" → your GitHub Flink repository → "Existing Azure Pipelines 
YAML file"
 ## Select your branch → Set path to "/azure-pipelines.yaml" → click on 
"Continue" → click on "Variables"
 ## Then click "New Variable" button, fill the name with "MODE", and the value 
with "release". Click "OK" to set the variable and the "Save" button to save 
the variables, then back on the "Review your pipeline" screen click "Run" to 
trigger the build.
 ## You should now see a build where only the "CI build (release)" is running
 # Download the PyFlink wheel packages from the build result page after the 
jobs of "build_wheels mac" and "build_wheels linux" have finished.
 ## Download the PyFlink wheel packages
 ### Open the build result page of the pipeline
 ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact)
 ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels 
linux}} separately to download the zip files
 ## Unzip these two zip files
{code:bash}
$ cd /path/to/downloaded_wheel_packages
$ unzip wheel_Linux_build_wheels\ linux.zip
$ unzip wheel_Darwin_build_wheels\ mac.zip{code}
 ## Create directory {{./dist}} under the directory of {{{}flink-python{}}}:
{code:bash}
$ cd 
$ mkdir flink-python/dist{code}
 ## Move the unzipped wheel packages to the directory of 
{{{}flink-python/dist{}}}:
{code:java}
$ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-python/dist/
$ mv /path/to/wheel_Linux_build_wheels\ linux/* flink-python/dist/
$ cd tools{code}

Finally, we create the binary convenience release files:
{code:bash}
tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_binary_release.sh
{code}
If you want to run this step in parallel on a remote machine you have to make 
the release commit available there (for example by pushing to a repository). 
*This is important: the commit inside the binary builds has to match the commit 
of the source builds and the tagged release commit.* 
When building rem

[jira] [Created] (FLINK-31581) CLONE - Propose a pull request for website updates

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31581:
-

 Summary: CLONE - Propose a pull request for website updates
 Key: FLINK-31581
 URL: https://issues.apache.org/jira/browse/FLINK-31581
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.17.0
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren
 Fix For: 1.17.0


The final step of building the candidate is to propose a website pull request 
containing the following changes:
 # update 
[apache/flink-web:_config.yml|https://github.com/apache/flink-web/blob/asf-site/_config.yml]
 ## update {{FLINK_VERSION_STABLE}} and {{FLINK_VERSION_STABLE_SHORT}} as 
required
 ## update version references in quickstarts ({{{}q/{}}} directory) as required
 ## (major only) add a new entry to {{flink_releases}} for the release binaries 
and sources
 ## (minor only) update the entry for the previous release in the series in 
{{flink_releases}}
 ### Please pay attention to the ids assigned to the download entries. They should 
be unique and reflect their corresponding version number.
 ## add a new entry to {{release_archive.flink}}
 # add a blog post announcing the release in _posts
 # add an organized release notes page under docs/content/release-notes and 
docs/content.zh/release-notes (like 
[https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/]).
 The page is based on the non-empty release notes collected from the issues, 
and only the issues that affect existing users should be included (rather 
than, e.g., new functionality). It should be in a separate PR since it would be 
merged to the flink project.

(!) Don’t merge the PRs before finalizing the release.

 

h3. Expectations
 * Website pull request proposed to list the 
[release|http://flink.apache.org/downloads.html]
 * (major only) Check {{docs/config.toml}} to ensure that
 ** the version constants refer to the new version
 ** the {{baseurl}} does not point to {{flink-docs-master}}  but 
{{flink-docs-release-X.Y}} instead



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31580) CLONE - Stage source and binary releases on dist.apache.org

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31580:
-

 Summary: CLONE - Stage source and binary releases on 
dist.apache.org
 Key: FLINK-31580
 URL: https://issues.apache.org/jira/browse/FLINK-31580
 Project: Flink
  Issue Type: Sub-task
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


Copy the source release to the dev repository of dist.apache.org:
# If you have not already, check out the Flink section of the dev repository on 
dist.apache.org via Subversion. In a fresh directory:
{code:bash}
$ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates
{code}
# Make a directory for the new release and copy all the artifacts (Flink 
source/binary distributions, hashes, GPG signatures and the python 
subdirectory) into that newly created directory:
{code:bash}
$ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
$ mv /tools/releasing/release/* 
flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
{code}
# Add and commit all the files.
{code:bash}
$ cd flink
flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM}
flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}"
{code}
# Verify that files are present under 
[https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink].
# Push the release tag if not done already (the following command assumes it is 
called from within the apache/flink checkout):
{code:bash}
$ git push  refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM}
{code}

 

h3. Expectations
 * Maven artifacts deployed to the staging repository of 
[repository.apache.org|https://repository.apache.org/content/repositories/]
 * Source distribution deployed to the dev repository of 
[dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/]
 * Check hashes (e.g. shasum -c *.sha512)
 * Check signatures (e.g. {{{}gpg --verify 
flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}})
 * {{grep}} for legal headers in each file.
 * If time allows, check in advance the NOTICE files of the modules whose dependencies have 
been changed in this release, since license issues pop up from time to 
time during voting. See [Verifying a Flink 
Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release]
 "Checking License" section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31582) CLONE - Vote on the release candidate

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31582:
-

 Summary: CLONE - Vote on the release candidate
 Key: FLINK-31582
 URL: https://issues.apache.org/jira/browse/FLINK-31582
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.17.0
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren
 Fix For: 1.17.0


Once you have built and individually reviewed the release candidate, please 
share it for the community-wide review. Please review foundation-wide [voting 
guidelines|http://www.apache.org/foundation/voting.html] for more information.

Start the review-and-vote thread on the dev@ mailing list. Here’s an email 
template; please adjust as you see fit.
{quote}From: Release Manager
To: dev@flink.apache.org
Subject: [VOTE] Release 1.2.3, release candidate #3

Hi everyone,
Please review and vote on the release candidate #3 for the version 1.2.3, as 
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
 * JIRA release notes [1],
 * the official Apache source release and binary convenience releases to be 
deployed to dist.apache.org [2], which are signed with the key with fingerprint 
 [3],
 * all artifacts to be deployed to the Maven Central Repository [4],
 * source code tag "release-1.2.3-rc3" [5],
 * website pull request listing the new release and adding announcement blog 
post [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.

Thanks,
Release Manager

[1] link
[2] link
[3] [https://dist.apache.org/repos/dist/release/flink/KEYS]
[4] link
[5] link
[6] link
{quote}
*If there are any issues found in the release candidate, reply on the vote 
thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the 
Fix Issues step below and address the problem. However, some issues don’t 
require cancellation. For example, if an issue is found in the website pull 
request, just correct it on the spot and the vote can continue as-is.

For cancelling a release, the release manager needs to send an email to the 
release candidate thread, stating that the release candidate is officially 
cancelled. Next, all artifacts created specifically for the RC in the previous 
steps need to be removed:
 * Delete the staging repository in Nexus
 * Remove the source / binary RC files from dist.apache.org
 * Delete the source code tag in git

*If there are no issues, reply on the vote thread to close the voting.* Then, 
tally the votes in a separate email. Here’s an email template; please adjust as 
you see fit.
{quote}From: Release Manager
To: dev@flink.apache.org
Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3

I'm happy to announce that we have unanimously approved this release.

There are XXX approving votes, XXX of which are binding:
 * approver 1
 * approver 2
 * approver 3
 * approver 4

There are no disapproving votes.

Thanks everyone!
{quote}
 

h3. Expectations
 * Community votes to release the proposed candidate, with at least three 
approving PMC votes

Any issues that are raised till the vote is over should be either resolved or 
moved into the next release (if applicable).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31584) CLONE - CLONE - Build and stage Java and Python artifacts

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31584:
-

 Summary: CLONE - CLONE - Build and stage Java and Python artifacts
 Key: FLINK-31584
 URL: https://issues.apache.org/jira/browse/FLINK-31584
 Project: Flink
  Issue Type: Sub-task
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


# Create a local release branch ((!) this step can not be skipped for minor 
releases):
{code:bash}
$ cd ./tools
tools/ $ OLD_VERSION=$CURRENT_SNAPSHOT_VERSION NEW_VERSION=$RELEASE_VERSION 
RELEASE_CANDIDATE=$RC_NUM releasing/create_release_branch.sh
{code}
 # Tag the release commit:
{code:bash}
$ git tag -s ${TAG} -m "${TAG}"
{code}
 # We now need to do several things:
 ## Create the source release archive
 ## Deploy jar artefacts to the [Apache Nexus 
Repository|https://repository.apache.org/], which is the staging area for 
deploying the jars to Maven Central
 ## Build PyFlink wheel packages
You might want to create a directory on your local machine for collecting the 
various source and binary releases before uploading them. Creating the binary 
releases is a lengthy process but you can do this on another machine (for 
example, in the "cloud"). When doing this, you can skip signing the release 
files on the remote machine, download them to your local machine and sign them 
there.
 # Build the source release:
{code:bash}
tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_source_release.sh
{code}
 # Stage the maven artifacts:
{code:bash}
tools $ releasing/deploy_staging_jars.sh
{code}
Review all staged artifacts ([https://repository.apache.org/]). They should 
contain all relevant parts for each module, including pom.xml, jar, test jar, 
source, test source, javadoc, etc. Carefully review any new artifacts.
 # Close the staging repository on Apache Nexus. When prompted for a 
description, enter “Apache Flink, version X, release candidate Y”.
Then, you need to build the PyFlink wheel packages (since 1.11):
 # Set up an azure pipeline in your own Azure account. You can refer to [Azure 
Pipelines|https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository]
 for more details on how to set up azure pipeline for a fork of the Flink 
repository. Note that a google cloud mirror in Europe is used for downloading 
maven artifacts, therefore it is recommended to set your [Azure organization 
region|https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/change-organization-location]
 to Europe to speed up the downloads.
 # Push the release candidate branch to your forked personal Flink repository, 
e.g.
{code:bash}
tools $ git push  
refs/heads/release-${RELEASE_VERSION}-rc${RC_NUM}:release-${RELEASE_VERSION}-rc${RC_NUM}
{code}
 # Trigger the Azure Pipelines manually to build the PyFlink wheel packages
 ## Go to your Azure Pipelines Flink project → Pipelines
 ## Click the "New pipeline" button on the top right
 ## Select "GitHub" → your GitHub Flink repository → "Existing Azure Pipelines 
YAML file"
 ## Select your branch → Set path to "/azure-pipelines.yaml" → click on 
"Continue" → click on "Variables"
 ## Then click "New Variable" button, fill the name with "MODE", and the value 
with "release". Click "OK" to set the variable and the "Save" button to save 
the variables, then back on the "Review your pipeline" screen click "Run" to 
trigger the build.
 ## You should now see a build where only the "CI build (release)" is running
 # Download the PyFlink wheel packages from the build result page after the 
jobs of "build_wheels mac" and "build_wheels linux" have finished.
 ## Download the PyFlink wheel packages
 ### Open the build result page of the pipeline
 ### Go to the {{Artifacts}} page (build_wheels linux -> 1 artifact)
 ### Click {{wheel_Darwin_build_wheels mac}} and {{wheel_Linux_build_wheels 
linux}} separately to download the zip files
 ## Unzip these two zip files
{code:bash}
$ cd /path/to/downloaded_wheel_packages
$ unzip wheel_Linux_build_wheels\ linux.zip
$ unzip wheel_Darwin_build_wheels\ mac.zip{code}
 ## Create directory {{./dist}} under the directory of {{{}flink-python{}}}:
{code:bash}
$ cd 
$ mkdir flink-python/dist{code}
 ## Move the unzipped wheel packages to the directory of 
{{{}flink-python/dist{}}}:
{code:java}
$ mv /path/to/wheel_Darwin_build_wheels\ mac/* flink-python/dist/
$ mv /path/to/wheel_Linux_build_wheels\ linux/* flink-python/dist/
$ cd tools{code}

Finally, we create the binary convenience release files:
{code:bash}
tools $ RELEASE_VERSION=$RELEASE_VERSION releasing/create_binary_release.sh
{code}
If you want to run this step in parallel on a remote machine you have to make 
the release commit available there (for example by pushing to a repository). 
*This is important: the commit inside the binary builds has to match the commit 
of the source builds and the tagged release commit.* 
When buil

[jira] [Created] (FLINK-31585) CLONE - CLONE - Stage source and binary releases on dist.apache.org

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31585:
-

 Summary: CLONE - CLONE - Stage source and binary releases on 
dist.apache.org
 Key: FLINK-31585
 URL: https://issues.apache.org/jira/browse/FLINK-31585
 Project: Flink
  Issue Type: Sub-task
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


Copy the source release to the dev repository of dist.apache.org:
# If you have not already, check out the Flink section of the dev repository on 
dist.apache.org via Subversion. In a fresh directory:
{code:bash}
$ svn checkout https://dist.apache.org/repos/dist/dev/flink --depth=immediates
{code}
# Make a directory for the new release and copy all the artifacts (Flink 
source/binary distributions, hashes, GPG signatures and the python 
subdirectory) into that newly created directory:
{code:bash}
$ mkdir flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
$ mv /tools/releasing/release/* 
flink/flink-${RELEASE_VERSION}-rc${RC_NUM}
{code}
# Add and commit all the files.
{code:bash}
$ cd flink
flink $ svn add flink-${RELEASE_VERSION}-rc${RC_NUM}
flink $ svn commit -m "Add flink-${RELEASE_VERSION}-rc${RC_NUM}"
{code}
# Verify that files are present under 
[https://dist.apache.org/repos/dist/dev/flink|https://dist.apache.org/repos/dist/dev/flink].
# Push the release tag if not done already (the following command assumes it is 
called from within the apache/flink checkout):
{code:bash}
$ git push  refs/tags/release-${RELEASE_VERSION}-rc${RC_NUM}
{code}

 

h3. Expectations
 * Maven artifacts deployed to the staging repository of 
[repository.apache.org|https://repository.apache.org/content/repositories/]
 * Source distribution deployed to the dev repository of 
[dist.apache.org|https://dist.apache.org/repos/dist/dev/flink/]
 * Check hashes (e.g. shasum -c *.sha512)
 * Check signatures (e.g. {{{}gpg --verify 
flink-1.2.3-source-release.tar.gz.asc flink-1.2.3-source-release.tar.gz{}}})
 * {{grep}} for legal headers in each file.
 * If time allows, check in advance the NOTICE files of the modules whose dependencies have 
been changed in this release, since license issues pop up from time to 
time during voting. See [Verifying a Flink 
Release|https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Release]
 "Checking License" section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31587) CLONE - CLONE - Vote on the release candidate

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31587:
-

 Summary: CLONE - CLONE - Vote on the release candidate
 Key: FLINK-31587
 URL: https://issues.apache.org/jira/browse/FLINK-31587
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.17.0
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


Once you have built and individually reviewed the release candidate, please 
share it for the community-wide review. Please review foundation-wide [voting 
guidelines|http://www.apache.org/foundation/voting.html] for more information.

Start the review-and-vote thread on the dev@ mailing list. Here’s an email 
template; please adjust as you see fit.
{quote}From: Release Manager
To: dev@flink.apache.org
Subject: [VOTE] Release 1.2.3, release candidate #3

Hi everyone,
Please review and vote on the release candidate #3 for the version 1.2.3, as 
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
 * JIRA release notes [1],
 * the official Apache source release and binary convenience releases to be 
deployed to dist.apache.org [2], which are signed with the key with fingerprint 
 [3],
 * all artifacts to be deployed to the Maven Central Repository [4],
 * source code tag "release-1.2.3-rc3" [5],
 * website pull request listing the new release and adding announcement blog 
post [6].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.

Thanks,
Release Manager

[1] link
[2] link
[3] [https://dist.apache.org/repos/dist/release/flink/KEYS]
[4] link
[5] link
[6] link
{quote}
*If there are any issues found in the release candidate, reply on the vote 
thread to cancel the vote.* There’s no need to wait 72 hours. Proceed to the 
Fix Issues step below and address the problem. However, some issues don’t 
require cancellation. For example, if an issue is found in the website pull 
request, just correct it on the spot and the vote can continue as-is.

For cancelling a release, the release manager needs to send an email to the 
release candidate thread, stating that the release candidate is officially 
cancelled. Next, all artifacts created specifically for the RC in the previous 
steps need to be removed:
 * Delete the staging repository in Nexus
 * Remove the source / binary RC files from dist.apache.org
 * Delete the source code tag in git

*If there are no issues, reply on the vote thread to close the voting.* Then, 
tally the votes in a separate email. Here’s an email template; please adjust as 
you see fit.
{quote}From: Release Manager
To: dev@flink.apache.org
Subject: [RESULT] [VOTE] Release 1.2.3, release candidate #3

I'm happy to announce that we have unanimously approved this release.

There are XXX approving votes, XXX of which are binding:
 * approver 1
 * approver 2
 * approver 3
 * approver 4

There are no disapproving votes.

Thanks everyone!
{quote}
 

h3. Expectations
 * Community votes to release the proposed candidate, with at least three 
approving PMC votes

Any issues that are raised till the vote is over should be either resolved or 
moved into the next release (if applicable).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31583) Build Release Candidate: 1.17.0-rc3

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31583:
-

 Summary: Build Release Candidate: 1.17.0-rc3
 Key: FLINK-31583
 URL: https://issues.apache.org/jira/browse/FLINK-31583
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.17.0
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


The core of the release process is the build-vote-fix cycle. Each cycle 
produces one release candidate. The Release Manager repeats this cycle until 
the community approves one release candidate, which is then finalized.
h4. Prerequisites

Set up a few environment variables to simplify Maven commands that follow. This 
identifies the release candidate being built. Start with {{RC_NUM}} equal to 1 
and increment it for each candidate:
{code:java}
RC_NUM="2"
TAG="release-${RELEASE_VERSION}-rc${RC_NUM}"
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31586) CLONE - CLONE - Propose a pull request for website updates

2023-03-23 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-31586:
-

 Summary: CLONE - CLONE - Propose a pull request for website updates
 Key: FLINK-31586
 URL: https://issues.apache.org/jira/browse/FLINK-31586
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.17.0
Reporter: Qingsheng Ren
Assignee: Qingsheng Ren


The final step of building the candidate is to propose a website pull request 
containing the following changes:
 # update 
[apache/flink-web:_config.yml|https://github.com/apache/flink-web/blob/asf-site/_config.yml]
 ## update {{FLINK_VERSION_STABLE}} and {{FLINK_VERSION_STABLE_SHORT}} as 
required
 ## update version references in quickstarts ({{{}q/{}}} directory) as required
 ## (major only) add a new entry to {{flink_releases}} for the release binaries 
and sources
 ## (minor only) update the entry for the previous release in the series in 
{{flink_releases}}
 ### Please pay attention to the ids assigned to the download entries. They should 
be unique and reflect their corresponding version number.
 ## add a new entry to {{release_archive.flink}}
 # add a blog post announcing the release in _posts
 # add an organized release notes page under docs/content/release-notes and 
docs/content.zh/release-notes (like 
[https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/]).
 The page is based on the non-empty release notes collected from the issues, 
and only the issues that affect existing users should be included (rather 
than, e.g., new functionality). It should be in a separate PR since it would be 
merged to the flink project.

(!) Don’t merge the PRs before finalizing the release.

 

h3. Expectations
 * Website pull request proposed to list the 
[release|http://flink.apache.org/downloads.html]
 * (major only) Check {{docs/config.toml}} to ensure that
 ** the version constants refer to the new version
 ** the {{baseurl}} does not point to {{flink-docs-master}}  but 
{{flink-docs-release-X.Y}} instead



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-03-23 Thread Hang Ruan
Hi, yuxia,

Thanks for starting the discussion.

I wonder what the behavior is when we truncate a table which is also used as a
source. Source tables and sink tables may have different table options.
IMO, the TRUNCATE statement should be supported regardless of which kind of table it is.
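
For context, yuxia's proposal is that connectors implement an interface with
their own truncation logic; presumably a table whose connector does not
implement it would just get a clear error. A purely illustrative sketch (the
interface and method names are assumptions, not necessarily what the FLIP
defines):

// Illustrative only; names are assumptions and may differ from the FLIP's actual API.
public interface SupportsTruncate {

    /** Deletes all rows of the backing table when TRUNCATE TABLE is executed. */
    void executeTruncation();
}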

Best,
Hang

Shammon FY wrote on Thu, Mar 23, 2023 at 08:55:

> Hi yuxia
>
> Thanks for initiating this discussion.
>
> There are usually two types of data deletion in a production environment:
> one is deleting data directly and the other is moving the data to the trash
> directory which will be deleted periodically by the underlying system.
>
> Can we distinguish between these two operations in the truncate syntax? Or
> support adding options in `with`?
>
> Best,
> Shammon FY
>
>
> On Wed, Mar 22, 2023 at 9:13 PM yuxia  wrote:
>
> > Hi, devs.
> >
> > I'd like to start a discussion about FLIP-302: Support TRUNCATE TABLE
> > statement [1].
> >
> > The TRUNCATE TABLE statement is a SQL command that allows users to quickly
> > and efficiently delete all rows from a table without dropping the table
> > itself. This statement is commonly used in data warehouses, where large
> > data sets are frequently loaded and unloaded from tables.
> > So, this FLIP is meant to support the TRUNCATE TABLE statement. More
> > exactly, this FLIP will bring Flink the TRUNCATE TABLE syntax and an
> > interface with which the corresponding connectors can implement their
> > own logic for truncating a table.
> >
> > Looking forward to your feedback.
> >
> > [1]: [
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement
> > |
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement
> > ]
> >
> >
> > Best regards,
> > Yuxia
> >
>


Re: [External] [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Yisha Zhou
Hi Jane,
Thanks for driving this; your solution is really creative. In my 
company, there are also scenarios in which users want to configure state 
TTL at the operator level, especially for window operators that treat TTL as 
allowed lateness.

To support these scenarios, we implemented a query hint like the one below:

SELECT /*+ STATE_TTL('1D') */
id,
max(num) as num
FROM source
GROUP BY id

Regarding the reasons for rejecting SQL hints that you mentioned in the FLIP, I have some 
different opinions. 
1. Hints do not necessarily only aim to inspire the planner. For example, we have dynamic 
options for tables, and users can configure parallelism for source/sink operators. 
State TTL is also a kind of parameter that can be configured dynamically.

2. I agree that users need not understand how SQL statements are 
translated to operators. And exactly for this reason, the JSON plan is too complex 
for users to read. For example, if we enable 
'table.optimizer.distinct-agg.split.enabled' and 
'table.optimizer.incremental-agg-enabled', we get two Agg operators; I don't 
think all users know which operators relate to their queries, so they may not set TTL 
correctly.

Therefore, I personally prefer to support this via query hints, so users can 
configure TTL for their 'GROUP BY' query instead of for several individual operators.
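
For comparison, a minimal sketch of the only knob available today, the
job-level setting via the existing public configuration (the fine-grained,
per-query/per-operator control is exactly what this FLIP and the hint above
are trying to add):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JobLevelTtlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Applies to every stateful operator of every query in this job; no per-operator control.
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "1 d");
    }
}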

Best regards,
Yisha

> On Mar 21, 2023, at 19:51, Jane Chan wrote:
> 
> Hi devs,
> 
> I'd like to start a discussion on FLIP-292: Support configuring state TTL
> at operator level for Table API & SQL programs [1].
> 
> Currently, we only support job-level state TTL configuration via
> 'table.exec.state.ttl'. However, users may expect a fine-grained state TTL
> control to optimize state usage. Hence we propose to serialize/deserialize
> the state TTL as metadata of the operator's state to/from the compiled JSON
> plan, to achieve the goal that specifying different state TTL when
> transforming the exec node to stateful operators.
> 
> Look forward to your opinions!
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
> 
> Best Regards,
> Jane Chan



Re: [ANNOUNCE] Apache Flink 1.17.0 released

2023-03-23 Thread Jing Ge
Excellent work! Congratulations! Appreciate the hard work and contributions
of everyone in the Apache Flink community who helped make this release
possible. Looking forward to those new features. Cheers!

Best regards,
Jing

On Thu, Mar 23, 2023 at 10:24 AM Leonard Xu  wrote:

> The Apache Flink community is very happy to announce the release of Apache 
> Flink
> 1.17.0, which is the first release for the Apache Flink 1.17 series.
>
> Apache Flink® is an open-source unified stream and batch data processing 
> framework
> for distributed, high-performing, always-available, and accurate data
> applications.
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this release:
> https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Best regards,
> Qingsheng, Martijn, Matthias and Leonard
>


[RESULT][VOTE] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-23 Thread Yuxin Tan
Hi all,

I am happy to announce that FLIP-301: Hybrid Shuffle supports Remote
Storage[1] has been accepted. The FLIP was voted in this thread[2].

There are 4 binding votes and 3 non-binding votes:

- Wencong Liu (non-binding)
- Weihua Hu (non-binding)
- Junrui Lee (non-binding)
- weijie guo (binding)
- Xintong Song (binding)
- Zhu Zhu (binding)
- Yun Tang (binding)

There is no disapproving vote. Thanks.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
[2] https://lists.apache.org/thread/xpmhpmodzlwo03n6zzovy36gox84l6zl

Best,
Yuxin


Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-03-23 Thread Ran Tao
Hi, yuxia.

Thanks for starting the discussion.
I think it's a nice improvement to support the TRUNCATE TABLE statement because
many other mature engines support it.

I have some questions.
1. Because tables have different types, will we support views or
temporary tables?

2. Some other engines such as Spark and Hive support TRUNCATE TABLE with
partitions. Will we support that as well?
Btw, I think you need to give the concrete TRUNCATE TABLE syntax in the FLIP
because some engines have different syntaxes (see the sketch after this list).
For example, Hive allows TRUNCATE TABLE to be written as TRUNCATE [TABLE], which
means the TABLE keyword can be optional.

3. The Proposed Changes try to use SqlToOperationConverter and run in
TableEnvironmentImpl#executeInternal.
I think this is out of date: the community is refactoring the conversion logic
from SqlNode to Operation [1] and the execution logic in TableEnvironmentImpl [2].
I suggest using the new approach to support it.

[1] https://issues.apache.org/jira/browse/FLINK-31464
[2] https://issues.apache.org/jira/browse/FLINK-31368
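
As a sketch of the syntax question from point 2 (illustration only; the exact
syntax is precisely what the FLIP still needs to pin down, and the table name
is made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TruncateSyntaxSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Strict form; other engines also accept variants, e.g. Hive allows
        // "TRUNCATE sales" (optional TABLE keyword) and
        // "TRUNCATE TABLE sales PARTITION (dt = '2023-03-23')" for partitioned tables.
        tEnv.executeSql("TRUNCATE TABLE my_db.sales"); // not supported until this FLIP lands
    }
}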

Best Regards,
Ran Tao
https://github.com/chucheng92


yuxia wrote on Wed, Mar 22, 2023 at 21:13:

> Hi, devs.
>
> I'd like to start a discussion about FLIP-302: Support TRUNCATE TABLE
> statement [1].
>
> The TRUNCATE TABLE statement is a SQL command that allows users to quickly
> and efficiently delete all rows from a table without dropping the table
> itself. This statement is commonly used in data warehouses, where large data
> sets are frequently loaded and unloaded from tables.
> So, this FLIP is meant to support the TRUNCATE TABLE statement. More exactly,
> this FLIP will bring Flink the TRUNCATE TABLE syntax and an interface with
> which the corresponding connectors can implement their own logic for
> truncating a table.
>
> Looking forward to your feedback.
>
> [1]: [
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement
> |
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement
> ]
>
>
> Best regards,
> Yuxia
>


Re: [ANNOUNCE] Apache Flink 1.17.0 released

2023-03-23 Thread Matthias Pohl
Thanks for getting this release over the finish line.

One additional thing:
Feel free to reach out to the release managers (or respond to this thread)
with feedback on the release process. Our goal is to constantly improve the
release process. Feedback on what could be improved or things that didn't
go so well during the 1.17.0 release cycle is much appreciated.

Best,
Matthias

On Thu, Mar 23, 2023 at 11:02 AM Jing Ge via user 
wrote:

> Excellent work! Congratulations! Appreciate the hard work and
> contributions of everyone in the Apache Flink community who helped make
> this release possible. Looking forward to those new features. Cheers!
>
> Best regards,
> Jing
>
> On Thu, Mar 23, 2023 at 10:24 AM Leonard Xu  wrote:
>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink
>> 1.17.0, which is the first release for the Apache Flink 1.17 series.
>>
>> Apache Flink® is an open-source unified stream and batch data processing 
>> framework
>> for distributed, high-performing, always-available, and accurate data
>> applications.
>>
>> The release is available for download at:
>>
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements
>> for this release:
>> https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Best regards,
>> Qingsheng, Martijn, Matthias and Leonard
>>
>


Re: [ANNOUNCE] Apache Flink 1.17.0 released

2023-03-23 Thread Tamir Sagi
Congratulations. Well done!

From: Matthias Pohl via user 
Sent: Thursday, March 23, 2023 12:28 PM
To: Jing Ge 
Cc: Leonard Xu ; dev ; 
annou...@apache.org ; user ; 
user-zh 
Subject: Re: [ANNOUNCE] Apache Flink 1.17.0 released



Thanks for getting this release over the finish line.

One additional thing:
Feel free to reach out to the release managers (or respond to this thread) with 
feedback on the release process. Our goal is to constantly improve the release 
process. Feedback on what could be improved or things that didn't go so well 
during the 1.17.0 release cycle is much appreciated.

Best,
Matthias

On Thu, Mar 23, 2023 at 11:02 AM Jing Ge via user  wrote:
Excellent work! Congratulations! Appreciate the hard work and contributions of 
everyone in the Apache Flink community who helped make this release possible. 
Looking forward to those new features. Cheers!

Best regards,
Jing

On Thu, Mar 23, 2023 at 10:24 AM Leonard Xu  wrote:
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.17.0, which is the first release for the Apache Flink 1.17 series.

Apache Flink® is an open-source unified stream and batch data processing 
framework for distributed, high-performing, always-available, and accurate data 
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this release:
https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Best regards,
Qingsheng, Martijn, Matthias and Leonard



[jira] [Created] (FLINK-31588) The unaligned checkpoint type is wrong at subtask level

2023-03-23 Thread Rui Fan (Jira)
Rui Fan created FLINK-31588:
---

 Summary: The unaligned checkpoint type is wrong at subtask level
 Key: FLINK-31588
 URL: https://issues.apache.org/jira/browse/FLINK-31588
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.17.0, 1.16.0
Reporter: Rui Fan
Assignee: Rui Fan
 Attachments: image-2023-03-23-18-45-01-535.png

FLINK-20488 added showing the checkpoint type for each subtask, based on the 
received `CheckpointOptions`, which was correct at the time.

However, FLINK-27251 supported timing out from aligned to unaligned checkpoint 
barriers in the output buffers. This means the received `CheckpointOptions` can be 
converted from an aligned checkpoint to an unaligned checkpoint.

So the unaligned checkpoint type may be wrong at the subtask level. For example, 
as shown in the figure below, the unaligned checkpoint type shows false, but it is 
actually an unaligned checkpoint (persisted data > 0).

 

!image-2023-03-23-18-45-01-535.png|width=1879,height=797!
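For reference, the timeout-based conversion described above is enabled on the user side roughly as follows (a minimal sketch; the interval and timeout values are only examples):

{code:java}
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // Unaligned checkpoints are enabled, but barriers start out aligned ...
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
        // ... and are converted to unaligned once alignment takes longer than this timeout,
        // which is the case where the subtask-level checkpoint type can be displayed wrongly.
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));
    }
}
{code}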

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31589) Reduce the pressure of the list pod method on the k8s cluster

2023-03-23 Thread liuzhuo (Jira)
liuzhuo created FLINK-31589:
---

 Summary: Reduce the pressure of the list pod method on the k8s 
cluster
 Key: FLINK-31589
 URL: https://issues.apache.org/jira/browse/FLINK-31589
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: liuzhuo


In the production environment, we found that if a cluster-level failure occurs, 
the jobs of a cluster will fail over at the same time, which puts a lot of 
pressure on the k8s cluster.
Through k8s monitoring, we found that the list pod method caused 
relatively high pressure.
By looking into fabric8, we found an optimization for the list pod call: adding the parameter 
{code:java}
ResourceVersion=0{code}
 to the list pod request can greatly reduce the pressure on the k8s cluster.

 

[link|https://github.com/fabric8io/kubernetes-client/issues/4670]
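A minimal sketch of the proposed change with a recent fabric8 client (the namespace and label selector below are placeholders):

{code:java}
import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class ListPodsFromWatchCache {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // resourceVersion=0 lets the API server answer the list from its watch cache
            // instead of a quorum read against etcd, which reduces pressure on the cluster.
            PodList pods = client.pods()
                    .inNamespace("flink-namespace")
                    .withLabel("app", "my-flink-cluster")
                    .list(new ListOptionsBuilder().withResourceVersion("0").build());
            pods.getItems().forEach(pod -> System.out.println(pod.getMetadata().getName()));
        }
    }
}
{code}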



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [External] [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Jane Chan
Hi Yisha,

Thank you for the valuable feedback! I appreciate that you find this
proposal beneficial. I'd like to answer your questions and explain
why we prefer not to use SQL hints.

> 1. Hints may not only aim to inspire the planner. For example, we have
dynamic options for tables and users can configure parallelism for source/sink
operators. State TTL is also a kind of parameter which can be configured
dynamically.

The core reason for us to reject using hints to configure state TTL is not
due to its dynamic configuration nature. In fact, configuring TTL through a
compiled plan is also a form of dynamic adjustment. The key lies in the
fact that state TTL affects the calculation results of data. From the SQL
semantic perspective, hints cannot intervene in the calculation of data
results. The adjustment of parallelism in your example is a good use case
of hints.

> 2. I agree that users need not understand how SQL statements are
translated to operators. And exactly for this reason, the json plan is too
complex for users to read. For example, if we enable
'table.optimizer.distinct-agg.split.enabled’ and
'table.optimizer.incremental-agg-enabled’, we get two Agg operators; I
don’t think all users know which operators are related to their queries and
can set TTL correctly.

Leaving aside the first point about semantics, the second reason is that
hints are configured on the SQL but actually work on operators. The scope of
a hint is not clear enough. For example, in a job the source needs to
first do the de-duplication, then join with another source, and finally do
the aggregation, and the user wants to set a different state TTL at each
stage; in which part of the SQL should the user write each hint, and
on which part of the SQL should each hint act? More importantly, some
stateful operators are not reflected in SQL at all, such as ChangelogNormalize and
SinkUpsertMaterializer, which are derived and added by the planner
implicitly.
You raise a good point that compiled plan JSON is less readable, but it's
accurate enough. Back to the split distinct agg case, I think users who
enable this configuration are fully aware that their data has skewness and
needs a two-layer group aggregate.
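For context, the compiled-plan workflow this proposal builds on looks roughly like the sketch below (the datagen/blackhole tables are only placeholders, and the exact JSON attribute carrying the per-operator TTL is precisely what this FLIP would define):

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanTtlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql("CREATE TABLE src (id BIGINT, v STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (id BIGINT, cnt BIGINT) WITH ('connector' = 'blackhole')");

        // 1. Compile the statement into a JSON plan and persist it.
        CompiledPlan plan =
                tEnv.compilePlanSql("INSERT INTO snk SELECT id, COUNT(v) FROM src GROUP BY id");
        plan.writeToFile("/tmp/plan.json");

        // 2. Edit /tmp/plan.json and set the proposed per-operator state TTL metadata
        //    on the exec node(s) of interest (the attribute name is what this FLIP defines).

        // 3. Load and execute the modified plan.
        tEnv.loadPlan(PlanReference.fromFile("/tmp/plan.json")).execute();
    }
}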

Let's see what other people think.

Best,
Jane

On Thu, Mar 23, 2023 at 6:00 PM Yisha Zhou 
wrote:

> Hi Jane,
> Thanks for driving this, and your solution is absolutely creative. In my
> company, there also exist some scenarios in which users want to configure
> state TTL at the operator level, especially for window operators which treat
> TTL as allowed lateness.
>
> To support this scenarios, we implemented a query hint like below:
>
> SELECT /*+ STATE_TTL('1D') */
> id,
> max(num) as num
> FROM source
> GROUP BY id
>
> Regarding the reasons for rejecting SQL hints that you mentioned in the FLIP, I have some
> different opinions.
> 1. Hints may not only aim to inspire the planner. For example, we have
> dynamic options for tables and users can configure parallelism for source/sink
> operators. State TTL is also a kind of parameter which can be configured
> dynamically.
>
> 2. I agree that users need not understand how SQL statements are
> translated to operators. And exactly for this reason, the json plan is too
> complex for users to read. For example, if we enable
> 'table.optimizer.distinct-agg.split.enabled’ and
> 'table.optimizer.incremental-agg-enabled’, we get two Agg operators; I
> don’t think all users know which operators are related to their queries and
> can set TTL correctly.
>
> Therefore, I personally prefer to support this via query hints so that users
> can configure TTL for their ‘group by’ query instead of for several operators.
>
> Best regards,
> Yisha
>
> > On Mar 21, 2023, at 19:51, Jane Chan wrote:
> >
> > Hi devs,
> >
> > I'd like to start a discussion on FLIP-292: Support configuring state TTL
> > at operator level for Table API & SQL programs [1].
> >
> > Currently, we only support job-level state TTL configuration via
> > 'table.exec.state.ttl'. However, users may expect a fine-grained state
> TTL
> > control to optimize state usage. Hence we propose to
> serialize/deserialize
> > the state TTL as metadata of the operator's state to/from the compiled
> JSON
> > plan, to achieve the goal that specifying different state TTL when
> > transforming the exec node to stateful operators.
> >
> > Look forward to your opinions!
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
> >
> > Best Regards,
> > Jane Chan
>
>


Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-23 Thread Shammon FY
Hi Hongshun

Thanks for driving this discussion. Automatically discovering partitions
without losing data sounds great!

Currently flink supports kafka source with different startup modes, such as
EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.

If I understand correctly, you will set the offset of new partitions with
EARLIEST? Please correct me if I'm wrong, I think the EARLIEST startup mode
for new partitions is not suitable if users set TIMESTAMP/SPECIFIC_OFFSET
for kafka in their jobs.

For an extreme example, the current time is 2023-03-23 15:00:00 and users
set the TIMESTAMP with 2023-03-23 16:00:00 for their jobs. If a partition
is added during this period, jobs will generate “surprising” data. What do
you think of it?
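
For reference, the scenario above corresponds roughly to a source configured like the sketch below (topic pattern, timestamp and discovery interval are only examples):

import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class TimestampStartupWithDiscovery {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopicPattern(Pattern.compile("input-topic-.*"))
                // start from records with timestamp >= 2023-03-23 16:00:00 UTC
                .setStartingOffsets(OffsetsInitializer.timestamp(1679587200000L))
                // partition discovery every 30s; the open question in this thread is
                // which offset strategy newly discovered partitions should use
                .setProperty("partition.discovery.interval.ms", "30000")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // pass 'source' to env.fromSource(...) as usual
    }
}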


Best,
Shammon FY


On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang 
wrote:

> Hi, Hang,
>
> Thanks for your advice.
>
> When the second case will occur? Currently, there are three ways to specify
> partitions in Kafka: by topic, by partition, and by matching the topic with
> a regular expression. Currently, if the initial partition number is 0, an
> error will occur for the first two methods. However, when using a regular
> expression to match topics, it is allowed to have 0 matched topics.
>
> > I don't know when the second case will occur
>
>
> Why prefer the field `firstDiscoveryDone`? When a regular expression
> initially matches 0 topics, it should consume all messages of the new
> topic. If unassignedInitialPartitons and unassignedTopLevelPartitions are
> used instead of firstDiscoveryDone, any new topics created during (5
> minutes discovery + job restart time) will be treated as the first
> discovery, causing data loss.
>
> > Then when will we get the empty partition list? I think it should be
> treated as the first initial discovery if both `unassignedInitialPartitons`
> and `assignedPartitons` are empty without `firstDiscoveryDone`.
>
> Best
>
> Hongshun
>
> On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan  wrote:
>
> > Hi, Hongshun,
> >
> > Thank you for starting this discussion.  I have some problems about the
> > field `firstDiscoveryDone`.
> >
> > In the FLIP, why we need firstDiscoveryDone is as follows.
> > > Why do we need firstDiscoveryDone? Only relying on the
> > unAssignedInitialPartitons attribute cannot distinguish between the
> > following two cases (which often occur in pattern mode):
> > > The first partition discovery is so slow, before which the checkpoint
> is
> > executed and then job is restarted . At this time, the restored
> > unAssignedInitialPartitons is an empty set, which means non-discovery.
> The
> > next discovery will be treated as first discovery.
> > > The first time the partition is discovered is empty, and new partitions
> > can only be found after multiple partition discoveries. If a restart
> occurs
> > between this period, the restored unAssignedInitialPartitons is also an
> > empty set, which means empty-discovery.The next discovery will be treated
> > as new discovery.
> >
> > I don't know when the second case will occur. The partitions must be
> > greater than 0 when creating topics. And I have read this note in the
> FLIP.
> > > Note: The current design only applies to cases where all existing
> > partitions can be discovered at once. If all old partitions cannot be
> > discovered at once, the subsequent old partitions discovered will be
> > treated as new partitions, leading to message duplication. Therefore,
> this
> > point needs to be particularly noted.
> >
> > Then when will we get the empty partition list? I think it should be
> > treated as the first initial discovery if both
> `unassignedInitialPartitons`
> > and `assignedPartitons` are empty without `firstDiscoveryDone`.
> >
> > Besides that, I think the `unAssignedInitialPartitons` is better to be
> > named `unassignedInitialPartitons`.
> >
> > Best,
> > Hang
> >
> > On Fri, Mar 17, 2023 at 18:42, Hongshun Wang wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion on FLIP-288:Enable Dynamic Partition
> > > Discovery by Default in Kafka Source[1].
> > >
> > > As described in mail thread[2], dynamic partition discovery is disabled
> > by
> > > default and users have to explicitly specify the interval of discovery
> in
> > > order to turn it on. Besides, if the initial offset strategy is LATEST,
> > > same strategy is used for new partitions, leading to the loss of some
> > data
> > > (thinking a new partition is created and might be discovered by Kafka
> > > source several minutes later, and the message produced into the
> partition
> > > within the gap might be dropped if we use for example "latest" as the
> > > initial offset strategy.)
> > >
> > > The goals of this FLIP are as follows:
> > >
> > >1. Enable partition discovery by default.
> > >2. Use earliest as the offset strategy for new partitions after the
> > >first discovery.
> > >
> > > Looking forward to hearing from you.
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org

[jira] [Created] (FLINK-31590) Allow setting JobResourceRequirements through JobMasterGateway

2023-03-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-31590:


 Summary: Allow setting JobResourceRequirements through 
JobMasterGateway
 Key: FLINK-31590
 URL: https://issues.apache.org/jira/browse/FLINK-31590
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31591) Extend JobGraphWriter to persist requirements

2023-03-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-31591:


 Summary: Extend JobGraphWriter to persist requirements
 Key: FLINK-31591
 URL: https://issues.apache.org/jira/browse/FLINK-31591
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31592) Configurable deletion propagation for Flink resources

2023-03-23 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-31592:
--

 Summary: Configurable deletion propagation for Flink resources
 Key: FLINK-31592
 URL: https://issues.apache.org/jira/browse/FLINK-31592
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora


Currently we use the default (background) deletion propagation when we delete 
Kubernetes resources such as the JobManager deployment.

In most cases this is acceptable; however, it can lead to lingering resources 
when garbage collection is slow on the k8s cluster. We therefore propose to 
make this configurable and also change the default to Foreground, which is more 
predictable and works better for our purposes in the operator.
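
For illustration, a foreground deletion with a recent fabric8 client would look roughly like this (the deployment name and namespace are placeholders):

{code:java}
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class ForegroundDeletionSketch {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Foreground propagation: the deployment is only reported as deleted once its
            // dependents (replica sets, pods) have been garbage collected.
            client.apps().deployments()
                    .inNamespace("flink-namespace")
                    .withName("my-flink-cluster")
                    .withPropagationPolicy(DeletionPropagation.FOREGROUND)
                    .delete();
        }
    }
}
{code}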



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Shuo Cheng
Hi Jane,
Thanks for driving this, operator-level state TTL is absolutely a desired
feature. I would share my opinion as follows:

If the scope of this proposal is limited to an enhancement of the compiled
json plan, it makes sense. I think it does not conflict with configuring
state TTL
in other ways, e.g., SQL HINT or something else, because they just work at
different levels: SQL Hint works at the entrance of the SQL API, while the
compiled json plan is the intermediate result for SQL.
I think the final shape of state TTL configuration may look like this: users
can define operator state TTL using SQL HINT (assumption...), but it may
affect more than one stateful operator inside the same query block; then
users can further configure a specific one by modifying the compiled json
plan...

In a word, this proposal is in good shape as an enhancement of the compiled
json plan, and it's orthogonal to other ways like SQL Hint, which works at
a higher level.


Nits:

> "From the SQL semantic perspective, hints cannot intervene in the
calculation of data results."
I think it's more proper to say that a hint does not affect the equivalence
of execution plans (hash agg vs sort agg), not the equivalence of execution
results; e.g., users can set 'scan.startup.mode' for the kafka connector, which
also "intervenes in the calculation of data results".

Sincerely,
Shuo

On Tue, Mar 21, 2023 at 7:52 PM Jane Chan  wrote:

> Hi devs,
>
> I'd like to start a discussion on FLIP-292: Support configuring state TTL
> at operator level for Table API & SQL programs [1].
>
> Currently, we only support job-level state TTL configuration via
> 'table.exec.state.ttl'. However, users may expect a fine-grained state TTL
> control to optimize state usage. Hence we propose to serialize/deserialize
> the state TTL as metadata of the operator's state to/from the compiled JSON
> plan, to achieve the goal that specifying different state TTL when
> transforming the exec node to stateful operators.
>
> Look forward to your opinions!
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
>
> Best Regards,
> Jane Chan
>


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Shuo Cheng
Correction: “users can set 'scan.startup.mode' for kafka connector” -> “users
can set 'scan.startup.mode' for kafka connector by dynamic table option”

On Thu, Mar 23, 2023 at 21:50, Shuo Cheng wrote:

> Hi Jane,
> Thanks for driving this, operator level state ttl is absolutely a desired
> feature. I would share my opinion as following:
>
> If the scope of this proposal is limited as an enhancement for compiled
> json plan, it makes sense. I think it does not conflict with configuring
> state ttl
> in other ways, e.g., SQL HINT or something else, because they just work in
> different level, SQL Hint works in the exact entrance of SQL API, while
> compiled json plan is the intermediate results for SQL.
> I think the final shape of state ttl configuring may like the that, users
> can define operator state ttl using SQL HINT (assumption...), but it may
> affects more than one stateful operators inside the same query block, then
> users can further configure a specific one by modifying the compiled json
> plan...
>
> In a word, this proposal is in good shape as an enhancement for compiled
> json plan, and it's orthogonal with other ways like SQL Hint which works in
> a higher level.
>
>
> Nips:
>
> > "From the SQL semantic perspective, hints cannot intervene in the
> calculation of data results."
> I think it's more properly to say that hint does not affect the
> equivalence of execution plans (hash agg vs sort agg), not the equivalence
> of execution results, e.g., users can set 'scan.startup.mode' for kafka
> connector, which also "intervene in the calculation of data results".
>
> Sincerely,
> Shuo
>
> On Tue, Mar 21, 2023 at 7:52 PM Jane Chan  wrote:
>
>> Hi devs,
>>
>> I'd like to start a discussion on FLIP-292: Support configuring state TTL
>> at operator level for Table API & SQL programs [1].
>>
>> Currently, we only support job-level state TTL configuration via
>> 'table.exec.state.ttl'. However, users may expect a fine-grained state TTL
>> control to optimize state usage. Hence we propose to serialize/deserialize
>> the state TTL as metadata of the operator's state to/from the compiled
>> JSON
>> plan, to achieve the goal that specifying different state TTL when
>> transforming the exec node to stateful operators.
>>
>> Look forward to your opinions!
>>
>> [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
>>
>> Best Regards,
>> Jane Chan
>>
>


[jira] [Created] (FLINK-31593) Update reference data for Migration Tests

2023-03-23 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31593:
-

 Summary: Update reference data for Migration Tests
 Key: FLINK-31593
 URL: https://issues.apache.org/jira/browse/FLINK-31593
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl


# Update {{CURRENT_VERSION}} in {{TypeSerializerUpgradeTestBase}} with the new 
version. This will likely fail some tests because snapshots are missing for 
that version. Generate them, for example in {{TypeSerializerUpgradeTestBase}}.
 # (major/minor only) Update migration tests in master to cover migration from 
the new version (search for usages of {{FlinkVersion}}):
 ** AbstractOperatorRestoreTestBase
 ** CEPMigrationTest
 ** BucketingSinkMigrationTest
 ** FlinkKafkaConsumerBaseMigrationTest
 ** ContinuousFileProcessingMigrationTest
 ** WindowOperatorMigrationTest
 ** StatefulJobSavepointMigrationITCase
 ** StatefulJobWBroadcastStateMigrationITCase



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31594) Updates the docs stable version

2023-03-23 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-31594:
-

 Summary: Updates the docs stable version
 Key: FLINK-31594
 URL: https://issues.apache.org/jira/browse/FLINK-31594
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl


Update docs to "stable" in {{docs/config.toml}} in the branch of the 
_just-released_ version:
 * Change {{Version}} from {{x.y-SNAPSHOT}} to {{x.y.z}}, i.e. 
{{1.6-SNAPSHOT}} to {{1.6.0}}
 * Change {{VersionTitle}} from {{x.y-SNAPSHOT}} to {{x.y}}, i.e. 
{{1.6-SNAPSHOT}} to {{1.6}}
 * Change Branch from {{master}} to {{release-x.y}}, i.e. {{master}} to 
{{release-1.6}}
 * Change {{baseURL}} from 
{{//ci.apache.org/projects/flink/flink-docs-master}} to 
{{//ci.apache.org/projects/flink/flink-docs-release-x.y}}
 * Change {{javadocs_baseurl}} from 
{{//ci.apache.org/projects/flink/flink-docs-master}} to 
{{//ci.apache.org/projects/flink/flink-docs-release-x.y}}
 * Change {{IsStable}} to {{true}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31595) MiniBatchLocalGroupAggFunction produces wrong aggregate results with state clean up

2023-03-23 Thread Bo Cui (Jira)
Bo Cui created FLINK-31595:
--

 Summary: MiniBatchLocalGroupAggFunction produces wrong aggregate 
results with state clean up
 Key: FLINK-31595
 URL: https://issues.apache.org/jira/browse/FLINK-31595
 Project: Flink
  Issue Type: Bug
Reporter: Bo Cui


If the upstream operator supports retract data, the first record in a batch 
may be a retract record, and that retract record should be ignored.

https://github.com/apache/flink/blob/a64781b1ef8f129021bdcddd3b07548e6caa4a72/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchLocalGroupAggFunction.java#L68



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31596) Cleanup usage of deprecated methods in TableEnvironment

2023-03-23 Thread Jark Wu (Jira)
Jark Wu created FLINK-31596:
---

 Summary: Cleanup usage of deprecated methods in TableEnvironment
 Key: FLINK-31596
 URL: https://issues.apache.org/jira/browse/FLINK-31596
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Reporter: Jark Wu


This is preparation for removing the deprecated methods of the Table API in Flink 
2.0. Currently, the deprecated methods of TableEnvironment and 
StreamTableEnvironment are still used in many places. This is an umbrella issue 
to clean up their usage. 
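
For illustration, a typical migration looks like the sketch below (the function and table names are placeholders):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class DeprecatedApiMigration {

    public static class MyUpper extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // deprecated: tEnv.registerFunction("my_upper", new MyUpper());
        tEnv.createTemporarySystemFunction("my_upper", MyUpper.class);

        Table t = tEnv.sqlQuery("SELECT my_upper('flink') AS v");
        // deprecated: tEnv.registerTable("t", t);
        tEnv.createTemporaryView("t", t);
    }
}
{code}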



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31597) Cleanup usage of deprecated TableEnvironment#registerFunction

2023-03-23 Thread Jark Wu (Jira)
Jark Wu created FLINK-31597:
---

 Summary: Cleanup usage of deprecated 
TableEnvironment#registerFunction
 Key: FLINK-31597
 URL: https://issues.apache.org/jira/browse/FLINK-31597
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31598) Cleanup usage of deprecated TableEnvironment#registerTable

2023-03-23 Thread Jark Wu (Jira)
Jark Wu created FLINK-31598:
---

 Summary: Cleanup usage of deprecated TableEnvironment#registerTable
 Key: FLINK-31598
 URL: https://issues.apache.org/jira/browse/FLINK-31598
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Jark Wu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31599) Update Kafka dependency in flink-connector-kafka to 3.4.0

2023-03-23 Thread Alex Sorokoumov (Jira)
Alex Sorokoumov created FLINK-31599:
---

 Summary: Update Kafka dependency in flink-connector-kafka to 3.4.0
 Key: FLINK-31599
 URL: https://issues.apache.org/jira/browse/FLINK-31599
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka
Reporter: Alex Sorokoumov


There are a number of reasons to upgrade to the latest version.
 
First, the Kafka connector uses reflection, so internal changes in Kafka 
clients' implementation might require changes in the connector. With more 
frequent upgrades, the amount of work per upgrade is smaller.
 
Second, there were a number of relevant bug fixes since 3.2.3:
* [KAFKA-14303] - Producer.send without record key and batch.size=0 goes into 
infinite loop
* [KAFKA-14379] - consumer should refresh preferred read replica on update 
metadata
* [KAFKA-14422] - Consumer rebalance stuck after new static member joins a 
group with members not supporting static members
* [KAFKA-14417] - Producer doesn't handle REQUEST_TIMED_OUT for 
InitProducerIdRequest, treats as fatal error
* [KAFKA-14532] - Correctly handle failed fetch when partitions unassigned



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31600) Fix producer leaks in flink-connector-kafka tests

2023-03-23 Thread Alex Sorokoumov (Jira)
Alex Sorokoumov created FLINK-31600:
---

 Summary: Fix producer leaks in flink-connector-kafka tests
 Key: FLINK-31600
 URL: https://issues.apache.org/jira/browse/FLINK-31600
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Alex Sorokoumov


Local runs of flink-connector-kafka tests fail with 

 
{noformat}
 java.lang.AssertionError: Detected producer leak. Thread name: 
kafka-producer-network-thread | producer-transactionalId    at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.checkProducerLeak(FlinkKafkaProducerITCase.java:829)
    at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRestoreToCheckpointAfterExceedingProducersPool(FlinkKafkaProducerITCase.java:169)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at 
org.apache.flink.testutils.junit.RetryRule$RetryOnFailureStatement.evaluate(RetryRule.java:135)
    at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
    at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
    at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
    at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
    at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at 
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at 
com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
    at 
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at 
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at 
com.intellij.rt.junit.IdeaTestRunner$Repeate

[jira] [Created] (FLINK-31601) While waiting for resources, resources check might be scheduled unlimited number of times (Adaptive Scheduler)

2023-03-23 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-31601:
-

 Summary: While waiting for resources, resources check might be 
scheduled unlimited number of times (Adaptive Scheduler)
 Key: FLINK-31601
 URL: https://issues.apache.org/jira/browse/FLINK-31601
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.17.0
Reporter: Roman Khachatryan
 Fix For: 1.17.1


See [https://github.com/apache/flink/pull/22169#discussion_r1136395017]
{quote}when {{resourceStabilizationDeadline}} is not null, should we skip 
scheduling {{checkDesiredOrSufficientResourcesAvailable}} (on [line 
166|https://github.com/apache/flink/blob/a64781b1ef8f129021bdcddd3b07548e6caa4a72/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java#L166])?
Otherwise, we schedule as many checks as there are changes in resources.
{quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Shammon FY
Hi Jane,

Thanks for initiating this discussion. Configuring TTL per operator can
help users manage state more effectively.

I think the `compiled json plan` proposal may need to consider the impact
on the user's submission workflow. Generally, Flink jobs support two types
of submission: SQL and jar. If users want to use `TTL on Operator` for SQL
jobs, they need to edit the json file, which is not supported by general job
submission systems such as flink sql-client, apache kyuubi, apache
streampark, etc. Users need to download the file and edit it manually,
but they may not have the permissions for the storage system, such as HDFS, in
a real production environment.

From this perspective, I think it is necessary to provide a way, similar to
hints, for users to configure the `TTL on Operator` in their SQL, which
helps them use it conveniently. At the same time, I agree with @Shuo's
idea that for complex cases, users can combine hints and the `json plan` to
configure `TTL on Operator` better. What do you think? Thanks


Best,
Shammon FY


On Thu, Mar 23, 2023 at 9:58 PM Shuo Cheng  wrote:

> Correction: “users can set 'scan.startup.mode' for kafka connector” ->
> “users
> can set 'scan.startup.mode' for kafka connector by dynamic table option”
>
> On Thu, Mar 23, 2023 at 21:50, Shuo Cheng wrote:
>
> > Hi Jane,
> > Thanks for driving this, operator level state ttl is absolutely a desired
> > feature. I would share my opinion as following:
> >
> > If the scope of this proposal is limited as an enhancement for compiled
> > json plan, it makes sense. I think it does not conflict with configuring
> > state ttl
> > in other ways, e.g., SQL HINT or something else, because they just work
> in
> > different level, SQL Hint works in the exact entrance of SQL API, while
> > compiled json plan is the intermediate results for SQL.
> > I think the final shape of state ttl configuring may like the that, users
> > can define operator state ttl using SQL HINT (assumption...), but it may
> > affects more than one stateful operators inside the same query block,
> then
> > users can further configure a specific one by modifying the compiled json
> > plan...
> >
> > In a word, this proposal is in good shape as an enhancement for compiled
> > json plan, and it's orthogonal with other ways like SQL Hint which works
> in
> > a higher level.
> >
> >
> > Nips:
> >
> > > "From the SQL semantic perspective, hints cannot intervene in the
> > calculation of data results."
> > I think it's more properly to say that hint does not affect the
> > equivalence of execution plans (hash agg vs sort agg), not the
> equivalence
> > of execution results, e.g., users can set 'scan.startup.mode' for kafka
> > connector, which also "intervene in the calculation of data results".
> >
> > Sincerely,
> > Shuo
> >
> > On Tue, Mar 21, 2023 at 7:52 PM Jane Chan  wrote:
> >
> >> Hi devs,
> >>
> >> I'd like to start a discussion on FLIP-292: Support configuring state
> TTL
> >> at operator level for Table API & SQL programs [1].
> >>
> >> Currently, we only support job-level state TTL configuration via
> >> 'table.exec.state.ttl'. However, users may expect a fine-grained state
> TTL
> >> control to optimize state usage. Hence we propose to
> serialize/deserialize
> >> the state TTL as metadata of the operator's state to/from the compiled
> >> JSON
> >> plan, to achieve the goal that specifying different state TTL when
> >> transforming the exec node to stateful operators.
> >>
> >> Look forward to your opinions!
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951
> >>
> >> Best Regards,
> >> Jane Chan
> >>
> >
>


Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-23 Thread Hongshun Wang
Hi Shammon,

Thanks for your advice! I learned a lot about TIMESTAMP/SPECIFIC_OFFSET.
That's interesting.

However, I have a different opinion.

If a user employs the SPECIFIC_OFFSET strategy and enables auto-discovery,
they will be able to find new partitions beyond the specified offsets.
Otherwise, enabling auto-discovery makes no sense.

When it comes to the TIMESTAMP strategy, it seems to be trivial. I
understand your concern; however, it’s the role of the time window rather than
of partition discovery. The TIMESTAMP strategy means that the consumer starts
from the first record whose timestamp is greater than or equal to a given
timestamp, rather than consuming only the records whose timestamp is
greater than or equal to the given timestamp. *Thus, even if auto-discovery
is disabled, or new partitions are discovered with the TIMESTAMP strategy, the
same problems still occur.*

Above all, why use the EARLIEST strategy? I believe that the strategy
specified at startup should be the strategy at the moment of startup. *So
there is no difference between new partitions and new messages in old
partitions.* Therefore, all the new partition issues that you care about
will still appear even if you disable partition discovery, as new messages in old
partitions. If all new messages in old partitions should be consume, all
new messages in old partitions should also be consume.


Best,
Hongshun

On Thu, Mar 23, 2023 at 8:34 PM Shammon FY  wrote:

> Hi Hongshun
>
> Thanks for driving this discussion. Automatically discovering partitions
> without losing data sounds great!
>
> Currently flink supports kafka source with different startup modes, such as
> EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.
>
> If I understand correctly, you will set the offset of new partitions with
> EARLIEST? Please correct me if I'm wrong, I think the EARLIEST startup mode
> for new partitions is not suitable if users set TIMESTAMP/SPECIFIC_OFFSET
> for kafka in their jobs.
>
> For an extreme example, the current time is 2023-03-23 15:00:00 and users
> set the TIMESTAMP with 2023-03-23 16:00:00 for their jobs. If a partition
> is added during this period, jobs will generate “surprising” data. What do
> you think of it?
>
>
> Best,
> Shammon FY
>
>
> On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang 
> wrote:
>
> > Hi, Hang,
> >
> > Thanks for your advice.
> >
> > When the second case will occur? Currently, there are three ways to
> specify
> > partitions in Kafka: by topic, by partition, and by matching the topic
> with
> > a regular expression. Currently, if the initial partition number is 0, an
> > error will occur for the first two methods. However, when using a regular
> > expression to match topics, it is allowed to have 0 matched topics.
> >
> > > I don't know when the second case will occur
> >
> >
> > Why prefer the field `firstDiscoveryDone`? When a regular expression
> > initially matches 0 topics, it should consume all messages of the new
> > topic. If unassignedInitialPartitons and unassignedTopLevelPartitions are
> > used instead of firstDiscoveryDone, any new topics created during (5
> > minutes discovery + job restart time) will be treated as the first
> > discovery, causing data loss.
> >
> > > Then when will we get the empty partition list? I think it should be
> > treated as the first initial discovery if both
> `unassignedInitialPartitons`
> > and `assignedPartitons` are empty without `firstDiscoveryDone`.
> >
> > Best
> >
> > Hongshun
> >
> > On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan 
> wrote:
> >
> > > Hi, Hongshun,
> > >
> > > Thank you for starting this discussion.  I have some problems about the
> > > field `firstDiscoveryDone`.
> > >
> > > In the FLIP, why we need firstDiscoveryDone is as follows.
> > > > Why do we need firstDiscoveryDone? Only relying on the
> > > unAssignedInitialPartitons attribute cannot distinguish between the
> > > following two cases (which often occur in pattern mode):
> > > > The first partition discovery is so slow, before which the checkpoint
> > is
> > > executed and then job is restarted . At this time, the restored
> > > unAssignedInitialPartitons is an empty set, which means non-discovery.
> > The
> > > next discovery will be treated as first discovery.
> > > > The first time the partition is discovered is empty, and new
> partitions
> > > can only be found after multiple partition discoveries. If a restart
> > occurs
> > > between this period, the restored unAssignedInitialPartitons is also an
> > > empty set, which means empty-discovery.The next discovery will be
> treated
> > > as new discovery.
> > >
> > > I don't know when the second case will occur. The partitions must be
> > > greater than 0 when creating topics. And I have read this note in the
> > FLIP.
> > > > Note: The current design only applies to cases where all existing
> > > partitions can be discovered at once. If all old partitions cannot be
> > > discovered at once, the subsequent old partitions discovered will be
> > > treated as new part

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

2023-03-23 Thread Hongshun Wang
"If all new messages in old partitions should be consumed, all new messages
in new partitions should also be consumed."

Sorry, I wrote the last sentence incorrectly.

On Fri, Mar 24, 2023 at 11:15 AM Hongshun Wang 
wrote:

> Hi Shammon,
>
> Thanks for your advise!  I learn a lot about TIMESTAMP/SPECIFIC_OFFSET.
> That's interesting.
>
> However, I have a different opinion.
>
> If a user employs the SPECIFIC_OFFSET strategy and enables auto-discovery,
> they will be able to find new partitions beyond the specified offset.
> Otherwise, enabling auto-discovery is no sense.
>
> When it comes to the TIMESTAMP strategy, it seems to be trivial. I
> understand your concern, however, it’s the role of time window rather than
> partition discovery. The TIMESTAMP strategy means that the consumer starts
> from the first record whose timestamp is greater than or equal to a given
> timestamp, rather than only consuming all records whose timestamp is
> greater than or equal to the given timestamp. *Thus, even disable auto
> discovery or discover new partitions with TIMESTAMP strategy, same problems
> still occur.*
>
> Above all , why use EARLIEST strategy? I believe that the strategy
> specified by the startup should be the strategy at the moment of startup. *So
> there is no difference between new partitions and new messages in old
> partitions.* Therefore, all the new partition issues that you care about
> will still appear even if you disable the partition, as new messages in old
> partitions. If all new messages in old partitions should be consume, all
> new messages in old partitions should also be consume.
>
>
> Best,
> Hongshun
>
> On Thu, Mar 23, 2023 at 8:34 PM Shammon FY  wrote:
>
>> Hi Hongshun
>>
>> Thanks for driving this discussion. Automatically discovering partitions
>> without losing data sounds great!
>>
>> Currently flink supports kafka source with different startup modes, such
>> as
>> EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.
>>
>> If I understand correctly, you will set the offset of new partitions with
>> EARLIEST? Please correct me if I'm wrong, I think the EARLIEST startup
>> mode
>> for new partitions is not suitable if users set TIMESTAMP/SPECIFIC_OFFSET
>> for kafka in their jobs.
>>
>> For an extreme example, the current time is 2023-03-23 15:00:00 and users
>> set the TIMESTAMP with 2023-03-23 16:00:00 for their jobs. If a partition
>> is added during this period, jobs will generate “surprising” data. What do
>> you think of it?
>>
>>
>> Best,
>> Shammon FY
>>
>>
>> On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang 
>> wrote:
>>
>> > Hi, Hang,
>> >
>> > Thanks for your advice.
>> >
>> > When the second case will occur? Currently, there are three ways to
>> specify
>> > partitions in Kafka: by topic, by partition, and by matching the topic
>> with
>> > a regular expression. Currently, if the initial partition number is 0,
>> an
>> > error will occur for the first two methods. However, when using a
>> regular
>> > expression to match topics, it is allowed to have 0 matched topics.
>> >
>> > > I don't know when the second case will occur
>> >
>> >
>> > Why prefer the field `firstDiscoveryDone`? When a regular expression
>> > initially matches 0 topics, it should consume all messages of the new
>> > topic. If unassignedInitialPartitons and unassignedTopLevelPartitions
>> are
>> > used instead of firstDiscoveryDone, any new topics created during (5
>> > minutes discovery + job restart time) will be treated as the first
>> > discovery, causing data loss.
>> >
>> > > Then when will we get the empty partition list? I think it should be
>> > treated as the first initial discovery if both
>> `unassignedInitialPartitons`
>> > and `assignedPartitons` are empty without `firstDiscoveryDone`.
>> >
>> > Best
>> >
>> > Hongshun
>> >
>> > On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan 
>> wrote:
>> >
>> > > Hi, Hongshun,
>> > >
>> > > Thank you for starting this discussion.  I have some problems about
>> the
>> > > field `firstDiscoveryDone`.
>> > >
>> > > In the FLIP, why we need firstDiscoveryDone is as follows.
>> > > > Why do we need firstDiscoveryDone? Only relying on the
>> > > unAssignedInitialPartitons attribute cannot distinguish between the
>> > > following two cases (which often occur in pattern mode):
>> > > > The first partition discovery is so slow, before which the
>> checkpoint
>> > is
>> > > executed and then job is restarted . At this time, the restored
>> > > unAssignedInitialPartitons is an empty set, which means non-discovery.
>> > The
>> > > next discovery will be treated as first discovery.
>> > > > The first time the partition is discovered is empty, and new
>> partitions
>> > > can only be found after multiple partition discoveries. If a restart
>> > occurs
>> > > between this period, the restored unAssignedInitialPartitons is also
>> an
>> > > empty set, which means empty-discovery.The next discovery will be
>> treated
>> > > as new discovery.
>> > >
>> > > I don't know 

Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-03-23 Thread yuxia
Thanks all for your feedback.

@Shammon FY
My gut feeling is that the end user shouldn't care about whether the TRUNCATE TABLE 
statement deletes the directory or moves it to a Trash directory. They 
only need to know it will delete all rows from a table.
To me, deleting the directory or moving it to the trash is more a 
behavior at the external storage level than at the SQL statement level. In Hive, if 
users configure Trash, files will be moved to the trash for the DROP statement.
Also, I have hardly seen such usage of the TRUNCATE TABLE statement in other 
engines. What's more, to support it, we would have to extend the TRUNCATE TABLE syntax, 
which would then not be compliant with the SQL standard. I really don't want to do that, 
and I believe it would confuse users if we did so.

@Hang
`TRUNCATE TABLE` is meant to delete all rows of a base table. So, it makes no 
sense for a table source to implement it.
If a user uses the TRUNCATE TABLE statement to truncate a table, the planner will only 
try to
find the DynamicTableSink for the corresponding table. 
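To make it more concrete, the connector-facing interface I have in mind looks roughly like the sketch below (the name and exact shape are not final and may still change during this discussion):

/** Ability interface for a DynamicTableSink that can truncate the underlying table (sketch only). */
public interface SupportsTruncate {

    /** Deletes all rows of the underlying table; called when TRUNCATE TABLE is executed. */
    void executeTruncation();
}

A sink that doesn't implement it would cause the TRUNCATE TABLE statement to fail with a meaningful error message.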

@Ran Tao
1: Thanks for your reminder. I said it won't support views in the FLIP, but 
forgot to say that temporary tables are also not supported. Now I have added this part to 
the FLIP.

2: Yes, I also considered including it in this FLIP before. But as far as I 
can see, I haven't seen much usage of truncate table with partition. It's not as 
useful as truncate table. So, I tend to keep this FLIP simple here without 
supporting truncate table with partition.
Also, it seems that for `truncate table with partition`, different engines may have 
different syntaxes;
Hive[1]/Spark[2] use the following syntax:
TRUNCATE TABLE table_name [PARTITION partition_spec]

SqlServer[3] uses the following syntax:
TRUNCATE TABLE { database_name.schema_name.table_name | schema_name.table_name | table_name }
[ WITH ( PARTITIONS ( { <partition_number_expression> | <range> } [ , ...n ] ) ) ]
So, I tend to be cautious about it.

But I'm open to this. If there's any feedback or a strong requirement, I don't 
mind adding it in this FLIP.
If we do need it some day, I can propose it in a new FLIP. It won't break 
the current design.

As for the concrete syntax in the FLIP, I think the current one is the concrete 
syntax; we don't allow the TABLE keyword to be optional.

3: Thanks for your reminder, I have updated the FLIP for this.


[1]https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-TruncateTable
[2]https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-ddl-truncate-table.html
[3]https://learn.microsoft.com/en-us/sql/t-sql/statements/truncate-table-transact-sql?view=sql-server-ver16



Best regards,
Yuxia

- Original Message -
From: "Ran Tao" 
To: "dev" 
Sent: Thursday, March 23, 2023, 6:28:17 PM
Subject: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

Hi, yuxia.

Thanks for starting the discussion.
I think it's a nice improvement to support the TRUNCATE TABLE statement because
many other mature engines support it.

I have some questions.
1. Because tables have different types, will we support views or
temporary tables?

2. Some other engines such as Spark and Hive support TRUNCATE TABLE with
partitions. Will we support that as well?
Btw, I think you need to give the concrete TRUNCATE TABLE syntax in the FLIP
because some engines have different syntaxes.
For example, Hive allows TRUNCATE TABLE to be written as TRUNCATE [TABLE], which means
the TABLE keyword can be optional.

3. The Proposed Changes section tries to use SqlToOperationConverter and run in
TableEnvironmentImpl#executeInternal.
I think this is out of date; the community is refactoring the conversion logic
from SqlNode to Operation [1] and the executions in TableEnvironmentImpl [2].
I suggest you use the new way to support it.

[1] https://issues.apache.org/jira/browse/FLINK-31464
[2] https://issues.apache.org/jira/browse/FLINK-31368

Best Regards,
Ran Tao
https://github.com/chucheng92


On Wed, Mar 22, 2023 at 21:13, yuxia wrote:

> Hi, devs.
>
> I'd like to start a discussion about FLIP-302: Support TRUNCATE TABLE
> statement [1].
>
> The TRUNCATE TABLE statement is a SQL command that allows users to quickly
> and efficiently delete all rows from a table without dropping the table
> itself. This statement is commonly used in data warehouses, where large data
> sets are frequently loaded and unloaded from tables.
> So, this FLIP is meant to support the TRUNCATE TABLE statement. More exactly,
> this FLIP will bring Flink the TRUNCATE TABLE syntax and an interface with
> which the corresponding connectors can implement their own logic for
> truncating tables.
>
> Looking forward to your feedback.
>
> [1]: [
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement
> |
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement
> ]
>
>
> Best regards,
> Yuxia
>


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Jane Chan
Hi Shammon and Shuo,

Thanks for your valuable comments!

Some thoughts:

@Shuo
> I think it's more properly to say that hint does not affect the
equivalence of execution plans (hash agg vs sort agg), not the equivalence
of execution
results, e.g., users can set 'scan.startup.mode' for kafka connector by
dynamic table option, which
also "intervene in the calculation of data results".

IMO, the statement that "hints should not interfere with the calculation
results" means they should not interfere with the internal computation. On the
other hand, 'scan.startup.mode' interferes with the ingestion of the data.
I think these two concepts are different, but of course, this is just my
opinion and I welcome other views.

> I think the final shape of state ttl configuring may like the that,
users can define operator state ttl using SQL HINT (assumption...), but it
may
affects more than one stateful operators inside the same query block, then
users can further configure a specific one by modifying the compiled json
plan...

Setting aside the issue of semantics, setting TTL from a higher level seems
to be attractive. This means that users only need to configure
'table.exec.state.ttl' through the existing hint syntax to achieve the
effect. Everything is a familiar formula. But is it really the case? Hints
apply to a very broad range. Let me give an example.

Suppose a user wants to set different TTLs for the two streams in a stream
join query. Where should the hints be written?

-- the original query before configuring state TTL
create temporary view view1 as select  from my_table_1;
create temporary view view2 as select  from my_table_2;
create temporary view joined_view as
select view1.*, view2.* from my_view_1 a join my_view_2 b on a.join_key =
b.join_key;

Option 1: declaring hints at the very beginning of the table scan

-- should he or she write hints when declaring the first temporary view?
create temporary view view1 as select  from my_table_1
/*+(OPTIONS('table.exec.state.ttl'
= 'foo'))*/;
create temporary view view2 as select  from my_table_2
/*+(OPTIONS('table.exec.state.ttl'
= 'bar'))*/;
create temporary view joined_view as
select view1.*, view2.* from my_view_1 a join my_view_2 b on a.join_key =
b.join_key;

Option 2: declaring hints when performing the join

-- or should he or she write hints when declaring the join temporary view?
create temporary view view1 as select  from my_table_1;
create temporary view view2 as select  from my_table_2;
create temporary view joined_view as
select view1.*, view2.* from my_view_1 /*+(OPTIONS('table.exec.state.ttl' =
'foo'))*/ a join my_view_2 /*+(OPTIONS('table.exec.state.ttl' = 'bar'))*/ b
on a.join_key = b.join_key;

From the user's point of view, does he or she need to care about the
difference between these two kinds of style? Users might think the two may
be equivalent; but in reality, as developers, how do we define the range in
which hint starts and ends to take effect?

Consider the following two assumptions

1. Assuming the hint takes effect from the moment it is declared and
applies to any subsequent stateful operators until it is overridden by a
new hint.
If this is the assumption, it's clear that Option 1 and Option 2 are
different because a ChangelogNormalize node can appear between scan and
join. Meanwhile, which stream's TTL to apply to the following query after
the stream join? It is unclear if the user does not explicitly set it.
Should the engine make a random decision?

2. Assuming that the scope of the hint only applies to the current query
block and does not extend to the next operator.
In this case, the first way of setting the hint will not work because it
cannot be brought to the join operator. Users must choose the second way to
configure. Are users willing to remember this strange constraint on SQL
writing style? Does this indicate a new learning cost?

The example above illustrates that while this approach may seem simple and
direct, it actually has many limitations and may produce unexpected
behavior. Will users still find it attractive? IMO *hints only work in a
very limited situation where the query is very simple, and their scope is
coarser than operator level*. Maybe it deserves another FLIP to discuss
whether we need a multi-level state TTL configuration mechanism and how to
implement it properly.

@Shammon
> Generally, Flink jobs support two types
of submission: SQL and jar. If users want to use `TTL on Operator` for SQL
jobs, they need to edit the json file, which is not supported by general job
submission systems such as flink sql-client, apache kyuubi, apache
streampark, etc. Users need to download the file and edit it manually,
but they may not have permissions on the storage system such as HDFS in a
real production environment. From this perspective, I think it is
necessary to provide a way similar to
hints so that users can configure the `TTL on Operator` in their sqls which
help users to u
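
(For context, the json-plan workflow referred to above would look roughly
like the following; this is only a sketch, and the file path and statement
are made up for illustration:

-- compile the statement into a json plan that can be edited offline
COMPILE PLAN 'file:///tmp/joined_view_plan.json' FOR
INSERT INTO my_sink SELECT * FROM joined_view;

-- after manually adjusting the per-operator state TTL in the json file,
-- submit the edited plan instead of the original statement
EXECUTE PLAN 'file:///tmp/joined_view_plan.json';

It is exactly this download/edit/re-submit round trip that general
submission systems may not support.)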

Re: [VOTE] Release flink-connector-mongodb, release candidate #1

2023-03-23 Thread Jiabao Sun
Thanks Leonard for the suggestion.

- Checked that all POM files point to the same version
- Built the source with Maven
- Ran end-to-end tests locally and they succeeded
- Tested reading from and writing to MongoDB with a local cluster
- Tested the lookup table feature with a local cluster


Tested reading from and writing into MongoDB with the steps below.

-- prepare test data of MongoDB
> use test;
> db.users.insertMany([
  { "user_id": NumberLong(100),  "user_name": "Bob", "region": "Beijing" },
  { "user_id": NumberLong(101),  "user_name": "Alice", "region": "Shanghai" },
  { "user_id": NumberLong(102),  "user_name": "Greg", "region": "Berlin" },
  { "user_id": NumberLong(103),  "user_name": "Richard", "region": "Berlin" }
]);
 
-- register a MongoDB source which interprets the MongoDB collection as an
-- append-only stream
CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://username:password@localhost:27017',
  'database' = 'test',
  'collection' = 'users'
);
> SELECT * FROM users;
 
+---------+-----------+----------+
| user_id | user_name |   region |
+---------+-----------+----------+
|     100 |       Bob |  Beijing |
|     101 |     Alice | Shanghai |
|     102 |      Greg |   Berlin |
|     103 |   Richard |   Berlin |
+---------+-----------+----------+


-- register a MongoDB sink which will be used for storing the latest
-- users information
CREATE TABLE users_snapshot (
  user_id BIGINT,
  user_name STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://172.31.58.86:27017',
  'database' = 'test',
  'collection' = 'users_snapshot'
);
 
> INSERT INTO users_snapshot SELECT * FROM users;
> SELECT * FROM users_snapshot;
 
+---------+-----------+----------+
| user_id | user_name |   region |
+---------+-----------+----------+
|     100 |       Bob |  Beijing |
|     101 |     Alice | Shanghai |
|     102 |      Greg |   Berlin |
|     103 |   Richard |   Berlin |
+---------+-----------+----------+


---

Tested the lookup table feature with the steps below.

-- prepare test data of MongoDB
> use test;
> db.pageviews.insertMany([
  { "user_id": NumberLong(100), "page_id": NumberLong(10001), viewtime: Timestamp(1601510460, 0) },
  { "user_id": NumberLong(102), "page_id": NumberLong(10002), viewtime: Timestamp(1601510520, 0) },
  { "user_id": NumberLong(101), "page_id": NumberLong(10002), viewtime: Timestamp(1601510640, 0) },
  { "user_id": NumberLong(102), "page_id": NumberLong(10004), viewtime: Timestamp(1601510760, 0) },
  { "user_id": NumberLong(102), "page_id": NumberLong(10003), viewtime: Timestamp(1601510820, 0) }
]);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP_LTZ(0),
  proctime AS PROCTIME()
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://username:password@localhost:27017',
  'database' = 'test',
  'collection' = 'pageviews'
);

> SET table.local-time-zone = UTC;
> SELECT * FROM pageviews;
 
+---------+---------+---------------------+----------+
| user_id | page_id |            viewtime | proctime |
+---------+---------+---------------------+----------+
|     100 |   10001 | 2020-10-01 00:01:00 |          |
|     102 |   10002 | 2020-10-01 00:02:00 |          |
|     101 |   10002 | 2020-10-01 00:04:00 |          |
|     102 |   10004 | 2020-10-01 00:06:00 |          |
|     102 |   10003 | 2020-10-01 00:07:00 |          |
+---------+---------+---------------------+----------+

CREATE TABLE pageviews_enriched (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP_LTZ(0),
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'mongodb',
  'uri' = 'mongodb://username:password@localhost:27017',
  'database' = 'test',
  'collection' = 'pageviews_enriched'
);

INSERT INTO pageviews_enriched
SELECT p.user_id,
   p.page_id,
   p.viewtime,
   u.region
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;
 
> SELECT * FROM pageviews_enriched;

+---------+---------+---------------------+-------------+
| user_id | page_id |            viewtime | user_region |
+---------+---------+---------------------+-------------+
|     100 |   10001 | 2020-10-01 00:01:00 |     Beijing |
|     102 |   10002 | 2020-10-01 00:02:00 |      Berlin |
|     101 |   10002 | 2020-10-01 00:04:00 |    Shanghai |
|     102 |   10004 | 2020-10-01 00:06:00 |      Berlin |
|     102 |   10003 | 2020-10-01 00:07:00 |      Berlin |
+---------+---------+---------------------+-------------+
 
Best,
Jiabao



> On Mar 20, 2023, at 4:38 PM, Leonard Xu wrote:
> 
> 
>> + 1 (non-binding)
>> 
>> Best,
>> Jiaba
> 
> Hi, Jiabao
> 
> Thanks for helping to verify the release candidate. But we need to do the 

[jira] [Created] (FLINK-31602) Add ARRAY_POSITION supported in SQL & Table API

2023-03-23 Thread jackylau (Jira)
jackylau created FLINK-31602:


 Summary: Add ARRAY_POSITION supported in SQL & Table API
 Key: FLINK-31602
 URL: https://issues.apache.org/jira/browse/FLINK-31602
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: jackylau
 Fix For: 1.18.0
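
A sketch of the intended usage, assuming semantics similar to the function
of the same name in other SQL engines (1-based index of the first match;
the exact behavior for missing elements and NULLs is up to the proposal):

{code:sql}
-- hypothetical example, not taken from the ticket
SELECT ARRAY_POSITION(ARRAY['a', 'b', 'c'], 'b');
-- expected result: 2
{code}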






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Is there a way to control the parallelism of auto-generated Flink operators of the FlinkSQL job graph?

2023-03-23 Thread Elkhan Dadashov
Checking with the community again, in case anyone has explored this before.

Thanks.


On Fri, Mar 17, 2023 at 1:56 PM Elkhan Dadashov 
wrote:

> Dear Flink developers,
>
> Wanted to check, if there is a way to control the parallelism of
> auto-generated Flink operators of the FlinkSQL job graph?
>
> In Java API, it is possible to have full control of the parallelism of
> each operator.
>
> On FlinkSQL some source and sink connectors support `source.parallelism`
> and `sink.parallelism`, and the rest can be set via `default.parallelism`.
>
> In this particular scenario, enhancedEvents gets chained to the
> KafkaSource operator. It can be separated by calling disableChaining() on
> the KafkaSource stream on the Kafka connector side, but even with disabled
> chaining on the source stream, the `enhancedEvents` operator parallelism is
> still set to 5 (the same as the Kafka Source operator parallelism), instead
> of 3 (the default parallelism):
>
> ```sql
> SET 'parallelism.default' = '3';
>
> CREATE TABLE input_kafka_table
> (
> ...
> ts AS TO_TIMESTAMP_LTZ(CAST(`timestamp` AS BIGINT),3),
> WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
> ) WITH (
> 'connector' = 'kafka',
> 'source.parallelism' = '5', -- this is supported by customization of
> -- the kafka connector
> ...
> );
>
> CREATE TEMPORARY VIEW enhancedEvents AS (
>  SELECT x, y
> FROM input_kafka_table, LATERAL TABLE(udf.doIt(x, y))
> );
>
> CREATE TABLE other_table_source (...) WITH(...);
> CREATE TABLE other_table_sink (...) WITH(...);
>
> BEGIN STATEMENT SET;
>  INSERT INTO enhancedEventsSink SELECT * FROM enhancedEvents;
>  INSERT INTO other_table_sink SELECT z FROM other_table_source;
> END;
> ```
>
> Is there a way to force-override the parallelism of auto-generated
> operators for a FlinkSQL pipeline?
>
> Or is it expected behavior that some operators' parallelism is taken not
> from the default parallelism but from another operator's parallelism?
>
> I want to understand whether this is a bug or intended behavior.
>
> Thank you.
>
>
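
(For reference, one related knob, shown only as a sketch; I cannot confirm
it overrides operators whose parallelism is inherited from the source:

SET 'table.exec.resource.default-parallelism' = '3';

This option sets the parallelism the planner assigns to the operators it
generates, as opposed to the cluster-wide 'parallelism.default'.)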


[jira] [Created] (FLINK-31603) Line break should be removed in create table with-clauses, load module with-clauses and table hints for both keys and values

2023-03-23 Thread Yao Zhang (Jira)
Yao Zhang created FLINK-31603:
-

 Summary: Line break should be removed in create table 
with-clauses, load module with-clauses and table hints for both keys and values
 Key: FLINK-31603
 URL: https://issues.apache.org/jira/browse/FLINK-31603
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.16.1, 1.16.0
 Environment: Flink 1.16.0
Reporter: Yao Zhang
 Fix For: 1.18.0


Given a SQL like this:
{code:sql}
CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
  'connector' = 'kaf
ka'
  ...
);
{code}
After parsing the SQL, the value of the 'connector' option is 'kaf\nka', which 
will lead to problems.

The line break inside keys/values in with-clauses and table hints should be 
removed when parsing SQLs.
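
The same issue presumably applies to table hints, e.g. (a made-up example
for illustration only):

{code:sql}
SELECT * FROM MyTable /*+ OPTIONS('scan.startup.mode' = 'earliest-
offset') */;
{code}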

If this is an issue that needs to be fixed, I would like to take it, as I am 
already working on it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-23 Thread Yun Tang
Hi,

From my point of view, I am a bit against using a SQL hint to set state TTL, 
as a Flink SQL query could be translated into several stateful operators. If we 
want different states to have different TTL configs within one operator, the 
SQL hint solution cannot work. A better way is to allow a graphical IDE to 
display the stateful operators and let users configure them, and then the IDE 
submits the json plan to Flink to run the job.

Regarding the details of the ExecNode structure: since the state name is unique 
in the underlying state layer, shall we introduce the "index" tag to identify 
the state config?
What will happen with the conditions below?
1st run:
   {
     "index": 0,
     "ttl": "25920 ms",
     "name": "join-left-state"
   },
   {
     "index": 1,
     "ttl": "8640 ms",
     "name": "join-right-state"
   }

2nd run:
   {
     "index": 0,
     "ttl": "8640 ms",
     "name": "join-right-state"
   },
   {
     "index": 1,
     "ttl": "25920 ms",
     "name": "join-left-state"
   }

Best
Yun Tang

From: Jane Chan 
Sent: Friday, March 24, 2023 11:57
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator 
level for Table API & SQL programs

Hi Shammon and Shuo,

Thanks for your valuable comments!

Some thoughts:

@Shuo
> I think it's more proper to say that a hint does not affect the
equivalence of execution plans (hash agg vs sort agg), not the equivalence
of execution results; e.g., users can set 'scan.startup.mode' for the kafka
connector via a dynamic table option, which also "intervenes in the
calculation of data results".

IMO, the statement that "hint should not interfere with the calculation
results" means it should not interfere with the internal computation. On the
other hand, 'scan.startup.mode' interferes with the ingestion of the data.
I think these two concepts are different, but of course, this is just my
opinion and I welcome other views.

> I think the final shape of state ttl configuring may be like that: users
can define operator state ttl using SQL HINT (assumption...), but it may
affect more than one stateful operator inside the same query block, then
users can further configure a specific one by modifying the compiled json
plan...

Setting aside the issue of semantics, setting TTL from a higher level seems
attractive: users would only need to configure 'table.exec.state.ttl'
through the existing hint syntax to achieve the effect. Everything feels
familiar. But is that really the case? Hints apply to a very broad range.
Let me give an example.

Suppose a user wants to set different TTLs for the two streams in a stream
join query. Where should the hints be written?

-- the original query before configuring state TTL
create temporary view view1 as select * from my_table_1;
create temporary view view2 as select * from my_table_2;
create temporary view joined_view as
select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;

Option 1: declaring hints at the very beginning of the table scan

-- should he or she write hints when declaring the first temporary view?
create temporary view view1 as
select * from my_table_1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */;
create temporary view view2 as
select * from my_table_2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */;
create temporary view joined_view as
select a.*, b.* from view1 a join view2 b on a.join_key = b.join_key;

Option 2: declaring hints when performing the join

-- or should he or she write hints when declaring the join temporary view?
create temporary view view1 as select * from my_table_1;
create temporary view view2 as select * from my_table_2;
create temporary view joined_view as
select a.*, b.*
from view1 /*+ OPTIONS('table.exec.state.ttl' = 'foo') */ a
join view2 /*+ OPTIONS('table.exec.state.ttl' = 'bar') */ b
on a.join_key = b.join_key;

From the user's point of view, does he or she need to care about the
difference between these two styles? Users might think the two are
equivalent; but in reality, as developers, how do we define where a hint
starts and stops taking effect?

Consider the following two assumptions.

1. Assuming the hint takes effect from the moment it is declared and
applies to any subsequent stateful operators until it is overridden by a
new hint.
If this is the assumption, it is clear that Option 1 and Option 2 are
different, because a ChangelogNormalize node can appear between the scan
and the join. Meanwhile, which stream's TTL should apply to the operators
downstream of the join? It is unclear if the user does not set it
explicitly. Should the engine make a random decision?

2. Assuming that the scope of the hint only applies to the current query
block and does not extend to the next operator.
In this case, the first way of setting the hint will not work because it
cannot be carried to the join operator. Us