Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID

2023-05-23 Thread Lijie Wang
Hi Joao,

I noticed the FLIP currently contains the following two methods related to the
type serializer:

(1)  TypeSerializer createInputSerializer();
(2)  TypeSerializer createSerializer(TypeInformation inType);

Is the method (2) still needed now?

Best,
Lijie

João Boto wrote on Fri, May 19, 2023, 16:53:

> Updated the FLIP to use this option.
>


[jira] [Created] (FLINK-32160) CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException

2023-05-23 Thread Jira
Michał Fijołek created FLINK-32160:
--

 Summary: CompactOperator cannot continue from checkpoint because 
of java.util.NoSuchElementException
 Key: FLINK-32160
 URL: https://issues.apache.org/jira/browse/FLINK-32160
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.17.0, 1.16.0
 Environment: Flink 1.17 on k8s (flink-kubernetes-operator v.1.4.0), s3
Reporter: Michał Fijołek


Hello :) We have a Flink job (v1.17) on k8s (using the official
flink-k8s-operator) that reads data from Kafka and writes it to s3 via
flink-sql with compaction enabled. The job sometimes fails and continues from a
checkpoint just fine, but once every couple of days we hit a crash loop: the
job cannot continue from the latest checkpoint and fails with the following
exception:
{noformat}
java.util.NoSuchElementException
	at java.base/java.util.ArrayList$Itr.next(Unknown Source)
	at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Unknown Source){noformat}
Here’s the relevant code: 
[https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114]

It looks like `CompactOperator` calls `next()` on the iterator without 
checking `hasNext()` first - why is that? Is it a bug? Why does 
`context.getOperatorStateStore().getListState(metaDescriptor)` return an empty 
iterator? Is the latest checkpoint broken in such a case? 
We have an identical job, but without compaction, and it has worked smoothly 
for a couple of weeks now. 

The whole job is just `select` from kafka and `insert` to s3.
{noformat}
CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (
  `foo_bar1` STRING,
  `foo_bar2` STRING,
  `foo_bar3` STRING,
  `foo_bar4` STRING
  )
  PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
  STORED AS parquet
  LOCATION 's3a://my/bucket/'
  TBLPROPERTIES (
'auto-compaction' = 'true',
'compaction.file-size' = '128MB',
'sink.parallelism' = '8',
'format' = 'parquet',
'parquet.compression' = 'SNAPPY',
'sink.rolling-policy.rollover-interval' = '1 h',
'sink.partition-commit.policy.kind' = 'metastore'
  ){noformat}
Checkpoint configuration:
{noformat}
Checkpointing Mode Exactly Once
Checkpoint Storage FileSystemCheckpointStorage
State Backend HashMapStateBackend
Interval 20m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 0ms
Maximum Concurrent Checkpoints 1
Unaligned Checkpoints Disabled
Persist Checkpoints Externally Enabled (retain on cancellation)
Tolerable Failed Checkpoints 0
Checkpoints With Finished Tasks Enabled
State Changelog Disabled{noformat}

Is there something wrong with the given config, or is this some unhandled edge 
case? 

Currently our workaround is to restart the job without using the checkpoint - 
it then uses the state from Kafka, which in this case is fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Release 1.16.2, release candidate #1

2023-05-23 Thread Qingsheng Ren
+1 (binding)

- Verified checksums and signatures
- Verified that no binaries exist in the source archive
- Built the source with Maven
- Verified that the README doesn't have unexpected contents
- Verified that the commit ID on the Flink UI matches the tag
- Ran a job with the SQL client that reads from a Kafka topic and writes to
another one; created a savepoint for the job and recovered from the savepoint
successfully
- Reviewed the release announcement PR

Best regards,
Qingsheng

On Mon, May 22, 2023 at 8:34 PM Xingbo Huang  wrote:

> +1 (binding)
>
> - verify signatures and checksums
> - verify python wheel package contents
> - pip install apache-flink-libraries and apache-flink wheel packages
> - run example flink/flink-python/pyflink/examples/table/basic_operations.py
> with Python 3.7
> - reviewed release blog post
>
> Best,
> Xingbo
>
> Xintong Song  于2023年5月21日周日 13:03写道:
>
> > +1 (binding)
> >
> > - verified signatures and checksums
> > - built from source
> > - tried example jobs with a standalone cluster, everything seems fine
> > - review release announcement PR
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Sat, May 20, 2023 at 6:04 PM Jing Ge 
> > wrote:
> >
> > > +1(non-binding)
> > >
> > > - reviewed Jira release notes
> > > - built from source
> > > - apache repos contain all necessary files
> > > - verified signatures
> > > - verified hashes
> > > - verified tag
> > > - reviewed PR
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Sat, May 20, 2023 at 11:51 AM Yun Tang  wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > >
> > > >   *   Verified signatures
> > > >   *   Reviewed the flink-web PR
> > > >   *   Set up a standalone cluster from released binaries and check
> the
> > > git
> > > > revision number.
> > > >   *   Submit the statemachine example with RocksDB, and it works
> fine.
> > > >
> > > > Best,
> > > > Yun Tang
> > > > 
> > > > From: Yuxin Tan 
> > > > Sent: Friday, May 19, 2023 17:41
> > > > To: dev@flink.apache.org 
> > > > Subject: Re: [VOTE] Release 1.16.2, release candidate #1
> > > >
> > > > +1 (non-binding)
> > > >
> > > > - Verified signature
> > > > - Verified hashes
> > > > - Build form source with mac
> > > > - Verify that the source archives do not contain any binaries
> > > > - Run streaming and batch job in sql-client successfully.
> > > >
> > > > Thanks weijie for driving this release candidate.
> > > >
> > > > Best,
> > > > Yuxin
> > > >
> > > >
> > > > weijie guo  于2023年5月19日周五 16:19写道:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > >
> > > > > Please review and vote on the release candidate #1 for the version
> > > > 1.16.2,
> > > > >
> > > > > as follows:
> > > > >
> > > > >
> > > > > [ ] +1, Approve the release
> > > > >
> > > > > [ ] -1, Do not approve the release (please provide specific
> comments)
> > > > >
> > > > >
> > > > > The complete staging area is available for your review, which
> > includes:
> > > > >
> > > > > * JIRA release notes [1],
> > > > >
> > > > > * the official Apache source release and binary convenience
> releases
> > to
> > > > be
> > > > >
> > > > > deployed to dist.apache.org [2], which are signed with the key
> with
> > > > >
> > > > > fingerprint 8D56AE6E7082699A4870750EA4E8C4C05EE6861F [3],
> > > > >
> > > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > >
> > > > > * source code tag "release-1.16.2-rc1" [5],
> > > > >
> > > > > * website pull request listing the new release and adding
> > announcement
> > > > blog
> > > > >
> > > > > post [6].
> > > > >
> > > > >
> > > > > The vote will be open for at least 72 hours. It is adopted by
> > majority
> > > > >
> > > > > approval, with at least 3 PMC affirmative votes.
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Release Manager
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352765
> > > > >
> > > > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.16.2-rc1/
> > > > >
> > > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > >
> > > > > [4]
> > > > >
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1634/
> > > > >
> > > > > [5]
> https://github.com/apache/flink/releases/tag/release-1.16.2-rc1
> > > > >
> > > > > [6] https://github.com/apache/flink-web/pull/649
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] Release 1.16.2, release candidate #1

2023-05-23 Thread Samrat Deb
+1 (non-binding)

- Verified signature
- Verified hashes
- Built from source on mac
- Verified that the source archives do not contain any binaries
- Ran streaming and batch jobs in sql-client successfully.

Thank you for driving this release candidate.

On Tue, 23 May 2023 at 1:51 PM, Qingsheng Ren  wrote:

> +1 (binding)
>
> - Verified checksums and signatures
> - Verified that no binary exist in the source archive
> - Build the source with Maven
> - Verified that README doesn't have unexpected contents
> - Verified that the commit ID on Flink UI matches with the tag
> - Run a job with SQL client that reads from a Kafka topic then writes to
> another one. Created a savepoint for the job and recovered from savepoint
> successfully
> - Reviewed release announcement PR
>
> Best regards,
> Qingsheng
>
> On Mon, May 22, 2023 at 8:34 PM Xingbo Huang  wrote:
>
> > +1 (binding)
> >
> > - verify signatures and checksums
> > - verify python wheel package contents
> > - pip install apache-flink-libraries and apache-flink wheel packages
> > - run example
> flink/flink-python/pyflink/examples/table/basic_operations.py
> > with Python 3.7
> > - reviewed release blog post
> >
> > Best,
> > Xingbo
> >
> > Xintong Song  于2023年5月21日周日 13:03写道:
> >
> > > +1 (binding)
> > >
> > > - verified signatures and checksums
> > > - built from source
> > > - tried example jobs with a standalone cluster, everything seems fine
> > > - review release announcement PR
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Sat, May 20, 2023 at 6:04 PM Jing Ge 
> > > wrote:
> > >
> > > > +1(non-binding)
> > > >
> > > > - reviewed Jira release notes
> > > > - built from source
> > > > - apache repos contain all necessary files
> > > > - verified signatures
> > > > - verified hashes
> > > > - verified tag
> > > > - reviewed PR
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Sat, May 20, 2023 at 11:51 AM Yun Tang  wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > >
> > > > >   *   Verified signatures
> > > > >   *   Reviewed the flink-web PR
> > > > >   *   Set up a standalone cluster from released binaries and check
> > the
> > > > git
> > > > > revision number.
> > > > >   *   Submit the statemachine example with RocksDB, and it works
> > fine.
> > > > >
> > > > > Best,
> > > > > Yun Tang
> > > > > 
> > > > > From: Yuxin Tan 
> > > > > Sent: Friday, May 19, 2023 17:41
> > > > > To: dev@flink.apache.org 
> > > > > Subject: Re: [VOTE] Release 1.16.2, release candidate #1
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > - Verified signature
> > > > > - Verified hashes
> > > > > - Build form source with mac
> > > > > - Verify that the source archives do not contain any binaries
> > > > > - Run streaming and batch job in sql-client successfully.
> > > > >
> > > > > Thanks weijie for driving this release candidate.
> > > > >
> > > > > Best,
> > > > > Yuxin
> > > > >
> > > > >
> > > > > weijie guo  于2023年5月19日周五 16:19写道:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > >
> > > > > > Please review and vote on the release candidate #1 for the
> version
> > > > > 1.16.2,
> > > > > >
> > > > > > as follows:
> > > > > >
> > > > > >
> > > > > > [ ] +1, Approve the release
> > > > > >
> > > > > > [ ] -1, Do not approve the release (please provide specific
> > comments)
> > > > > >
> > > > > >
> > > > > > The complete staging area is available for your review, which
> > > includes:
> > > > > >
> > > > > > * JIRA release notes [1],
> > > > > >
> > > > > > * the official Apache source release and binary convenience
> > releases
> > > to
> > > > > be
> > > > > >
> > > > > > deployed to dist.apache.org [2], which are signed with the key
> > with
> > > > > >
> > > > > > fingerprint 8D56AE6E7082699A4870750EA4E8C4C05EE6861F [3],
> > > > > >
> > > > > > * all artifacts to be deployed to the Maven Central Repository
> [4],
> > > > > >
> > > > > > * source code tag "release-1.16.2-rc1" [5],
> > > > > >
> > > > > > * website pull request listing the new release and adding
> > > announcement
> > > > > blog
> > > > > >
> > > > > > post [6].
> > > > > >
> > > > > >
> > > > > > The vote will be open for at least 72 hours. It is adopted by
> > > majority
> > > > > >
> > > > > > approval, with at least 3 PMC affirmative votes.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Release Manager
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352765
> > > > > >
> > > > > > [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-1.16.2-rc1/
> > > > > >
> > > > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > > >
> > > > > > [4]
> > > > > >
> > > >
> > https://repository.apache.org/content/repositories/orgapacheflink-1634/
> > > > > >
> > > > > > [5]
> > https://github.com/apache/flink/releases/tag/release-1.16.2-rc1
> >

Re: [VOTE] Release 1.16.2, release candidate #1

2023-05-23 Thread Hang Ruan
+1(non-binding)

- reviewed Jira release notes
- built from source
- verified signatures
- verified hashes
- verified that no binaries exist in the source archive
- ran a streaming job in sql-client successfully

Best,
Hang

Samrat Deb wrote on Tue, May 23, 2023, 16:23:

> +1 (non-binding)
>
> - Verified signature
> - Verified hashes
> - Build form source with mac
> - Verify that the source archives do not contain any binaries
> - Run streaming and batch job in sql-client successfully.
>
> Thank you for driving this release candidate.
>
> On Tue, 23 May 2023 at 1:51 PM, Qingsheng Ren  wrote:
>
> > +1 (binding)
> >
> > - Verified checksums and signatures
> > - Verified that no binary exist in the source archive
> > - Build the source with Maven
> > - Verified that README doesn't have unexpected contents
> > - Verified that the commit ID on Flink UI matches with the tag
> > - Run a job with SQL client that reads from a Kafka topic then writes to
> > another one. Created a savepoint for the job and recovered from savepoint
> > successfully
> > - Reviewed release announcement PR
> >
> > Best regards,
> > Qingsheng
> >
> > On Mon, May 22, 2023 at 8:34 PM Xingbo Huang  wrote:
> >
> > > +1 (binding)
> > >
> > > - verify signatures and checksums
> > > - verify python wheel package contents
> > > - pip install apache-flink-libraries and apache-flink wheel packages
> > > - run example
> > flink/flink-python/pyflink/examples/table/basic_operations.py
> > > with Python 3.7
> > > - reviewed release blog post
> > >
> > > Best,
> > > Xingbo
> > >
> > > Xintong Song  于2023年5月21日周日 13:03写道:
> > >
> > > > +1 (binding)
> > > >
> > > > - verified signatures and checksums
> > > > - built from source
> > > > - tried example jobs with a standalone cluster, everything seems fine
> > > > - review release announcement PR
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Sat, May 20, 2023 at 6:04 PM Jing Ge 
> > > > wrote:
> > > >
> > > > > +1(non-binding)
> > > > >
> > > > > - reviewed Jira release notes
> > > > > - built from source
> > > > > - apache repos contain all necessary files
> > > > > - verified signatures
> > > > > - verified hashes
> > > > > - verified tag
> > > > > - reviewed PR
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Sat, May 20, 2023 at 11:51 AM Yun Tang 
> wrote:
> > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > >
> > > > > >   *   Verified signatures
> > > > > >   *   Reviewed the flink-web PR
> > > > > >   *   Set up a standalone cluster from released binaries and
> check
> > > the
> > > > > git
> > > > > > revision number.
> > > > > >   *   Submit the statemachine example with RocksDB, and it works
> > > fine.
> > > > > >
> > > > > > Best,
> > > > > > Yun Tang
> > > > > > 
> > > > > > From: Yuxin Tan 
> > > > > > Sent: Friday, May 19, 2023 17:41
> > > > > > To: dev@flink.apache.org 
> > > > > > Subject: Re: [VOTE] Release 1.16.2, release candidate #1
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > - Verified signature
> > > > > > - Verified hashes
> > > > > > - Build form source with mac
> > > > > > - Verify that the source archives do not contain any binaries
> > > > > > - Run streaming and batch job in sql-client successfully.
> > > > > >
> > > > > > Thanks weijie for driving this release candidate.
> > > > > >
> > > > > > Best,
> > > > > > Yuxin
> > > > > >
> > > > > >
> > > > > > weijie guo  于2023年5月19日周五 16:19写道:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > >
> > > > > > > Please review and vote on the release candidate #1 for the
> > version
> > > > > > 1.16.2,
> > > > > > >
> > > > > > > as follows:
> > > > > > >
> > > > > > >
> > > > > > > [ ] +1, Approve the release
> > > > > > >
> > > > > > > [ ] -1, Do not approve the release (please provide specific
> > > comments)
> > > > > > >
> > > > > > >
> > > > > > > The complete staging area is available for your review, which
> > > > includes:
> > > > > > >
> > > > > > > * JIRA release notes [1],
> > > > > > >
> > > > > > > * the official Apache source release and binary convenience
> > > releases
> > > > to
> > > > > > be
> > > > > > >
> > > > > > > deployed to dist.apache.org [2], which are signed with the key
> > > with
> > > > > > >
> > > > > > > fingerprint 8D56AE6E7082699A4870750EA4E8C4C05EE6861F [3],
> > > > > > >
> > > > > > > * all artifacts to be deployed to the Maven Central Repository
> > [4],
> > > > > > >
> > > > > > > * source code tag "release-1.16.2-rc1" [5],
> > > > > > >
> > > > > > > * website pull request listing the new release and adding
> > > > announcement
> > > > > > blog
> > > > > > >
> > > > > > > post [6].
> > > > > > >
> > > > > > >
> > > > > > > The vote will be open for at least 72 hours. It is adopted by
> > > > majority
> > > > > > >
> > > > > > > approval, with at least 3 PMC affirmative votes.
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Release Manager
> > > > > > 

[DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-23 Thread Aitozi
Hi guys,
I want to bring up a discussion about adding support for user-defined
AsyncTableFunction in Flink.
Currently, async table functions are special functions used by table sources
to perform async lookups. However, it is worth supporting user-defined async
table functions as well: with them, end SQL users can perform async
operations, which helps maximize system throughput, especially in
IO-bottleneck cases.
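
To make the idea concrete, here is a rough, Flink-free sketch of what an end user's async function could look like. The `RedisLookupFunction` name and the `eval(CompletableFuture, ...)` shape are illustrative assumptions modeled on Flink's lookup-style async functions, not the proposed API:

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncUdtfSketch {

    // Hypothetical user-defined async table function: instead of
    // returning rows synchronously, eval() completes a future once the
    // external IO (here simulated) finishes, so the task thread is free
    // to keep processing other records.
    static class RedisLookupFunction {
        public void eval(CompletableFuture<Collection<String>> result, String key) {
            CompletableFuture.runAsync(
                    () -> result.complete(List.of(key + "-value")));
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Collection<String>> result = new CompletableFuture<>();
        new RedisLookupFunction().eval(result, "user42");
        System.out.println(result.get()); // rows arrive asynchronously
    }
}
```

The throughput win comes from not blocking the processing thread on each per-record lookup, which is what makes this attractive for IO-bound jobs.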

You can find some more detail in [1].

Looking forward to feedback


[1]: 
https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.5.0 released

2023-05-23 Thread Maximilian Michels
Niceee. Thanks for managing the release, Gyula!

-Max

On Wed, May 17, 2023 at 8:25 PM Márton Balassi  wrote:
>
> Thanks, awesome! :-)
>
> On Wed, May 17, 2023 at 2:24 PM Gyula Fóra  wrote:
>>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink Kubernetes Operator 1.5.0.
>>
>> The Flink Kubernetes Operator allows users to manage their Apache Flink 
>> applications and their lifecycle through native k8s tooling like kubectl.
>>
>> Release highlights:
>>  - Autoscaler improvements
>>  - Operator stability, observability improvements
>>
>> Release blogpost:
>> https://flink.apache.org/2023/05/17/apache-flink-kubernetes-operator-1.5.0-release-announcement/
>>
>> The release is available for download at: 
>> https://flink.apache.org/downloads.html
>>
>> Maven artifacts for Flink Kubernetes Operator can be found at: 
>> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>>
>> Official Docker image for Flink Kubernetes Operator applications can be 
>> found at: https://hub.docker.com/r/apache/flink-kubernetes-operator
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12352931
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>>
>> Regards,
>> Gyula Fora


[jira] [Created] (FLINK-32161) Migrate and remove some legacy ExternalResource

2023-05-23 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-32161:
--

 Summary: Migrate and remove some legacy ExternalResource
 Key: FLINK-32161
 URL: https://issues.apache.org/jira/browse/FLINK-32161
 Project: Flink
  Issue Type: Technical Debt
Reporter: Weijie Guo
Assignee: Weijie Guo






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-23 Thread Aitozi
Just caught a user case report from Giannis Polyzos for this usage:

https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b

Aitozi wrote on Tue, May 23, 2023, 17:45:
>
> Hi guys,
> I want to bring up a discussion about adding support of User
> Defined AsyncTableFunction in Flink.
> Currently, async table function are special functions for table source
> to perform
> async lookup. However, it's worth to support the user defined async
> table function.
> Because, in this way, the end SQL user can leverage it to perform the
> async operation
> which is useful to maximum the system throughput especially for IO
> bottleneck case.
>
> You can find some more detail in [1].
>
> Looking forward to feedback
>
>
> [1]: 
> https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
>
> Thanks,
> Aitozi.


[jira] [Created] (FLINK-32162) Misleading log message due to missing null check

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32162:


 Summary: Misleading log message due to missing null check
 Key: FLINK-32162
 URL: https://issues.apache.org/jira/browse/FLINK-32162
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Updating the job requirements always logs "Failed to update requirements for 
job {}." because we never check whether the error is null before logging the 
failure message.
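
A minimal, Flink-free illustration of the bug (names and messages are illustrative, not the actual Flink code): the acknowledgement handler picks the failure message unconditionally, so a null error, i.e. a successful update, still yields "Failed to update...":

```java
public class RequirementsLogSketch {

    // Buggy variant: success and failure take the same branch because
    // the error is never checked for null.
    static String buggyLog(String jobId, Throwable error) {
        return "Failed to update requirements for job " + jobId + ".";
    }

    // Fixed variant: branch on whether an error actually occurred.
    static String fixedLog(String jobId, Throwable error) {
        return error == null
                ? "Updated requirements for job " + jobId + "."
                : "Failed to update requirements for job " + jobId + ".";
    }

    public static void main(String[] args) {
        System.out.println(buggyLog("job-1", null));  // misleading on success
        System.out.println(fixedLog("job-1", null));  // correct on success
    }
}
```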



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32163) Support the same application running multiple jobs in HA mode

2023-05-23 Thread melin (Jira)
melin created FLINK-32163:
-

 Summary: Support the same application running multiple jobs in HA mode
 Key: FLINK-32163
 URL: https://issues.apache.org/jira/browse/FLINK-32163
 Project: Flink
  Issue Type: New Feature
Reporter: melin


Support the same application running multiple jobs in HA mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32164) LifecycleState count metrics are not reported correctly by namespace

2023-05-23 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-32164:
--

 Summary: LifecycleState count metrics are not reported correctly 
by namespace
 Key: FLINK-32164
 URL: https://issues.apache.org/jira/browse/FLINK-32164
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0, kubernetes-operator-1.4.0
Reporter: Gyula Fora


The per-namespace lifecycle state count metrics incorrectly show a global 
count:

https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java#L145



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32165) Improve observability of fine-grained resource management

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32165:


 Summary: Improve observability of fine-grained resource management
 Key: FLINK-32165
 URL: https://issues.apache.org/jira/browse/FLINK-32165
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Web Frontend
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Right now fine-grained resource management is way too much of a black box, with 
the only source of information being the taskmanager REST endpoints.

While this is fine-ish for services built around it, the developer experience 
suffers greatly and it becomes impossible to reason about the system 
afterwards (because we don't even log anything).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32166) Show unassigned/total TM resources in web ui

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32166:


 Summary: Show unassigned/total TM resources in web ui
 Key: FLINK-32166
 URL: https://issues.apache.org/jira/browse/FLINK-32166
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


It is important to know how many resources of a TM are currently _assigned_ to 
jobs.
This is different from which resources are currently _used_, since 1gb of 
memory can be assigned to a job while the job is only using 10mb at that time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32167) Log dynamic slot creation on task manager

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32167:


 Summary: Log dynamic slot creation on task manager
 Key: FLINK-32167
 URL: https://issues.apache.org/jira/browse/FLINK-32167
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


When a slot is dynamically allocated on the TM, we should log that this 
happens, what resources it consumes, and what the remaining resources are.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32168) Log required/available resources in RM

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32168:


 Summary: Log required/available resources in RM
 Key: FLINK-32168
 URL: https://issues.apache.org/jira/browse/FLINK-32168
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


When matching requirements against available resources, the RM currently 
doesn't log anything apart from whether it could fulfill the requirements or 
not.

We can make the system easier to audit by logging the current requirements, 
available resources, and how many resources are left after the matching.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32169) Show allocated slots on TM page

2023-05-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32169:


 Summary: Show allocated slots on TM page
 Key: FLINK-32169
 URL: https://issues.apache.org/jira/browse/FLINK-32169
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Web Frontend
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


Show the allocated slots on the TM page, so that you can better understand 
which job is consuming which resources.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Piotr Nowojski
Hi,

Thanks for the proposal. However, are you sure that the OperatorCoordinator
is the right place to place such logic? What would happen if there are two
(or more) operator coordinators with conflicting desired checkpoint trigger
behaviour? If one source is processing a backlog and the other is already
processing real time data, I would assume that in most use cases you would
like to still have the longer checkpointing interval, not the shorter one.
Also apart from that, it might be a bit confusing and not user friendly to
have multiple places that can override the checkpointing behaviour in a
different way.

FYI, in the past we had some discussions about similar requests, and back
then we chose to keep the system simpler, and exposed a more generic REST
API checkpoint triggering mechanism. I know that having to implement such
logic outside of Flink and having to call REST calls to trigger checkpoints
might not be ideal, but that's already implemented and is simple from the
perspective of Flink.

I don't know, maybe instead of adding this logic to operator coordinators,
`CheckpointCoordinator` should have a pluggable `CheckpointTrigger` that
the user could configure like a `MetricReporter`. The default one would
just trigger checkpoints periodically. Maybe a
`BacklogDynamicCheckpointTrigger` could look at metrics [1], check whether
`pendingRecords` for some source has exceeded the configured threshold, and
adjust the checkpointing interval accordingly? This would at
least address some of my concerns.
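
To sketch what such a pluggable trigger might look like — the interface, the metric name, and the thresholds below are all hypothetical, not existing Flink API:

```java
import java.util.Map;

public class BacklogTriggerSketch {

    // Hypothetical pluggable trigger contract: given source metrics,
    // decide the current checkpoint interval.
    interface CheckpointTrigger {
        long checkpointIntervalMs(Map<String, Long> sourceMetrics);
    }

    static class BacklogDynamicCheckpointTrigger implements CheckpointTrigger {
        private final long backlogThreshold;
        private final long shortIntervalMs;
        private final long longIntervalMs;

        BacklogDynamicCheckpointTrigger(long threshold, long shortMs, long longMs) {
            this.backlogThreshold = threshold;
            this.shortIntervalMs = shortMs;
            this.longIntervalMs = longMs;
        }

        @Override
        public long checkpointIntervalMs(Map<String, Long> sourceMetrics) {
            // Use the longer interval while a source still has a backlog,
            // matching the "backlog processing" semantics discussed above.
            boolean backlog = sourceMetrics.getOrDefault("pendingRecords", 0L)
                    > backlogThreshold;
            return backlog ? longIntervalMs : shortIntervalMs;
        }
    }

    public static void main(String[] args) {
        CheckpointTrigger t =
                new BacklogDynamicCheckpointTrigger(1_000, 30_000, 600_000);
        System.out.println(t.checkpointIntervalMs(Map.of("pendingRecords", 5_000L)));
        System.out.println(t.checkpointIntervalMs(Map.of("pendingRecords", 0L)));
    }
}
```

The appeal of this shape is that the triggering policy lives in one place (the `CheckpointCoordinator`) rather than being spread across multiple operator coordinators with potentially conflicting wishes.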

WDYT?

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

On Tue, May 9, 2023 at 19:11, Yunfeng Zhou wrote:

> Hi all,
>
> Dong(cc'ed) and I are opening this thread to discuss our proposal to
> support dynamically triggering checkpoints from operators, which has
> been documented in FLIP-309
> <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517
> >.
>
> With the help of the ability proposed in this FLIP, users could
> improve the performance of their Flink job in cases like when the job
> needs to process both historical batch data and real-time streaming
> data, by adjusting the checkpoint triggerings in different phases of a
> HybridSource or CDC source.
>
> This proposal would be a fundamental component in the effort to
> further unify Flink's batch and stream processing ability. Please feel
> free to reply to this email thread and share with us your opinions.
>
> Best regards.
>
> Dong and Yunfeng
>


Re: Questions on checkpointing mechanism for FLIP-27 Source API

2023-05-23 Thread Piotr Nowojski
Hi,

I vaguely remember someone implementing a mechanism to deal with this. I
think that at least at some point (it might have changed since I last looked
at it), it solved the problem by canceling the checkpoint in the scenario
that you described. However, off the top of my head I can remember neither
the ticket number nor where the code for it lives. I might also be
completely wrong. If I don't forget, I'll try to find it tomorrow.

Best,
Piotrek

On Wed, May 17, 2023 at 17:39, Teoh, Hong wrote:

> Hi all,
>
> I’m writing a new source based on the FLIP-27 Source API, and I had some
> questions on the checkpointing mechanisms and associated guarantees. Would
> appreciate if someone more familiar with the API would be able to provide
> insights here!
>
> In FLIP-27 Source, we now have a SplitEnumerator (running on JM) and a
> SourceReader (running on TM). However, the SourceReader can send events to
> the SplitEnumerator. Given this, we have introduced a “loopback”
> communication mechanism from TM to JM, and I wonder if/how we handle this
> during checkpoints.
>
>
> Example of how data might be lost:
> 1. Checkpoint 123 triggered
> 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> 4. SourceReader takes checkpoint of state for checkpoint 123
> …
> 5. Checkpoint 123 completes
>
> Let’s assume OperatorEvent 1 would mutate SplitEnumerator state once
> processed, There is now inconsistent state between SourceReader state and
> SplitEnumerator state. (SourceReader assumes OperatorEvent 1 is processed,
> whereas SplitEnumerator has not processed OperatorEvent 1)
>
> Do we have any mechanisms for mitigating this issue? For example, does the
> SplitEnumerator re-take the snapshot of state for a checkpoint if an
> OperatorEvent is sent before the checkpoint is complete?
>
> Regards,
> Hong
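
The interleaving described above can be replayed with a toy model to make the inconsistency concrete. The classes below are illustrative stand-ins, not Flink's actual SplitEnumerator/SourceReader API:

```java
// Toy model of the JM-side enumerator and the TM-side reader. All names
// here are illustrative; this is not Flink's real Source API.
public class CheckpointRaceDemo {
    static class Enumerator {
        int eventsProcessed = 0;
        int snapshot() { return eventsProcessed; }
        void handleSourceEvent() { eventsProcessed++; }
    }

    static class Reader {
        final Enumerator enumerator;
        int eventsSent = 0;
        Reader(Enumerator enumerator) { this.enumerator = enumerator; }
        void sendEvent() { eventsSent++; enumerator.handleSourceEvent(); }
        int snapshot() { return eventsSent; }
    }

    // Replays steps 2-4: the enumerator snapshots first, then the reader
    // sends OperatorEvent 1 and snapshots. Returns whether the two
    // snapshots agree on how many events were sent/processed.
    public static boolean snapshotsConsistent() {
        Enumerator enumerator = new Enumerator();
        Reader reader = new Reader(enumerator);
        int enumeratorState = enumerator.snapshot(); // step 2: JM state = 0 events
        reader.sendEvent();                          // step 3: event after JM snapshot
        int readerState = reader.snapshot();         // step 4: TM state = 1 event
        // Restoring checkpoint 123 would yield a reader that believes the
        // event was sent and an enumerator that never processed it.
        return enumeratorState == readerState;
    }

    public static void main(String[] args) {
        System.out.println("snapshots consistent: " + snapshotsConsistent());
    }
}
```

Running it prints `snapshots consistent: false`, matching the inconsistency described in the example.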


How to figure out what's the size of ListState?

2023-05-23 Thread Amir Hossein Sharifzadeh
Dear Flink Dev team,

It's been a while now that I've been dealing with an issue that I can't
figure out myself. I spent quite a lot of time trying to solve the problem
on my own, but I feel stuck.

I explained the problem statement and the issue here:
https://stackoverflow.com/questions/76308686/how-to-figure-out-whats-the-size-of-liststate

I really appreciate any suggestion.

Best,
Amir


Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Dong Lin
Hi Piotr,

Thanks for the comments. Let me try to understand your concerns and
hopefully address the concerns.

>> What would happen if there are two (or more) operator coordinators with
conflicting desired checkpoint trigger behaviour

With the proposed change, there won't be any "*conflicting* desired
checkpoint trigger" by definition. Both the job-level config and the proposed
upperBoundCheckpointingInterval() API specify an upper bound on the
checkpointing interval. If different upper bounds are proposed by
different source operators and the job-level config, Flink will try to
periodically trigger checkpoints at the interval corresponding to the
minimum of all these proposed upper bounds.
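
As a minimal sketch of that minimum-of-upper-bounds rule (the method name upperBoundCheckpointingInterval() comes from the FLIP; the surrounding class and its wiring are illustrative assumptions, not Flink code):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: the job-level config and every operator-proposed
// value are all upper bounds on the checkpointing interval, and the
// scheduler simply uses their minimum.
public class CheckpointIntervalResolver {
    private final Duration jobLevelInterval; // e.g. execution.checkpointing.interval
    private final List<Duration> proposedUpperBounds = new ArrayList<>();

    public CheckpointIntervalResolver(Duration jobLevelInterval) {
        this.jobLevelInterval = jobLevelInterval;
    }

    // Called by an operator coordinator, mirroring the proposed
    // upperBoundCheckpointingInterval() API.
    public void upperBoundCheckpointingInterval(Duration upperBound) {
        proposedUpperBounds.add(upperBound);
    }

    // Effective interval = min(job-level config, all proposed upper bounds),
    // so no two proposals can "conflict": the tightest bound wins.
    public Duration effectiveInterval() {
        Duration result = jobLevelInterval;
        for (Duration d : proposedUpperBounds) {
            if (d.compareTo(result) < 0) {
                result = d;
            }
        }
        return result;
    }
}
```

For example, with a 10-minute job-level interval and a source proposing a 30-second upper bound, checkpoints would be triggered roughly every 30 seconds.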

>> If one source is processing a backlog and the other is already
processing real time data..

Overall, I am not sure we always want to have a longer checkpointing
interval. That really depends on the specific use-case and the job graph.

The proposed API change provides a mechanism for operators and users to
specify different checkpoint intervals during different periods of the job.
Users have the option to use the new API to get better performance in the
use-case specified in the motivation section. I believe there can be
use-cases where the proposed API is not useful, in which case users can
choose not to use the API without incurring any performance regression.

>> it might be a bit confusing and not user friendly to have multiple
places that can override the checkpointing behaviour in a different way

Admittedly, adding more APIs always incurs more complexity. But sometimes we
have to incur this complexity to address new use-cases. Maybe we can see if
there is a more user-friendly way to address this use-case.

>> already implemented and is simple from the perspective of Flink

Do you mean that the HybridSource operator should invoke the REST API to
trigger checkpoints? The downside of this approach is that it makes it hard
for developers of source operators (e.g. MySQL CDC, HybridSource) to
address the target use-case. AFAIK, there is no existing case where we
require operator developers to use the REST API to do their job.

Can you help explain the benefit of using REST API over using the proposed
API?

Note that this approach also seems to have the same downside mentioned
above: "multiple places that can override the checkpointing behaviour". I
am not sure there can be a solution to address the target use-case without
having multiple places that can affect the checkpointing behavior.

>> check if `pendingRecords` for some source has exceeded the configured
threshold and based on that adjust the checkpointing interval accordingly

I am not sure this approach can address the target use-case in a better
way. In the target use-case, we would like the HybridSource to trigger
checkpoints more frequently when it is reading the Kafka source than when
it is reading the HDFS source. We would need to set a flag for the
checkpoint trigger to know which source the HybridSource is reading from.
But IMO this approach is less intuitive and more complex than having the
HybridSource invoke upperBoundCheckpointingInterval() directly once it is
reading the Kafka source.
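
As a hedged illustration of that intended usage, a hybrid source could tighten the bound only once it enters its real-time phase. The classes below are stand-ins, not Flink's actual HybridSource internals; only the upperBoundCheckpointingInterval() name comes from the FLIP:

```java
import java.time.Duration;

// Minimal model of a hybrid source that tightens the checkpoint-interval
// upper bound once it switches from the historical (e.g. HDFS) phase to
// the real-time (e.g. Kafka) phase. All names here are illustrative.
public class HybridPhaseSketch {
    enum Phase { HISTORICAL_FS, REALTIME_KAFKA }

    // Mirrors the proposed upperBoundCheckpointingInterval() API.
    interface CheckpointIntervalControl {
        void upperBoundCheckpointingInterval(Duration upperBound);
    }

    private final CheckpointIntervalControl control;
    private Phase phase = Phase.HISTORICAL_FS;

    public HybridPhaseSketch(CheckpointIntervalControl control) {
        this.control = control;
    }

    // Called when the bounded (historical) part of the hybrid source
    // finishes. Only now does the source ask for more frequent checkpoints;
    // during the backlog phase the (larger) job-level interval applies.
    public void switchToRealtime(Duration realtimeUpperBound) {
        phase = Phase.REALTIME_KAFKA;
        control.upperBoundCheckpointingInterval(realtimeUpperBound);
    }

    public Phase currentPhase() { return phase; }
}
```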

Maybe I did not understand the alternative approach correctly. I am happy to
discuss this topic further. WDYT?
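
For comparison, a very rough sketch of the metrics-based trigger alternative discussed above. The class, its constructor, and the threshold logic are all assumptions — nothing like this exists in Flink today; only the pendingRecords metric name comes from FLIP-33:

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical pluggable trigger that derives the checkpoint interval
// from a source's pendingRecords metric: a long interval while catching
// up on a backlog, a short one once the source is (nearly) caught up.
public class BacklogDynamicCheckpointTrigger {
    private final LongSupplier pendingRecords; // the source's pendingRecords metric
    private final long backlogThreshold;
    private final Duration backlogInterval;    // used while catching up
    private final Duration realtimeInterval;   // used once caught up

    public BacklogDynamicCheckpointTrigger(LongSupplier pendingRecords,
                                           long backlogThreshold,
                                           Duration backlogInterval,
                                           Duration realtimeInterval) {
        this.pendingRecords = pendingRecords;
        this.backlogThreshold = backlogThreshold;
        this.backlogInterval = backlogInterval;
        this.realtimeInterval = realtimeInterval;
    }

    // A checkpoint coordinator could poll this to decide the next interval.
    public Duration currentInterval() {
        return pendingRecords.getAsLong() > backlogThreshold
                ? backlogInterval
                : realtimeInterval;
    }
}
```

Note that such a purely metrics-driven trigger has no direct way of knowing which underlying source a HybridSource is currently reading, which is the drawback discussed above.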


Best,
Dong

On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski 
wrote:

> Hi,
>
> Thanks for the proposal. However, are you sure that the
> OperatorCoordinator is the right place to place such logic? What would
> happen if there are two (or more) operator coordinators with conflicting
> desired checkpoint trigger behaviour? If one source is processing a backlog
> and the other is already processing real time data, I would assume that in
> most use cases you would like to still have the longer checkpointing
> interval, not the shorter one. Also apart from that, it might be a bit
> confusing and not user friendly to have multiple places that can override
> the checkpointing behaviour in a different way.
>
> FIY in the past, we had some discussions about similar requests and back
> then we chose to keep the system simpler, and exposed a more generic REST
> API checkpoint triggering mechanism. I know that having to implement such
> logic outside of Flink and having to call REST calls to trigger checkpoints
> might not be ideal, but that's already implemented and is simple from the
> perspective of Flink.
>
> I don't know, maybe instead of adding this logic to operator coordinators,
> `CheckpointCoordinator` should have a pluggable `CheckpointTrigger`, that
> the user could configure like a `MetricReporter`. The default one would be
> just periodically triggering checkpoints. Maybe
> `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if
> `pendingRecords` for some source has exceeded the configured threshold and
> based on that adjust the checkpointing interval accordingly? This would at
> least address some of my concerns.
>
> WDYT?
>
> Best,
> Piotrek
>
> [1]
> https://

[jira] [Created] (FLINK-32170) Continue metric collection on intermittent job restarts

2023-05-23 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-32170:
--

 Summary: Continue metric collection on intermittent job restarts
 Key: FLINK-32170
 URL: https://issues.apache.org/jira/browse/FLINK-32170
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels


If the underlying infrastructure is not stable, e.g. Kubernetes pod eviction, 
the jobs will sometimes restart. This will restart the metric collection 
process for the autoscaler and discard any existing metrics. If the 
interruption time is short, e.g. less than one minute, we could consider 
resuming metric collection after the job goes back into RUNNING state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32171) Add PostStart hook to flink k8s operator helm

2023-05-23 Thread Xingcan Cui (Jira)
Xingcan Cui created FLINK-32171:
---

 Summary: Add PostStart hook to flink k8s operator helm
 Key: FLINK-32171
 URL: https://issues.apache.org/jira/browse/FLINK-32171
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Xingcan Cui
 Fix For: kubernetes-operator-1.6.0, kubernetes-operator-1.5.1


I feel it would be convenient to add an optional PostStart hook config to the 
Flink k8s operator Helm chart (e.g. for when users need to download some Flink 
plugins).





[jira] [Created] (FLINK-32172) KafkaExample can not run with args

2023-05-23 Thread xulongfeng (Jira)
xulongfeng created FLINK-32172:
--

 Summary: KafkaExample can not run with args
 Key: FLINK-32172
 URL: https://issues.apache.org/jira/browse/FLINK-32172
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.0
 Environment: * win11
 * Git
 * Maven (we recommend version 3.8.6)
 * Java 11
Reporter: xulongfeng
 Attachments: args.png, kafkaexample.png

I forked and cloned the flink-connector-kafka repo. After building and 
packaging, I ran the main() of 
org/apache/flink/streaming/kafka/test/KafkaExample.java, but it failed.

The comment says:
Example usage: --input-topic test-input --output-topic test-output 
--bootstrap.servers localhost:9092 --group.id myconsumer

But the console prints "Missing parameters!" from KafkaExampleUtil, which 
requires 5 parameters while only 4 are provided.

Thank you for your attention to this matter.





Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically

2023-05-23 Thread Jing Ge
Hi Yunfeng, Hi Dong

Thanks for the informative discussion! It is a reasonable requirement to set
different checkpoint intervals for different sources in a HybridSource. The
tiny downside of this proposal, at least for me, is that I have to
understand the upper-bound definition of the interval and the built-in rule
for Flink to choose the minimum value between it and the default interval
setting. However, afaiac, the intention of this built-in rule is to
minimize changes in Flink to support the requested feature, which is a very
thoughtful move. Thanks for taking care of it. +1 for the proposal.

Another very rough idea came to mind while I was reading the FLIP. I haven't
done a deep dive into the related source code yet, so please correct me if I
am wrong. The use case shows that two different checkpoint intervals should
be set for bounded (historical) and unbounded (fresh real-time) stream
sources. It is a trade-off between throughput and latency, i.e. a bounded
stream with a large checkpoint interval for better throughput and an
unbounded stream with a small checkpoint interval for lower latency (in case
of failover). As we can see, the appropriate interval setting depends on the
boundedness of the stream. Since the Source API already has its own
boundedness flag[1], is it possible to define two interval configurations
and let Flink automatically apply the relevant one to each source based on
its known boundedness? The interval for bounded streams could be something
like execution.checkpoint.interval.bounded (naming could be reconsidered),
and for unbounded streams we could use the existing
execution.checkpoint.interval by default, or introduce a new one like
execution.checkpoint.interval.unbounded. In this way, no API change is
required.
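
A minimal sketch of this idea, under the assumption that the two config keys named above exist (they are hypothetical) and that Flink can inspect each source's boundedness flag:

```java
import java.time.Duration;

// Sketch: pick the checkpoint interval from the source's boundedness flag,
// so no new user-facing API would be needed. The config key names in the
// comments are the hypothetical ones proposed above.
public class BoundednessIntervalSelector {
    // Mirrors org.apache.flink.api.connector.source.Boundedness.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    private final Duration boundedInterval;   // execution.checkpoint.interval.bounded
    private final Duration unboundedInterval; // execution.checkpoint.interval (existing)

    public BoundednessIntervalSelector(Duration boundedInterval, Duration unboundedInterval) {
        this.boundedInterval = boundedInterval;
        this.unboundedInterval = unboundedInterval;
    }

    // Flink would call something like this per source, using the flag the
    // Source API already exposes.
    public Duration intervalFor(Boundedness boundedness) {
        return boundedness == Boundedness.BOUNDED ? boundedInterval : unboundedInterval;
    }
}
```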

@Piotr
Just out of curiosity, do you know of any real use cases where real-time data
is processed before the backlog? Semantically, the backlog contains
historical data that has to be processed before the real-time data is
allowed to be processed. Otherwise, up-to-date data would be overwritten by
out-of-date data, which would produce unexpected results in real business
scenarios.


Best regards,
Jing

[1]
https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41

On Tue, May 23, 2023 at 5:53 PM Dong Lin  wrote:

> Hi Piotr,
>
> Thanks for the comments. Let me try to understand your concerns and
> hopefully address the concerns.
>
> >> What would happen if there are two (or more) operator coordinators with
> conflicting desired checkpoint trigger behaviour
>
> With the proposed change, there won't be any "*conflicting* desired
> checkpoint trigger" by definition. Both the job-level config and the proposed
> upperBoundCheckpointingInterval() API specify an upper bound on the
> checkpointing interval. If different upper bounds are proposed by
> different source operators and the job-level config, Flink will try to
> periodically trigger checkpoints at the interval corresponding to the
> minimum of all these proposed upper bounds.
>
> >> If one source is processing a backlog and the other is already
> processing real time data..
>
> Overall, I am not sure we always want to have a longer checkpointing
> interval. That really depends on the specific use-case and the job graph.
>
> The proposed API change provides a mechanism for operators and users to
> specify different checkpoint intervals during different periods of the job.
> Users have the option to use the new API to get better performance in the
> use-case specified in the motivation section. I believe there can be
> use-cases where the proposed API is not useful, in which case users can
> choose not to use the API without incurring any performance regression.
>
> >> it might be a bit confusing and not user friendly to have multiple
> places that can override the checkpointing behaviour in a different way
>
> Admittedly, adding more APIs always incurs more complexity. But sometimes we
> have to incur this complexity to address new use-cases. Maybe we can see if
> there is a more user-friendly way to address this use-case.
>
> >> already implemented and is simple from the perspective of Flink
>
> Do you mean that the HybridSource operator should invoke the REST API to
> trigger checkpoints? The downside of this approach is that it makes it hard
> for developers of source operators (e.g. MySQL CDC, HybridSource) to
> address the target use-case. AFAIK, there is no existing case where we
> require operator developers to use the REST API to do their job.
>
> Can you help explain the benefit of using REST API over using the proposed
> API?
>
> Note that this approach also seems to have the same downside mentioned
> above: "multiple places that can override the checkpointing behaviour". I
> am not sure there can be a solution to address the target use-case without
> having multiple places that can affect the checkpointing behavior.
>
> >> check if `

[jira] [Created] (FLINK-32173) Flink Job Metrics returns stale values in the first request after an update in the values

2023-05-23 Thread Prabhu Joseph (Jira)
Prabhu Joseph created FLINK-32173:
-

 Summary: Flink Job Metrics returns stale values in the first 
request after an update in the values
 Key: FLINK-32173
 URL: https://issues.apache.org/jira/browse/FLINK-32173
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.17.0
Reporter: Prabhu Joseph


Flink job metrics return a stale value in the first request made after the 
underlying value has been updated.

*Repro:*

1. Run a Flink job with the fixed-delay restart strategy and restart attempts configured:
{code}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1


flink run -Dexecution.checkpointing.interval="10s" -d -c 
org.apache.flink.streaming.examples.wordcount.WordCount 
/usr/lib/flink/examples/streaming/WordCount.jar
{code}

2. Kill one of the TaskManagers, which will initiate a job restart.

3. After the job has restarted, fetch any job metric. The first request returns 
the stale (older) value 48.

{code}
[hadoop@ip-172-31-44-70 ~]$ curl 
http://jobmanager:52000/jobs/d24f7d74d541f1215a65395e0ebd898c/metrics?get=numRestarts
  | jq .
[
  {
"id": "numRestarts",
"value": "48"
  }
]
{code}

4. On subsequent requests, it returns the correct value.
{code}
[hadoop@ip-172-31-44-70 ~]$ curl 
http://jobmanager:52000/jobs/d24f7d74d541f1215a65395e0ebd898c/metrics?get=numRestarts
  | jq .
[
  {
"id": "numRestarts",
"value": "49"
  }
]
{code}

5. Repeat steps 2 to 4, which will show that the first request after an update 
to a metric returns the pre-update value; only on the next request is the 
correct value returned.






