Re: [DISCUSS] FLIP-287: Extend Sink#InitContext to expose ExecutionConfig and JobID
Hi Joao,

I noticed the FLIP currently contains the following 2 methods about type serializers:
(1) TypeSerializer createInputSerializer();
(2) TypeSerializer createSerializer(TypeInformation inType);

Is method (2) still needed now?

Best,
Lijie

João Boto wrote on Fri, May 19, 2023 at 16:53:
> Updated the FLIP to use this option.
[jira] [Created] (FLINK-32160) CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException
Michał Fijołek created FLINK-32160: -- Summary: CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException Key: FLINK-32160 URL: https://issues.apache.org/jira/browse/FLINK-32160 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.17.0, 1.16.0 Environment: Flink 1.17 on k8s (flink-kubernetes-operator v1.4.0), s3 Reporter: Michał Fijołek

Hello :) We have a Flink job (v1.17) on k8s (using the official flink-kubernetes-operator) that reads data from Kafka and writes it to s3 with flink-sql, using compaction. The job sometimes fails and continues from a checkpoint just fine, but every couple of days we hit a crash loop: the job cannot continue from the latest checkpoint and fails with this exception:

{noformat}
java.util.NoSuchElementException
  at java.base/java.util.ArrayList$Itr.next(Unknown Source)
  at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
  at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
  at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
  at java.base/java.lang.Thread.run(Unknown Source)
{noformat}

Here's the relevant code:
https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114

It looks like `CompactOperator` calls `next()` on an iterator without checking `hasNext()` first - why is that? Is it a bug? Why does `context.getOperatorStateStore().getListState(metaDescriptor)` return an empty iterator? Is the latest checkpoint broken in such a case? We have an identical job, but without compaction, and it has been working smoothly for a couple of weeks now. The whole job is just a `select` from Kafka and an `insert` to s3.

{noformat}
CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (
  `foo_bar1` STRING,
  `foo_bar2` STRING,
  `foo_bar3` STRING,
  `foo_bar4` STRING
) PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
STORED AS parquet
LOCATION 's3a://my/bucket/'
TBLPROPERTIES (
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB',
  'sink.parallelism' = '8',
  'format' = 'parquet',
  'parquet.compression' = 'SNAPPY',
  'sink.rolling-policy.rollover-interval' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore'
)
{noformat}

Checkpoint configuration:

{noformat}
Checkpointing Mode: Exactly Once
Checkpoint Storage: FileSystemCheckpointStorage
State Backend: HashMapStateBackend
Interval: 20m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 0ms
Maximum Concurrent Checkpoints: 1
Unaligned Checkpoints: Disabled
Persist Checkpoints Externally: Enabled (retain on cancellation)
Tolerable Failed Checkpoints: 0
Checkpoints With Finished Tasks: Enabled
State Changelog: Disabled
{noformat}

Is there something wrong with the given config, or is this some unhandled edge case?
Currently our workaround is to restart the job without using the checkpoint - it then uses state from Kafka, which in this case is fine.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
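For context, the pattern the reporter points at can be sketched without any Flink dependencies: the fix direction would be to guard the `next()` call and treat an empty restored list state as "nothing to compact" instead of failing. The class and method names below are illustrative stand-ins, not the actual CompactOperator code.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class RestoreSketch {
    // Hypothetical stand-in for reading the first element of restored list state.
    static String readCompactedPath(List<String> restoredState) {
        Iterator<String> it = restoredState.iterator();
        // Guard against an empty iterator instead of calling next() unconditionally,
        // which is what throws NoSuchElementException at CompactOperator.java:114.
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        // Empty restored state no longer crashes; it simply yields "no pending file".
        assert readCompactedPath(Collections.emptyList()) == null;
        assert "p1".equals(readCompactedPath(List.of("p1")));
        System.out.println("ok");
    }
}
```

Whether an empty list state here is legitimate (and should be tolerated) or indicates a corrupted checkpoint is exactly the open question in this report.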
Re: [VOTE] Release 1.16.2, release candidate #1
+1 (binding)

- Verified checksums and signatures
- Verified that no binaries exist in the source archive
- Built the source with Maven
- Verified that README doesn't have unexpected contents
- Verified that the commit ID on the Flink UI matches the tag
- Ran a job with the SQL client that reads from a Kafka topic and writes to another one; created a savepoint for the job and recovered from it successfully
- Reviewed the release announcement PR

Best regards,
Qingsheng

On Mon, May 22, 2023 at 8:34 PM Xingbo Huang wrote:
> +1 (binding)
>
> - verified signatures and checksums
> - verified python wheel package contents
> - pip installed apache-flink-libraries and apache-flink wheel packages
> - ran example flink/flink-python/pyflink/examples/table/basic_operations.py with Python 3.7
> - reviewed release blog post
>
> Best,
> Xingbo
>
> Xintong Song wrote on Sun, May 21, 2023 at 13:03:
> > +1 (binding)
> >
> > - verified signatures and checksums
> > - built from source
> > - tried example jobs with a standalone cluster, everything seems fine
> > - reviewed release announcement PR
> >
> > Best,
> > Xintong
> >
> > On Sat, May 20, 2023 at 6:04 PM Jing Ge wrote:
> > > +1 (non-binding)
> > >
> > > - reviewed Jira release notes
> > > - built from source
> > > - apache repos contain all necessary files
> > > - verified signatures
> > > - verified hashes
> > > - verified tag
> > > - reviewed PR
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Sat, May 20, 2023 at 11:51 AM Yun Tang wrote:
> > > > +1 (non-binding)
> > > >
> > > > * Verified signatures
> > > > * Reviewed the flink-web PR
> > > > * Set up a standalone cluster from released binaries and checked the git revision number
> > > > * Submitted the statemachine example with RocksDB, and it works fine
> > > >
> > > > Best,
> > > > Yun Tang
> > > >
> > > > From: Yuxin Tan
> > > > Sent: Friday, May 19, 2023 17:41
> > > > To: dev@flink.apache.org
> > > > Subject: Re: [VOTE] Release 1.16.2, release candidate #1
> > > >
> > > > +1 (non-binding)
> > > >
> > > > - Verified signature
> > > > - Verified hashes
> > > > - Built from source on a Mac
> > > > - Verified that the source archives do not contain any binaries
> > > > - Ran streaming and batch jobs in sql-client successfully
> > > >
> > > > Thanks weijie for driving this release candidate.
> > > >
> > > > Best,
> > > > Yuxin
> > > >
> > > > weijie guo wrote on Fri, May 19, 2023 at 16:19:
> > > > > Hi everyone,
> > > > >
> > > > > Please review and vote on the release candidate #1 for the version 1.16.2, as follows:
> > > > >
> > > > > [ ] +1, Approve the release
> > > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > > >
> > > > > The complete staging area is available for your review, which includes:
> > > > > * JIRA release notes [1],
> > > > > * the official Apache source release and binary convenience releases to be deployed to dist.apache.org [2], which are signed with the key with fingerprint 8D56AE6E7082699A4870750EA4E8C4C05EE6861F [3],
> > > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > > * source code tag "release-1.16.2-rc1" [5],
> > > > > * website pull request listing the new release and adding the announcement blog post [6].
> > > > >
> > > > > The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes.
> > > > >
> > > > > Thanks,
> > > > > Release Manager
> > > > >
> > > > > [1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352765
> > > > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.16.2-rc1/
> > > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > > [4] https://repository.apache.org/content/repositories/orgapacheflink-1634/
> > > > > [5] https://github.com/apache/flink/releases/tag/release-1.16.2-rc1
> > > > > [6] https://github.com/apache/flink-web/pull/649
Re: [VOTE] Release 1.16.2, release candidate #1
+1 (non-binding)

- Verified signature
- Verified hashes
- Built from source on a Mac
- Verified that the source archives do not contain any binaries
- Ran streaming and batch jobs in sql-client successfully

Thank you for driving this release candidate.
Re: [VOTE] Release 1.16.2, release candidate #1
+1 (non-binding)

- Reviewed Jira release notes
- Built from source
- Verified signatures
- Verified hashes
- Verified that no binaries exist in the source archive
- Ran a streaming job in sql-client successfully

Best,
Hang
[DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction
Hi guys,

I want to bring up a discussion about adding support for user-defined AsyncTableFunction in Flink.

Currently, async table functions are special functions used by table sources to perform async lookups. However, it is worth supporting user-defined async table functions as well: the end SQL user could then leverage them to perform async operations, which is useful for maximizing system throughput, especially in IO-bottlenecked cases.

You can find more detail in [1].

Looking forward to feedback.

[1]: https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.
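For readers unfamiliar with the async-function style this FLIP builds on: the key shape is an `eval` method that completes a future instead of returning a value, so many lookups can be in flight at once. The sketch below is a plain-Java illustration of that shape only; the class name and the thread-pool model are illustrative, and in Flink the runtime, not the caller, drives the futures.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch: mimics the shape of an async eval(...) method, where
// results are delivered by completing a future rather than by a return value.
public class AsyncLookupSketch {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Models an external lookup: the "row" for `key` arrives asynchronously.
    public void eval(CompletableFuture<List<String>> result, String key) {
        pool.submit(() -> result.complete(List.of(key + "-enriched")));
    }

    public void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        AsyncLookupSketch fn = new AsyncLookupSketch();
        CompletableFuture<List<String>> f = new CompletableFuture<>();
        fn.eval(f, "row1");
        // The caller (the Flink runtime, in the real case) joins the future.
        System.out.println(f.join());
        fn.shutdown();
    }
}
```

The throughput argument in the FLIP follows from this shape: while one future is pending on IO, the operator can issue further `eval` calls instead of blocking per row.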
Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.5.0 released
Niceee. Thanks for managing the release, Gyula!

-Max

On Wed, May 17, 2023 at 8:25 PM Márton Balassi wrote:
> Thanks, awesome! :-)
>
> On Wed, May 17, 2023 at 2:24 PM Gyula Fóra wrote:
>> The Apache Flink community is very happy to announce the release of Apache Flink Kubernetes Operator 1.5.0.
>>
>> The Flink Kubernetes Operator allows users to manage their Apache Flink applications and their lifecycle through native k8s tooling like kubectl.
>>
>> Release highlights:
>> - Autoscaler improvements
>> - Operator stability, observability improvements
>>
>> Release blogpost:
>> https://flink.apache.org/2023/05/17/apache-flink-kubernetes-operator-1.5.0-release-announcement/
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Maven artifacts for Flink Kubernetes Operator can be found at:
>> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>>
>> Official Docker image for Flink Kubernetes Operator applications can be found at:
>> https://hub.docker.com/r/apache/flink-kubernetes-operator
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12352931
>>
>> We would like to thank all contributors of the Apache Flink community who made this release possible!
>>
>> Regards,
>> Gyula Fora
[jira] [Created] (FLINK-32161) Migrate and remove some legacy ExternalResource
Weijie Guo created FLINK-32161: -- Summary: Migrate and remove some legacy ExternalResource Key: FLINK-32161 URL: https://issues.apache.org/jira/browse/FLINK-32161 Project: Flink Issue Type: Technical Debt Reporter: Weijie Guo Assignee: Weijie Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction
Just caught a user case report from Giannis Polyzos for this usage:
https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
[jira] [Created] (FLINK-32162) Misleading log message due to missing null check
Chesnay Schepler created FLINK-32162: Summary: Misleading log message due to missing null check Key: FLINK-32162 URL: https://issues.apache.org/jira/browse/FLINK-32162 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.18.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 Updating the job requirements always logs "Failed to update requirements for job {}." because we don't check whether the error is not null. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32163) Support the same application run multiple jobs in HA mode
melin created FLINK-32163: - Summary: Support the same application run multiple jobs in HA mode Key: FLINK-32163 URL: https://issues.apache.org/jira/browse/FLINK-32163 Project: Flink Issue Type: New Feature Reporter: melin Support the same application run multiple jobs in HA mode -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32164) LifecycleState count metrics are not reported correctly by namespace
Gyula Fora created FLINK-32164: -- Summary: LifecycleState count metrics are not reported correctly by namespace Key: FLINK-32164 URL: https://issues.apache.org/jira/browse/FLINK-32164 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.5.0, kubernetes-operator-1.4.0 Reporter: Gyula Fora The per-namespace lifecycle state count metrics incorrectly show a global count: https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/LifecycleMetrics.java#L145 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32165) Improve observability of fine-grained resource management
Chesnay Schepler created FLINK-32165: Summary: Improve observability of fine-grained resource management Key: FLINK-32165 URL: https://issues.apache.org/jira/browse/FLINK-32165 Project: Flink Issue Type: Improvement Components: Runtime / Coordination, Runtime / Web Frontend Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 Right now fine-grained resource management is far too much of a black box, with the only source of information being the taskmanager REST endpoints. While this is fine-ish for services built around it, the developer experience suffers greatly and it becomes impossible to reason about the system afterwards (because we don't even log anything). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32166) Show unassigned/total TM resources in web ui
Chesnay Schepler created FLINK-32166: Summary: Show unassigned/total TM resources in web ui Key: FLINK-32166 URL: https://issues.apache.org/jira/browse/FLINK-32166 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 It is important to know how many resources of a TM are currently _assigned_ to jobs. This is different from what resources are currently _used_: you can have 1gb of memory assigned to a job while it is only using 10mb at a given time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32167) Log dynamic slot creation on task manager
Chesnay Schepler created FLINK-32167: Summary: Log dynamic slot creation on task manager Key: FLINK-32167 URL: https://issues.apache.org/jira/browse/FLINK-32167 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 When a slot is dynamically allocated on the TM we should log that this happens, what resources it consumes and what the remaining resources are. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32168) Log required/available resources in RM
Chesnay Schepler created FLINK-32168: Summary: Log required/available resources in RM Key: FLINK-32168 URL: https://issues.apache.org/jira/browse/FLINK-32168 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 When matching requirements against available resources, the RM currently doesn't log anything apart from whether it could fulfill the requirements or not. We can make the system easier to audit by logging the current requirements, the available resources, and how many resources are left after the matching. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32169) Show allocated slots on TM page
Chesnay Schepler created FLINK-32169: Summary: Show allocated slots on TM page Key: FLINK-32169 URL: https://issues.apache.org/jira/browse/FLINK-32169 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Runtime / Web Frontend Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0 Show the allocated slots on the TM page, so that you can better understand which job is consuming what resources. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically
Hi,

Thanks for the proposal. However, are you sure that the OperatorCoordinator is the right place for such logic? What would happen if there are two (or more) operator coordinators with conflicting desired checkpoint-trigger behaviour? If one source is processing a backlog and the other is already processing real-time data, I would assume that in most use cases you would still want the longer checkpointing interval, not the shorter one.

Apart from that, it might be confusing and not user friendly to have multiple places that can override the checkpointing behaviour in different ways.

FYI, in the past we had some discussions about similar requests, and back then we chose to keep the system simpler and exposed a more generic REST API checkpoint-triggering mechanism. I know that having to implement such logic outside of Flink and having to issue REST calls to trigger checkpoints might not be ideal, but that's already implemented and is simple from the perspective of Flink.

I don't know, maybe instead of adding this logic to operator coordinators, `CheckpointCoordinator` should have a pluggable `CheckpointTrigger` that the user could configure like a `MetricReporter`. The default one would just trigger checkpoints periodically. Maybe a `BacklogDynamicCheckpointTrigger` could look at metrics [1], check if `pendingRecords` for some source has exceeded the configured threshold, and adjust the checkpointing interval accordingly? This would at least address some of my concerns. WDYT?

Best,
Piotrek

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

Yunfeng Zhou wrote on Tue, May 9, 2023 at 19:11:
> Hi all,
>
> Dong (cc'ed) and I are opening this thread to discuss our proposal to support dynamically triggering checkpoints from operators, which has been documented in FLIP-309 <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255069517>.
>
> With the help of the ability proposed in this FLIP, users could improve the performance of their Flink jobs in cases where the job needs to process both historical batch data and real-time streaming data, by adjusting the checkpoint triggering in different phases of a HybridSource or CDC source.
>
> This proposal would be a fundamental component in the effort to further unify Flink's batch and stream processing ability. Please feel free to reply to this email thread and share with us your opinions.
>
> Best regards,
> Dong and Yunfeng
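Piotr's pluggable-trigger idea can be sketched roughly as follows. None of these interfaces exist in Flink today; this is only an illustration of the proposed shape, with the FLIP-33 `pendingRecords` metric supplied as a plain callback and intervals chosen arbitrarily.

```java
import java.util.function.LongSupplier;

// Hypothetical interface along the lines Piotr describes; the default
// implementation would simply return a fixed periodic interval.
interface CheckpointTrigger {
    long nextIntervalMillis();
}

// Picks a longer interval while the source backlog (pendingRecords) exceeds
// a threshold, and the normal interval once the backlog is drained.
class BacklogDynamicCheckpointTrigger implements CheckpointTrigger {
    private final LongSupplier pendingRecords; // e.g. the FLIP-33 source metric
    private final long threshold;
    private final long backlogInterval;
    private final long normalInterval;

    BacklogDynamicCheckpointTrigger(LongSupplier pendingRecords, long threshold,
                                    long backlogInterval, long normalInterval) {
        this.pendingRecords = pendingRecords;
        this.threshold = threshold;
        this.backlogInterval = backlogInterval;
        this.normalInterval = normalInterval;
    }

    @Override
    public long nextIntervalMillis() {
        return pendingRecords.getAsLong() > threshold ? backlogInterval : normalInterval;
    }
}

public class TriggerSketch {
    public static void main(String[] args) {
        long[] backlog = {1_000_000L}; // mutable holder standing in for a live metric
        CheckpointTrigger t = new BacklogDynamicCheckpointTrigger(
                () -> backlog[0], 10_000L, 600_000L, 60_000L);
        assert t.nextIntervalMillis() == 600_000L; // large backlog -> long interval
        backlog[0] = 0L;
        assert t.nextIntervalMillis() == 60_000L;  // caught up -> normal interval
        System.out.println("ok");
    }
}
```

One appeal of this placement is that the policy sees job-wide metrics, so the "two sources disagree" conflict is resolved in one place rather than per coordinator.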
Re: Questions on checkpointing mechanism for FLIP-27 Source API
Hi,

I vaguely remember someone implementing a mechanism to deal with this. I think at least at some point (it might have changed since I last looked), it solved the problem by canceling the checkpoint in the scenario you described. However, I can't remember off the top of my head either the ticket number or where the code for that is, and I might be completely wrong. If I don't forget, I can try to find it tomorrow.

Best,
Piotrek

Teoh, Hong wrote on Wed, May 17, 2023 at 17:39:
> Hi all,
>
> I'm writing a new source based on the FLIP-27 Source API, and I had some questions about the checkpointing mechanisms and associated guarantees. I would appreciate it if someone more familiar with the API could provide insights here!
>
> In a FLIP-27 source we now have a SplitEnumerator (running on the JM) and a SourceReader (running on the TM). However, the SourceReader can send events to the SplitEnumerator. Given this, we have introduced a "loopback" communication mechanism from TM to JM, and I wonder if/how we handle this during checkpoints.
>
> Example of how data might be lost:
> 1. Checkpoint 123 triggered
> 2. SplitEnumerator takes checkpoint of state for checkpoint 123
> 3. SourceReader sends OperatorEvent 1 and mutates state to reflect this
> 4. SourceReader takes checkpoint of state for checkpoint 123
> ...
> 5. Checkpoint 123 completes
>
> Let's assume OperatorEvent 1 would mutate SplitEnumerator state once processed. There is now inconsistent state between the SourceReader state and the SplitEnumerator state (the SourceReader assumes OperatorEvent 1 is processed, whereas the SplitEnumerator has not processed OperatorEvent 1).
>
> Do we have any mechanisms for mitigating this issue? For example, does the SplitEnumerator re-take the snapshot of state for a checkpoint if an OperatorEvent is sent before the checkpoint is complete?
>
> Regards,
> Hong
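The race in the numbered steps can be reduced to a toy model with no Flink classes at all: if the enumerator snapshots before it drains the reader-to-enumerator channel, the two snapshots diverge. Everything below is an illustrative simulation of that ordering, not Flink's actual coordination code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SnapshotRaceSketch {
    /** Replays the ordering and returns {enumeratorSnapshot, readerSnapshot}. */
    static int[] replay() {
        Queue<String> inFlight = new ArrayDeque<>(); // reader -> enumerator channel
        int enumeratorState = 0;  // how many events the enumerator has processed
        int readerSentEvents = 0; // how many events the reader believes were sent

        int enumeratorSnapshot = enumeratorState; // step 2: enumerator snapshots first

        inFlight.add("OperatorEvent-1");          // step 3: reader sends an event...
        readerSentEvents++;                       // ...and mutates its own state

        int readerSnapshot = readerSentEvents;    // step 4: reader snapshots after sending

        while (!inFlight.isEmpty()) {             // event is processed only after the
            inFlight.poll();                      // enumerator's snapshot was taken
            enumeratorState++;
        }
        return new int[] {enumeratorSnapshot, readerSnapshot};
    }

    public static void main(String[] args) {
        int[] s = replay();
        // On restore, the reader believes the event was handled, but the
        // enumerator snapshot predates it: the states are inconsistent.
        assert s[0] == 0 && s[1] == 1;
        System.out.println("enumerator snapshot=" + s[0] + ", reader snapshot=" + s[1]);
    }
}
```

Any real fix has to break this ordering somehow, e.g. by canceling the checkpoint when an event is sent mid-checkpoint (as Piotr recalls) or by including the in-flight events in one side's snapshot.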
How to figure out what's the size of ListState?
Dear Flink Dev team, It's been a while since I started dealing with an issue that I can't figure out myself. I have spent quite a lot of time trying to solve the problem, but I feel stuck. I explained the problem statement and the issue here: https://stackoverflow.com/questions/76308686/how-to-figure-out-whats-the-size-of-liststate I would really appreciate any suggestions. Best, Amir
Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically
Hi Piotr, Thanks for the comments. Let me try to understand your concerns and hopefully address them. >> What would happen if there are two (or more) operator coordinators with conflicting desired checkpoint trigger behaviour With the proposed change, there won't exist any "*conflicting* desired checkpoint trigger" by definition. Both the job-level config and the proposed API upperBoundCheckpointingInterval() mean the upper bound of the checkpointing interval. If there are different upper bounds proposed by different source operators and the job-level config, Flink will try to periodically trigger checkpoints at the interval corresponding to the minimum of all these proposed upper bounds. >> If one source is processing a backlog and the other is already processing real time data.. Overall, I am not sure we always want to have a longer checkpointing interval. That really depends on the specific use-case and the job graph. The proposed API change provides a mechanism for operators and users to specify different checkpoint intervals during different periods of the job. Users have the option to use the new API to get better performance in the use-case specified in the motivation section. I believe there can be use-cases where the proposed API is not useful, in which case users can choose not to use the API without incurring any performance regression. >> it might be a bit confusing and not user friendly to have multiple places that can override the checkpointing behaviour in a different way Admittedly, adding more APIs always incurs more complexity. But sometimes we have to incur this complexity to address new use-cases. Maybe we can see if there is a more user-friendly way to address this use-case. >> already implemented and is simple from the perspective of Flink Do you mean that the HybridSource operator should invoke the REST API to trigger checkpoints? The downside of this approach is that it makes it hard for developers of source operators (e.g. 
MySQL CDC, HybridSource) to address the target use-case. AFAIK, there is no existing case where we require operator developers to use the REST API to do their job. Can you help explain the benefit of using the REST API over using the proposed API? Note that this approach also seems to have the same downside mentioned above: "multiple places that can override the checkpointing behaviour". I am not sure there can be a solution that addresses the target use-case without having multiple places that can affect the checkpointing behavior. >> check if `pendingRecords` for some source has exceeded the configured threshold and based on that adjust the checkpointing interval accordingly I am not sure this approach can address the target use-case in a better way. In the target use-case, we would like the HybridSource to trigger checkpoints more frequently when it is reading from the Kafka source (than when it is reading from the HDFS source). We would need to set a flag for the checkpoint trigger to know which source the HybridSource is reading from. But IMO this approach is less intuitive and more complex than having the HybridSource invoke upperBoundCheckpointingInterval() directly once it is reading from the Kafka source. Maybe I did not understand the alternative approach correctly. I am happy to discuss this topic further. WDYT? Best, Dong On Tue, May 23, 2023 at 10:27 PM Piotr Nowojski wrote: > Hi, > > Thanks for the proposal. However, are you sure that the > OperatorCoordinator is the right place to place such logic? What would > happen if there are two (or more) operator coordinators with conflicting > desired checkpoint trigger behaviour? If one source is processing a backlog > and the other is already processing real time data, I would assume that in > most use cases you would like to still have the longer checkpointing > interval, not the shorter one. 
Also apart from that, it might be a bit > confusing and not user friendly to have multiple places that can override > the checkpointing behaviour in a different way. > > FYI in the past, we had some discussions about similar requests and back > then we chose to keep the system simpler, and exposed a more generic REST > API checkpoint triggering mechanism. I know that having to implement such > logic outside of Flink and having to call REST calls to trigger checkpoints > might not be ideal, but that's already implemented and is simple from the > perspective of Flink. > > I don't know, maybe instead of adding this logic to operator coordinators, > `CheckpointCoordinator` should have a pluggable `CheckpointTrigger`, that > the user could configure like a `MetricReporter`. The default one would be > just periodically triggering checkpoints. Maybe > `BacklogDynamicCheckpointTrigger` could look at metrics[1], check if > `pendingRecords` for some source has exceeded the configured threshold and > based on that adjust the checkpointing interval accordingly? This would at > least address some of my concerns. > > WDYT? > > Best, > Piotrek > > [1] > https://
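The resolution rule Dong describes, where the effective periodic interval is the minimum of the job-level interval and all operator-proposed upper bounds, can be sketched in a few lines. This is an illustrative sketch, not Flink's actual coordinator code; the method name is hypothetical:

```java
import java.util.List;

// Sketch of the "minimum of all proposed upper bounds" rule from the
// discussion above (hypothetical helper, not the Flink implementation).
public class IntervalResolution {

    /**
     * Returns the effective periodic checkpoint interval: the job-level
     * interval capped by every upper bound proposed by a source operator.
     */
    static long effectiveIntervalMs(long jobLevelIntervalMs, List<Long> proposedUpperBoundsMs) {
        long result = jobLevelIntervalMs;
        for (long bound : proposedUpperBoundsMs) {
            result = Math.min(result, bound);
        }
        return result;
    }

    public static void main(String[] args) {
        // Job-level interval of 10 min; a HybridSource proposes a 30 s
        // upper bound while reading its Kafka (real-time) phase.
        long effective = effectiveIntervalMs(600_000L, List.of(30_000L));
        System.out.println(effective); // 30000

        // With no proposals, the job-level interval applies unchanged.
        System.out.println(effectiveIntervalMs(600_000L, List.of())); // 600000
    }
}
```

Because every input is an upper bound, taking the minimum can only make checkpoints more frequent than the job-level setting, never less frequent, which is why the FLIP argues there is no "conflicting" trigger by definition.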
[jira] [Created] (FLINK-32170) Continue metric collection on intermittent job restarts
Maximilian Michels created FLINK-32170: -- Summary: Continue metric collection on intermittent job restarts Key: FLINK-32170 URL: https://issues.apache.org/jira/browse/FLINK-32170 Project: Flink Issue Type: Improvement Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels If the underlying infrastructure is not stable, e.g. Kubernetes pod eviction, jobs will sometimes restart. This restarts the metric collection process for the autoscaler and discards any existing metrics. If the interruption time is short, e.g. less than one minute, we could consider resuming metric collection after the job goes back into the RUNNING state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32171) Add PostStart hook to flink k8s operator helm
Xingcan Cui created FLINK-32171: --- Summary: Add PostStart hook to flink k8s operator helm Key: FLINK-32171 URL: https://issues.apache.org/jira/browse/FLINK-32171 Project: Flink Issue Type: New Feature Components: Kubernetes Operator Reporter: Xingcan Cui Fix For: kubernetes-operator-1.6.0, kubernetes-operator-1.5.1 I feel it would be convenient to add an optional PostStart hook config to the flink k8s operator helm chart (e.g. for when users need to download some Flink plugins).
[jira] [Created] (FLINK-32172) KafkaExample can not run with args
xulongfeng created FLINK-32172: -- Summary: KafkaExample can not run with args Key: FLINK-32172 URL: https://issues.apache.org/jira/browse/FLINK-32172 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.17.0 Environment: * win11 * Git * Maven (we recommend version 3.8.6) * Java 11 Reporter: xulongfeng Attachments: args.png, kafkaexample.png I forked and cloned the flink-connector-kafka repo. After building and packaging, I ran the main() of org/apache/flink/streaming/kafka/test/KafkaExample.java, but it failed. The comment says: Example usage: --input-topic test-input --output-topic test-output --bootstrap.servers localhost:9092 --group.id myconsumer but the console prints: Missing parameters! KafkaExampleUtil requires 5 parameters but we supply only 4. Thank you for your attention to this matter.
Re: [DISCUSS] FLIP-309: Enable operators to trigger checkpoints dynamically
Hi Yunfeng, Hi Dong, Thanks for the informative discussion! It is a reasonable requirement to set different checkpoint intervals for different sources in a HybridSource. The tiny downside of this proposal, at least for me, is that I have to understand the upper-bound definition of the interval and the built-in rule by which Flink chooses the minimum value between it and the default interval setting. However, as far as I am concerned, the intention of this built-in rule is to minimize changes in Flink to support the requested feature, which is a very thoughtful move. Thanks for taking care of it. +1 for the proposal. Another very rough idea came to mind while I was reading the FLIP. I haven't done a deep dive into the related source code yet, so please correct me if I am wrong. The use case shows that two different checkpoint intervals should be set for bounded (historical) stream and unbounded (fresh real-time) stream sources. It is a trade-off between throughput and latency, i.e. a bounded stream with a large checkpoint interval for better throughput and an unbounded stream with a small checkpoint interval for lower latency (in case of failover). As we can see, the interval setting depends on the boundedness of the streams. Since the Source API already has its own boundedness flag[1], is it possible to define two interval configurations and let Flink automatically apply the relevant one to the source based on the known boundedness? The interval for bounded streams could be something like execution.checkpoint.interval.bounded (naming could be reconsidered), and for unbounded streams we could use the existing execution.checkpoint.interval by default, or introduce a new one like execution.checkpoint.interval.unbounded. In this way, no API change is required. @Piotr Just out of curiosity, do you know of any real use cases where real-time data is processed before the backlog? 
Semantically, the backlog contains historical data that has to be processed before the real-time data is allowed to be processed. Otherwise, up-to-date data will be overwritten by out-of-date data, which leads to unexpected results in real business scenarios. Best regards, Jing [1] https://github.com/apache/flink/blob/fadde2a378aac4293676944dd513291919a481e3/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L41 On Tue, May 23, 2023 at 5:53 PM Dong Lin wrote: > Hi Piotr, > > Thanks for the comments. Let me try to understand your concerns and > hopefully address the concerns. > > >> What would happen if there are two (or more) operator coordinators with > conflicting desired checkpoint trigger behaviour > > With the proposed change, there won't exist any "*conflicting* desired > checkpoint trigger" by definition. Both job-level config and the proposed > API upperBoundCheckpointingInterval() means the upper-bound of the > checkpointing interval. If there are different upper-bounds proposed by > different source operators and the job-level config, Flink will try to > periodically trigger checkpoints at the interval corresponding to the > minimum of all these proposed upper-bounds. > > >> If one source is processing a backlog and the other is already > processing real time data.. > > Overall, I am not sure we always want to have a longer checkpointing > interval. That really depends on the specific use-case and the job graph. > > The proposed API change mechanism for operators and users to specify > different checkpoint intervals at different periods of the job. Users have > the option to use the new API to get better performance in the use-case > specified in the motivation section. I believe there can be use-case where > the proposed API is not useful, in which case users can choose not to use > the API without incurring any performance regression. 
> > >> it might be a bit confusing and not user friendly to have multiple > places that can override the checkpointing behaviour in a different way > > Admittedly, adding more APIs always incur more complexity. But sometimes we > have to incur this complexity to address new use-cases. Maybe we can see if > there are more user-friendly way to address this use-case. > > >> already implemented and is simple from the perspective of Flink > > Do you mean that the HybridSource operator should invoke the rest API to > trigger checkpoints? The downside of this approach is that it makes it hard > for developers of source operators (e.g. MySQL CDC, HybridSource) to > address the target use-case. AFAIK, there is no existing case where we > require operator developers to use REST API to do their job. > > Can you help explain the benefit of using REST API over using the proposed > API? > > Note that this approach also seems to have the same downside mentioned > above: "multiple places that can override the checkpointing behaviour". I > am not sure there can be a solution to address the target use-case without > having multiple places that can affect the checkpointing behavior. > > >> check if `
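Jing's alternative, two configuration options selected automatically from the source's boundedness flag, could look roughly like this. This is a sketch of the idea only: the config keys are the placeholders Jing proposed, not existing Flink options, and the enum merely mirrors the Boundedness flag in the Source API:

```java
// Sketch of boundedness-driven interval selection (placeholder config
// names from the discussion; not an existing Flink feature).
public class BoundednessIntervalSketch {

    // Mirrors org.apache.flink.api.connector.source.Boundedness.
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /**
     * Picks the checkpoint interval for a source: the "bounded" setting
     * (e.g. execution.checkpoint.interval.bounded) for historical phases,
     * the regular setting (execution.checkpoint.interval) otherwise.
     */
    static long chooseIntervalMs(Boundedness b, long boundedIntervalMs, long unboundedIntervalMs) {
        return b == Boundedness.BOUNDED ? boundedIntervalMs : unboundedIntervalMs;
    }

    public static void main(String[] args) {
        // Large interval while replaying the bounded backlog (throughput),
        // small interval once on the unbounded real-time stream (latency).
        System.out.println(chooseIntervalMs(Boundedness.BOUNDED, 600_000L, 30_000L));              // 600000
        System.out.println(chooseIntervalMs(Boundedness.CONTINUOUS_UNBOUNDED, 600_000L, 30_000L)); // 30000
    }
}
```

The appeal of this variant is that it needs no new public API on the operator side; the open question, as the thread notes, is whether boundedness alone captures all the cases the FLIP targets (e.g. a CDC source switching phases within one unbounded stream).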
[jira] [Created] (FLINK-32173) Flink Job Metrics returns stale values in the first request after an update in the values
Prabhu Joseph created FLINK-32173: - Summary: Flink Job Metrics returns stale values in the first request after an update in the values Key: FLINK-32173 URL: https://issues.apache.org/jira/browse/FLINK-32173 Project: Flink Issue Type: Bug Components: Runtime / Metrics Affects Versions: 1.17.0 Reporter: Prabhu Joseph Flink Job Metrics returns stale values in the first request after an update in the values. *Repro:* 1. Run a flink job with a fixed-delay restart strategy and multiple attempts {code} restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 1 flink run -Dexecution.checkpointing.interval="10s" -d -c org.apache.flink.streaming.examples.wordcount.WordCount /usr/lib/flink/examples/streaming/WordCount.jar {code} 2. Kill one of the TaskManagers, which will initiate a job restart. 3. After the job has restarted, fetch any job metric. The first time, it returns the stale (older) value 48. {code} [hadoop@ip-172-31-44-70 ~]$ curl http://jobmanager:52000/jobs/d24f7d74d541f1215a65395e0ebd898c/metrics?get=numRestarts | jq . [ { "id": "numRestarts", "value": "48" } ] {code} 4. On subsequent requests, it returns the correct value. {code} [hadoop@ip-172-31-44-70 ~]$ curl http://jobmanager:52000/jobs/d24f7d74d541f1215a65395e0ebd898c/metrics?get=numRestarts | jq . [ { "id": "numRestarts", "value": "49" } ] {code} 5. Repeat steps 2 to 4, which shows that the first request after an update to the metrics returns the previous value from before the update. Only on the next request is the correct value returned.