[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main
[ https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542617#comment-16542617 ] ASF GitHub Bot commented on FLINK-7095: --- Github user zhangminglei closed the pull request at: https://github.com/apache/flink/pull/5375 > Add proper command line parsing tool to TaskManagerRunner.main > -- > > Key: FLINK-7095 > URL: https://issues.apache.org/jira/browse/FLINK-7095 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Priority: Minor > Labels: flip-6, pull-request-available > Fix For: 1.6.0 > > > We need to add a proper command line parsing tool to the entry point of the > {{TaskManagerRunner#main}}. At the moment, we are simply using the > {{ParameterTool}} as a temporary solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
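The {{ParameterTool}} stopgap mentioned in the issue description can be illustrated with a minimal sketch. The class below is hypothetical (it is not Flink's actual `ParameterTool`) and only mimics its permissive `--key value` parsing; the point is that such a parser silently accepts unknown or misspelled options, which is why a proper command line parsing tool was requested for `TaskManagerRunner#main`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of ParameterTool-style parsing (not Flink's code):
// every "--key value" pair is accepted, so typos and unknown flags pass silently.
public class NaiveArgsParser {
    public static Map<String, String> fromArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                // A following token is the value unless it is itself a flag.
                String value = (i + 1 < args.length && !args[i + 1].startsWith("--"))
                        ? args[++i] : "";
                params.put(key, value);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = fromArgs(
                new String[] {"--configDir", "/etc/flink", "--unknwonFlag", "oops"});
        // A proper parser would fail on the misspelled flag; this one does not.
        System.out.println(params.get("configDir"));           // /etc/flink
        System.out.println(params.containsKey("unknwonFlag")); // true
    }
}
```

A stricter parser would declare the allowed options up front and fail fast on anything else, which is the behavior the issue asks for.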
[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main
[ https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542615#comment-16542615 ] ASF GitHub Bot commented on FLINK-7095: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5375 Hi @tillrohrmann, you are welcome. There are still a lot of other Flink JIRA issues that I will address in the future. > Add proper command line parsing tool to TaskManagerRunner.main > -- > > Key: FLINK-7095 > URL: https://issues.apache.org/jira/browse/FLINK-7095 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Priority: Minor > Labels: flip-6, pull-request-available > Fix For: 1.6.0 > > > We need to add a proper command line parsing tool to the entry point of the > {{TaskManagerRunner#main}}. At the moment, we are simply using the > {{ParameterTool}} as a temporary solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5375: [FLINK-7095] [TaskManager] Add Command line parsin...
Github user zhangminglei closed the pull request at: https://github.com/apache/flink/pull/5375 ---
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/6323 [FLINK-8558] [FLINK-8866] [table] Finalize unified table source/sink/format interfaces ## What is the purpose of the change This PR finalizes the efforts done in #6264 and #6201 for having unified interfaces for table sources, table sinks, and table formats. It reduces code duplication and cleans up the code base around factories. ## Brief change log - Introduction of `org.apache.table.factories.TableFactory` a common interface for factories - Introduction of `org.apache.table.factories.TableFormatFactory` a specific table factory for formats - Specific factories for `StreamTableSource`, `StreamTableSink`, `BatchTableSource`, `BatchTableSink`, `DeserializationSchema`, and `SerializationSchema` - Deprecation of old format-specific table sources (sinks will be deprecated in a follow-up PR) - Possibility to register table source and sink under a common name (table type `both` in SQL Client YAML) ## Verifying this change - Existing tests verify the implementation - Additional ITCases and unit tests have been added - (An end-to-end test will follow in a separate PR) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
not documented You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink UnifiedInterfacesFinal Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6323.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6323 commit 980499f887d72ddf9a405c4ad200d0cab15d889c Author: Timo Walther Date: 2018-06-27T11:16:49Z [FLINK-8558] [table] Add unified format interfaces and separate formats from connectors This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility. This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives. commit 42a8a156d4e6f8f3d119c458350b6c897306fc48 Author: Shuyi Chen Date: 2018-06-19T19:00:34Z [FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks This closes #6201.
commit 311dc62e59c0e4146c094b73c21b979f31b2e1d9 Author: Timo Walther Date: 2018-07-11T11:29:03Z Rename to TableFactory and move it to factories package commit 1c581cba61ba321bb6de6a4d298a881840d11cfe Author: Timo Walther Date: 2018-07-11T11:46:31Z Refactor format factories commit 5c6df7598d1f1c3c698ae9b6b35eb37d7fff8295 Author: Timo Walther Date: 2018-07-12T06:35:00Z Unify table factories commit 0cd7c44c006aba21c32d8785d17bfc3dbee03916 Author: Timo Walther Date: 2018-07-12T07:05:50Z Move table type out of descriptors commit 6b83f2e1c0e63147f049dc5389c5633077b789a4 Author: Timo Walther Date: 2018-07-12T08:50:09Z Make source/sink factories environment-dependent commit 4f1255fd003080f078afe6ef67ffa58f40ffec36 Author: Timo Walther Date: 2018-07-12T18:48:45Z Clean up and simplify changes ---
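The "format discovery mechanism based on Java Service Providers" described in the first commit message above can be sketched roughly as follows. All names here are illustrative, not Flink's actual API: real discovery would enumerate candidate factories via `java.util.ServiceLoader` and `META-INF/services` entries, while this sketch only shows the property-matching step that selects a factory whose required context is satisfied by the given descriptor properties.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of property-based factory matching, loosely modeled on
// the TableFactory discovery idea: each factory declares a required context,
// and a factory is chosen when all of its required properties match.
interface Factory {
    Map<String, String> requiredContext();
}

public class FactoryService {
    // In the real mechanism the candidates would come from
    // java.util.ServiceLoader; here they are passed in directly.
    static Optional<Factory> find(List<Factory> candidates, Map<String, String> properties) {
        return candidates.stream()
                .filter(f -> f.requiredContext().entrySet().stream()
                        .allMatch(e -> e.getValue().equals(properties.get(e.getKey()))))
                .findFirst();
    }

    public static void main(String[] args) {
        Factory jsonFormat = () -> Map.of("format.type", "json");
        Factory avroFormat = () -> Map.of("format.type", "avro");
        Optional<Factory> match = find(
                List.of(jsonFormat, avroFormat),
                Map.of("format.type", "avro", "format.version", "1"));
        System.out.println(match.get() == avroFormat); // true
    }
}
```

The advantage of this style, as the commit message notes, is that new format interfaces can be added later without breaking existing factories, because matching is driven by declared properties rather than by hard-coded connector-format coupling.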
[GitHub] flink issue #5375: [FLINK-7095] [TaskManager] Add Command line parsing tool ...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/5375 Hi @tillrohrmann, you are welcome. There are still a lot of other Flink JIRA issues that I will address in the future. ---
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542614#comment-16542614 ] ASF GitHub Bot commented on FLINK-8558: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/6323 [FLINK-8558] [FLINK-8866] [table] Finalize unified table source/sink/format interfaces ## What is the purpose of the change This PR finalizes the efforts done in #6264 and #6201 for having unified interfaces for table sources, table sinks, and table formats. It reduces code duplication and cleans up the code base around factories. ## Brief change log - Introduction of `org.apache.table.factories.TableFactory` a common interface for factories - Introduction of `org.apache.table.factories.TableFormatFactory` a specific table factory for formats - Specific factories for `StreamTableSource`, `StreamTableSink`, `BatchTableSource`, `BatchTableSink`, `DeserializationSchema`, and `SerializationSchema` - Deprecation of old format-specific table sources (sinks will be deprecated in a follow-up PR) - Possibility to register table source and sink under a common name (table type `both` in SQL Client YAML) ## Verifying this change - Existing tests verify the implementation - Additional ITCases and unit tests have been added - (An end-to-end test will follow in a separate PR) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
not documented You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink UnifiedInterfacesFinal Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6323.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6323 commit 980499f887d72ddf9a405c4ad200d0cab15d889c Author: Timo Walther Date: 2018-06-27T11:16:49Z [FLINK-8558] [table] Add unified format interfaces and separate formats from connectors This PR introduces a format discovery mechanism based on Java Service Providers. The general `TableFormatFactory` is similar to the existing table source discovery mechanism. However, it allows for arbitrary format interfaces that might be introduced in the future. At the moment, a connector can request configured instances of `DeserializationSchema` and `SerializationSchema`. In the future we can add interfaces such as a `Writer` or `KeyedSerializationSchema` without breaking backwards compatibility. This PR deprecates the existing strong coupling of connector and format for the Kafka table sources and table source factories. It introduces descriptor-based alternatives. commit 42a8a156d4e6f8f3d119c458350b6c897306fc48 Author: Shuyi Chen Date: 2018-06-19T19:00:34Z [FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks This closes #6201.
commit 311dc62e59c0e4146c094b73c21b979f31b2e1d9 Author: Timo Walther Date: 2018-07-11T11:29:03Z Rename to TableFactory and move it to factories package commit 1c581cba61ba321bb6de6a4d298a881840d11cfe Author: Timo Walther Date: 2018-07-11T11:46:31Z Refactor format factories commit 5c6df7598d1f1c3c698ae9b6b35eb37d7fff8295 Author: Timo Walther Date: 2018-07-12T06:35:00Z Unify table factories commit 0cd7c44c006aba21c32d8785d17bfc3dbee03916 Author: Timo Walther Date: 2018-07-12T07:05:50Z Move table type out of descriptors commit 6b83f2e1c0e63147f049dc5389c5633077b789a4 Author: Timo Walther Date: 2018-07-12T08:50:09Z Make source/sink factories environment-dependent commit 4f1255fd003080f078afe6ef67ffa58f40ffec36 Author: Timo Walther Date: 2018-07-12T18:48:45Z Clean up and simplify changes > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major
[jira] [Resolved] (FLINK-4807) ResourceManager clean up JobManager's registration
[ https://issues.apache.org/jira/browse/FLINK-4807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-4807. -- Resolution: Duplicate Fix Version/s: 1.5.0 > ResourceManager clean up JobManager's registration > -- > > Key: FLINK-4807 > URL: https://issues.apache.org/jira/browse/FLINK-4807 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young >Priority: Major > Fix For: 1.5.0 > > > When RM received a JM's registration, it will record it either with some > leaderid or leadership listener. We should make sure the finished / failed JM > can properly unregister itself with RM. > We can make it happen by doing these two things: > 1. If JM finds out job reaches a terminate state(either success or fail), it > should send an unregistration request to RM. > 2. If (1) does not happen for various reasons, RM can rely on the heartbeat > manager to find out timeout JM and clear it up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-7472) Release task managers gracefully
[ https://issues.apache.org/jira/browse/FLINK-7472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7472. -- Resolution: Done > Release task managers gracefully > > > Key: FLINK-7472 > URL: https://issues.apache.org/jira/browse/FLINK-7472 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Eron Wright >Priority: Major > > When a task manager is no longer needed (e.g. due to idle timeout in slot > manager), the RM should gracefully stop it without spurious warnings. This > implies some actions should be taken before the TM is actually killed. > Proactive steps include stopping the heartbeat monitor and sending a > disconnect message. > It is unclear whether `RM::closeTaskManagerConnection` method should be > called proactively (when we plan to kill a TM), reactively (after the TM is > killed), or both. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session
[ https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542611#comment-16542611 ] ASF GitHub Bot commented on FLINK-9004: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/6240 I extended the regular `README.md`. > Cluster test: Run general purpose job with failures with Yarn session > - > > Key: FLINK-9004 > URL: https://issues.apache.org/jira/browse/FLINK-9004 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on > a Yarn session cluster and simulate failures. > The job jar should be ill-packaged, meaning that we include too many > dependencies in the user jar. We should include the Scala library, Hadoop and > Flink itself to verify that there are no class loading issues. > The general purpose job should run with misbehavior activated. Additionally, > we should simulate at least the following failure scenarios: > * Kill Flink processes > * Kill connection to storage system for checkpoints and jobs > * Simulate network partition > We should run the test at least with the following state backend: RocksDB > incremental async and checkpointing to S3. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/6240 I extended the regular `README.md`. ---
[jira] [Assigned] (FLINK-7075) Implement Flip-6 standalone mode
[ https://issues.apache.org/jira/browse/FLINK-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7075: Assignee: Till Rohrmann > Implement Flip-6 standalone mode > > > Key: FLINK-7075 > URL: https://issues.apache.org/jira/browse/FLINK-7075 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > This is an umbrella issue to sum up what's needed to implement Flink's > standalone mode with the new Flip-6 architecture -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-7075) Implement Flip-6 standalone mode
[ https://issues.apache.org/jira/browse/FLINK-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7075. Resolution: Fixed Fix Version/s: 1.5.0 Added to Flink 1.5.0 > Implement Flip-6 standalone mode > > > Key: FLINK-7075 > URL: https://issues.apache.org/jira/browse/FLINK-7075 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > This is an umbrella issue to sum up what's needed to implement Flink's > standalone mode with the new Flip-6 architecture -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-4897) Implement Dispatcher to support Flink sessions
[ https://issues.apache.org/jira/browse/FLINK-4897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-4897. -- Resolution: Fixed Fix Version/s: 1.5.0 Added to Flink 1.5.0: 748eba1b5363aeb9e64d379b9b25d5d997bec7ad > Implement Dispatcher to support Flink sessions > -- > > Key: FLINK-4897 > URL: https://issues.apache.org/jira/browse/FLINK-4897 > Project: Flink > Issue Type: New Feature > Components: Cluster Management, Mesos > Environment: FLIP-6 feature branch >Reporter: Eron Wright >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > This task is to implement the dispatcher component which reacts to calls from > the cluster's REST endpoint. > The dispatcher is responsible for job submission, job listing, job leader > lookups, restarting jobs in case of a recovery and the cluster's component > lifecycle management. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-4834) Implement unified High Availability Services Abstraction
[ https://issues.apache.org/jira/browse/FLINK-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-4834. -- Resolution: Fixed Fix Version/s: 1.5.0 All subtasks have been completed in Flink 1.5.0. > Implement unified High Availability Services Abstraction > > > Key: FLINK-4834 > URL: https://issues.apache.org/jira/browse/FLINK-4834 > Project: Flink > Issue Type: New Feature > Components: Cluster Management > Environment: FLIP-6 feature branch >Reporter: Stephan Ewen >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-4343) Implement new TaskManager
[ https://issues.apache.org/jira/browse/FLINK-4343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-4343: Assignee: Till Rohrmann > Implement new TaskManager > - > > Key: FLINK-4343 > URL: https://issues.apache.org/jira/browse/FLINK-4343 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > This is the parent issue for the efforts to implement the {{TaskManager}} > changes based on FLIP-6 > (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077) > Because of the breadth of changes, we should implement a new version of the > {{TaskManager}} (let's call it {{TaskExecutor}}) rather than updating the > current {{TaskManager}}. That will allow us to keep a working master branch. > At the point when the new cluster management is on par with the current > implementation, we will drop the old {{TaskManager}} and rename the > {{TaskExecutor}} to {{TaskManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-4343) Implement new TaskManager
[ https://issues.apache.org/jira/browse/FLINK-4343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-4343. -- Resolution: Fixed Fix Version/s: 1.5.0 The {{TaskExecutor}} has been added to Flink 1.5.0. > Implement new TaskManager > - > > Key: FLINK-4343 > URL: https://issues.apache.org/jira/browse/FLINK-4343 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > This is the parent issue for the efforts to implement the {{TaskManager}} > changes based on FLIP-6 > (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077) > Because of the breadth of changes, we should implement a new version of the > {{TaskManager}} (let's call it {{TaskExecutor}}) rather than updating the > current {{TaskManager}}. That will allow us to keep a working master branch. > At the point when the new cluster management is on par with the current > implementation, we will drop the old {{TaskManager}} and rename the > {{TaskExecutor}} to {{TaskManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-7469) Handle slot requests occurring before RM registration completes
[ https://issues.apache.org/jira/browse/FLINK-7469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7469. -- Resolution: Duplicate Fix Version/s: 1.5.0 This issue should have been fixed with FLINK-9427. > Handle slot requests occurring before RM registration completes > -- > > Key: FLINK-7469 > URL: https://issues.apache.org/jira/browse/FLINK-7469 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Eron Wright >Priority: Minor > Fix For: 1.5.0 > > Attachments: jm.log, taskmanager-3.log > > > *Description* > Occasionally the TM-to-RM registration ask times out, causing the TM to pause > registration for 10 seconds. Meanwhile the registration may actually have > succeeded in the RM. Slot requests may then arrive at the TM while RM > registration is incomplete. > The current behavior appears to be that the TM honors the slot request. > Please determine whether this is a feature or a bug. If a feature, maybe a > slot request should implicitly complete the registration. > *Example* > See the attached log showing a certain TM exhibiting the described behavior. > The RM launched 12 TMs in parallel, evidently causing the RM to sluggishly > respond to a couple of the TM registration requests. From the logs we see > that '00012' and '3' experienced a registration timeout but accepted a > slot request anyway. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main
[ https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7095. Resolution: Duplicate Fix Version/s: 1.6.0 > Add proper command line parsing tool to TaskManagerRunner.main > -- > > Key: FLINK-7095 > URL: https://issues.apache.org/jira/browse/FLINK-7095 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Priority: Minor > Labels: flip-6, pull-request-available > Fix For: 1.6.0 > > > We need to add a proper command line parsing tool to the entry point of the > {{TaskManagerRunner#main}}. At the moment, we are simply using the > {{ParameterTool}} as a temporary solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main
[ https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-7095: -- Labels: flip-6 pull-request-available (was: flip-6) > Add proper command line parsing tool to TaskManagerRunner.main > -- > > Key: FLINK-7095 > URL: https://issues.apache.org/jira/browse/FLINK-7095 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Priority: Minor > Labels: flip-6, pull-request-available > > We need to add a proper command line parsing tool to the entry point of the > {{TaskManagerRunner#main}}. At the moment, we are simply using the > {{ParameterTool}} as a temporary solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main
[ https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542601#comment-16542601 ] ASF GitHub Bot commented on FLINK-7095: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5375 Sorry for not getting back to you earlier @zhangminglei. I accidentally addressed this issue with #6318. I think we can therefore close this PR. Sorry for the bad PR management. This won't happen again. > Add proper command line parsing tool to TaskManagerRunner.main > -- > > Key: FLINK-7095 > URL: https://issues.apache.org/jira/browse/FLINK-7095 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Till Rohrmann >Priority: Minor > Labels: flip-6, pull-request-available > > We need to add a proper command line parsing tool to the entry point of the > {{TaskManagerRunner#main}}. At the moment, we are simply using the > {{ParameterTool}} as a temporary solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5375: [FLINK-7095] [TaskManager] Add Command line parsing tool ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5375 Sorry for not getting back to you earlier @zhangminglei. I accidentally addressed this issue with #6318. I think we can therefore close this PR. Sorry for the bad PR management. This won't happen again. ---
[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image
[ https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542600#comment-16542600 ] ASF GitHub Bot commented on FLINK-9822: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202254410 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose + +## Installation + +Install the most recent stable version of docker +https://docs.docker.com/installation/ + +## Build + +Images are based on the official Java Alpine (OpenJDK 8) image. If you want to +build the flink image run: + +sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job + +or + +docker build -t flink . + +If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: + +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink --- End diff -- Good point, will update to the latest Flink version. > Add Dockerfile for StandaloneJobClusterEntryPoint image > --- > > Key: FLINK-9822 > URL: https://issues.apache.org/jira/browse/FLINK-9822 > Project: Flink > Issue Type: New Feature > Components: Docker >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Add a {{Dockerfile}} to create an image which contains the > {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The > entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} > with the added user code jar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202254368 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose --- End diff -- It applies only to the job cluster mode. Will update the title. ---
[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202254410 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose + +## Installation + +Install the most recent stable version of docker +https://docs.docker.com/installation/ + +## Build + +Images are based on the official Java Alpine (OpenJDK 8) image. If you want to +build the flink image run: + +sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job + +or + +docker build -t flink . + +If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: + +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink --- End diff -- Good point, will update to the latest Flink version. ---
[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image
[ https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542599#comment-16542599 ] ASF GitHub Bot commented on FLINK-9822: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202254368 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose --- End diff -- It applies only to the job cluster mode. Will update the title. > Add Dockerfile for StandaloneJobClusterEntryPoint image > --- > > Key: FLINK-9822 > URL: https://issues.apache.org/jira/browse/FLINK-9822 > Project: Flink > Issue Type: New Feature > Components: Docker >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Add a {{Dockerfile}} to create an image which contains the > {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The > entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} > with the added user code jar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542596#comment-16542596 ] Till Rohrmann commented on FLINK-4319: -- [~yazdanjs] thanks for your interest. The community implemented a good part of the whole Flip-6 effort. I think we can actually close the linked issues here. However, there are still plenty of open issues and potential improvements which one could apply to Flink's distributed architecture. Most of these issues are assigned to the `Distributed Coordination` component. Please look for issues assigned to this component. I will update the old Flip-6 JIRA issues to reflect the current state. > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Major > Labels: flip-6 > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542594#comment-16542594 ] Chesnay Schepler commented on FLINK-5860: - [~maheshsenni] the cassandra example should not be touched as it is not actually a test. The {{TestingTaskManagerRuntimeInfo}} constructor should be removed. The {{RecordOrEventCollectingResultPartitionWriter}} and {{RecordCollectingResultPartitionWriter}} constructors should be modified to accept a temp directory. Note that it is perfectly fine to address this JIRA in multiple PRs. This will make things easier to review and reduces the risk of your changes becoming outdated. > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Mahesh Senniappan >Priority: Major > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' . 
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: > env.setStateBackend(new FsStateBackend("file:///" + > System.getProperty("java.io.tmpdir") + "/flink/backend")); > ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: > return getMockEnvironment(new File[] { new > File(System.getProperty("java.io.tmpdir")) }); > ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: >public static final String DEFAULT_TASK_MANAGER_TMP_PATH = > System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: > final String tempPath = System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: > final File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > 
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: > final String outDir = params.get("output", > System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: > final String tmpDir = System.getProperty("java.io.tmpdir"); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: > final String outPath = System.getProperty("java.io.tmpdir"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static
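The pattern this JIRA asks for can be illustrated with a stand-alone sketch (assumption: this is not the actual Flink test code, and the class and method names here are made up). JUnit 4's `TemporaryFolder` rule essentially does the following for each test: create a fresh, isolated directory instead of writing into the shared `java.io.tmpdir`, and delete it again afterwards.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Illustrative sketch of what the TemporaryFolder rule provides per test:
// an isolated directory plus guaranteed recursive cleanup.
public class TempDirSketch {

    // Analogous to TemporaryFolder.newFolder(): one unique directory per test,
    // so parallel test runs cannot collide on a shared path.
    public static File createIsolatedTempDir() throws IOException {
        return Files.createTempDirectory("flink-test-").toFile();
    }

    // Analogous to TemporaryFolder's after() hook: delete everything, even on
    // test failure, so no files leak into java.io.tmpdir.
    public static void deleteRecursively(File dir) {
        File[] children = dir.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        dir.delete();
    }
}
```

Tests that write directly into `java.io.tmpdir` can interfere with each other and leave files behind when they fail; a per-test directory with automatic cleanup avoids both problems, which is the motivation for the replacement.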
[jira] [Commented] (FLINK-9771) "Show Plan" option under Submit New Job in WebUI not working
[ https://issues.apache.org/jira/browse/FLINK-9771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542591#comment-16542591 ] ASF GitHub Bot commented on FLINK-9771: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6274 > "Show Plan" option under Submit New Job in WebUI not working > -- > > Key: FLINK-9771 > URL: https://issues.apache.org/jira/browse/FLINK-9771 > Project: Flink > Issue Type: Bug > Components: Job-Submission, Webfrontend >Affects Versions: 1.5.0, 1.6.0 >Reporter: Yazdan Shirvany >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > {{Show Plan}} button under {{Submit new job}} in WebUI not working. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP
[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542588#comment-16542588 ] ASF GitHub Bot commented on FLINK-9563: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6170 > Migrate integration tests for CEP > - > > Key: FLINK-9563 > URL: https://issues.apache.org/jira/browse/FLINK-9563 > Project: Flink > Issue Type: Sub-task >Reporter: Deepak Sharma >Assignee: Deepak Sharma >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > Covers all integration tests under > apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module
[ https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542592#comment-16542592 ] ASF GitHub Bot commented on FLINK-9091: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6102 > Failure while enforcing releasability in building flink-json module > --- > > Key: FLINK-9091 > URL: https://issues.apache.org/jira/browse/FLINK-9091 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.5.0, 1.6.0 >Reporter: Ted Yu >Assignee: Hai Zhou >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > Attachments: f-json.out > > > Got the following when building flink-json module: > {code} > [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence > failed with message: > Failed while enforcing releasability. See above detailed error message. > ... > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce > (dependency-convergence) on project flink-json: Some Enforcer rules have > failed. Look above for specific messages explaining why the rule failed. -> > [Help 1] > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9801) flink-dist is missing dependency on flink-examples
[ https://issues.apache.org/jira/browse/FLINK-9801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542589#comment-16542589 ] ASF GitHub Bot commented on FLINK-9801: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6304 > flink-dist is missing dependency on flink-examples > -- > > Key: FLINK-9801 > URL: https://issues.apache.org/jira/browse/FLINK-9801 > Project: Flink > Issue Type: Improvement > Components: Build System, Examples >Affects Versions: 1.5.1, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > For the assembly of {{flink-dist}} we copy various batch/streaming examples > directly from the respective /target directory. > Never mind that this is already a problem as is (see FLINK-9582), > {{flink-dist}} defines no dependency on these modules. > If you were to only compile {{flink-dist}} with the {{-am}} flag (to also > build all dependencies) it thus _may_ or _may not_ happen that these modules > are actually compiled, which could cause these examples to not be included in > the final assembly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9810) JarListHandler does not close opened jars
[ https://issues.apache.org/jira/browse/FLINK-9810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542590#comment-16542590 ] ASF GitHub Bot commented on FLINK-9810: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6310 > JarListHandler does not close opened jars > - > > Key: FLINK-9810 > URL: https://issues.apache.org/jira/browse/FLINK-9810 > Project: Flink > Issue Type: Bug > Components: REST, Webfrontend >Affects Versions: 1.5.0, 1.4.2, 1.6.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.4.3, 1.5.2, 1.6.0 > > > {code} > try { > JarFile jar = new JarFile(f); > Manifest manifest = jar.getManifest(); > String assemblerClass = null; > if (manifest != null) { > assemblerClass = > manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS); > if (assemblerClass == null) { > assemblerClass = > manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS); > } > } > if (assemblerClass != null) { > classes = assemblerClass.split(","); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
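The leak in the snippet quoted above is that the `JarFile` is opened but never closed. A minimal sketch of the fix direction (assumptions: the helper name `readMainClass` is illustrative, and the original's assembler-class fallback is omitted for brevity) is to open the jar in try-with-resources, so the file handle is released on every exit path, including exceptions:

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper, not Flink's actual JarListHandler.
public class JarManifestSketch {

    public static String readMainClass(File f) throws IOException {
        // try-with-resources: JarFile implements AutoCloseable, so it is
        // closed automatically, even if getManifest() throws.
        try (JarFile jar = new JarFile(f)) {
            Manifest manifest = jar.getManifest();
            if (manifest == null) {
                return null;
            }
            return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }
}
```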
[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6170 ---
[GitHub] flink pull request #6304: [FLINK-9801][build] Add missing example dependenci...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6304 ---
[GitHub] flink pull request #6102: [FLINK-9091][build] Dependency convergence run aga...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6102 ---
[GitHub] flink pull request #6274: [FLINK-9771][rest] Fix plan JSON response
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6274 ---
[GitHub] flink pull request #6310: [FLINK-9810][rest] Close jar file in JarListHandle...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6310 ---
[jira] [Closed] (FLINK-9563) Migrate integration tests for CEP
[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9563. --- Resolution: Fixed Fix Version/s: 1.6.0 master: 40f9131e9136f4f956c59e4c0c837afba8b9bb4d > Migrate integration tests for CEP > - > > Key: FLINK-9563 > URL: https://issues.apache.org/jira/browse/FLINK-9563 > Project: Flink > Issue Type: Sub-task >Reporter: Deepak Sharma >Assignee: Deepak Sharma >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > Covers all integration tests under > apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails
[ https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542577#comment-16542577 ] ASF GitHub Bot commented on FLINK-9276: --- Github user yanghua closed the pull request at: https://github.com/apache/flink/pull/5954 > Improve error message when TaskManager fails > > > Key: FLINK-9276 > URL: https://issues.apache.org/jira/browse/FLINK-9276 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > When a TaskManager fails, we frequently get a message > {code} > org.apache.flink.util.FlinkException: Releasing TaskManager > container_1524853016208_0001_01_000102 > {code} > This message is misleading in that it sounds like an intended operation, when > it really is a failure of a container that the {{ResourceManager}} reports to > the {{JobManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...
Github user yanghua closed the pull request at: https://github.com/apache/flink/pull/5954 ---
[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails
[ https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542576#comment-16542576 ] ASF GitHub Bot commented on FLINK-9276: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5954 OK, closing this PR... > Improve error message when TaskManager fails > > > Key: FLINK-9276 > URL: https://issues.apache.org/jira/browse/FLINK-9276 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > When a TaskManager fails, we frequently get a message > {code} > org.apache.flink.util.FlinkException: Releasing TaskManager > container_1524853016208_0001_01_000102 > {code} > This message is misleading in that it sounds like an intended operation, when > it really is a failure of a container that the {{ResourceManager}} reports to > the {{JobManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5954 OK, closing this PR... ---
[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6305#discussion_r202249510 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java --- @@ -99,11 +110,21 @@ private AbstractStateBackend stateBackend; + @Parameterized.Parameter + public StateBackendEnum stateBackendEnum; + enum StateBackendEnum { MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC } - protected abstract StateBackendEnum getStateBackend(); + @Parameterized.Parameters(name = "statebackend type ={0}") + public static Collection parameter() { + return Arrays.asList(MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC); --- End diff -- Here we could say `Arrays.asList(StateBackendEnum.values())`. ---
[jira] [Commented] (FLINK-9807) Improve EventTimeWindowCheckpointITCase&LocalRecoveryITCase with parameterized
[ https://issues.apache.org/jira/browse/FLINK-9807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542572#comment-16542572 ] ASF GitHub Bot commented on FLINK-9807: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6305#discussion_r202249510 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java --- @@ -99,11 +110,21 @@ private AbstractStateBackend stateBackend; + @Parameterized.Parameter + public StateBackendEnum stateBackendEnum; + enum StateBackendEnum { MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC } - protected abstract StateBackendEnum getStateBackend(); + @Parameterized.Parameters(name = "statebackend type ={0}") + public static Collection parameter() { + return Arrays.asList(MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC); --- End diff -- Here we could say `Arrays.asList(StateBackendEnum.values())`. > Improve EventTimeWindowCheckpointITCase&LocalRecoveryITCase with parameterized > -- > > Key: FLINK-9807 > URL: https://issues.apache.org/jira/browse/FLINK-9807 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > Currently, `AbstractEventTimeWindowCheckpointingITCase` and > `AbstractLocalRecoveryITCase` need to be re-implemented for every backend; we can > improve this by using JUnit's parameterized runner -- This message was sent by Atlassian JIRA (v7.6.3#76005)
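The review suggestion above, returning `Arrays.asList(StateBackendEnum.values())` instead of enumerating every constant by hand, can be sketched as follows (assumption: the JUnit `@Parameterized.Parameters` annotation is omitted so the snippet is self-contained, and the class name is illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class ParameterSketch {

    public enum StateBackendEnum {
        MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL,
        ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
    }

    // Stand-in for the @Parameterized.Parameters factory method: values()
    // always covers every constant, so the parameter list cannot drift out
    // of sync when a new backend is added to the enum.
    public static List<StateBackendEnum> parameters() {
        return Arrays.asList(StateBackendEnum.values());
    }
}
```

The advantage over a hand-written list is purely maintenance: adding a backend to the enum automatically adds it to the parameterized test run.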
[jira] [Updated] (FLINK-9764) Failure in LocalRecoveryRocksDBFullITCase
[ https://issues.apache.org/jira/browse/FLINK-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9764: - Priority: Critical (was: Major) > Failure in LocalRecoveryRocksDBFullITCase > - > > Key: FLINK-9764 > URL: https://issues.apache.org/jira/browse/FLINK-9764 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.6.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > Fix For: 1.6.0 > > > {code} > Running org.apache.flink.test.checkpointing.LocalRecoveryRocksDBFullITCase > Starting null#executeTest. > org.apache.flink.runtime.client.JobExecutionException: > java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but > was:<1209> > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623) > at > org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79) > at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35) > at > org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.testTumblingTimeWindow(AbstractEventTimeWindowCheckpointingITCase.java:286) > at > org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:82) > at > org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:74) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > Caused by: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> > but was:<1209> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at > 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:733) > at > org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:669) > at > org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInput
[jira] [Updated] (FLINK-9764) Failure in LocalRecoveryRocksDBFullITCase
[ https://issues.apache.org/jira/browse/FLINK-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9764: - Fix Version/s: 1.6.0 > Failure in LocalRecoveryRocksDBFullITCase > - > > Key: FLINK-9764 > URL: https://issues.apache.org/jira/browse/FLINK-9764 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Streaming >Affects Versions: 1.6.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > Fix For: 1.6.0 > > > {code} > Running org.apache.flink.test.checkpointing.LocalRecoveryRocksDBFullITCase > Starting null#executeTest. > org.apache.flink.runtime.client.JobExecutionException: > java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but > was:<1209> > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623) > at > org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79) > at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35) > at > org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.testTumblingTimeWindow(AbstractEventTimeWindowCheckpointingITCase.java:286) > at > org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:82) > at > org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:74) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > Caused by: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> > but was:<1209> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at > 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:733) > at > org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:669) > at > org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.r
[jira] [Assigned] (FLINK-9807) Improve EventTimeWindowCheckpointITCase&LocalRecoveryITCase with parameterized
[ https://issues.apache.org/jira/browse/FLINK-9807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-9807: Assignee: Congxian Qiu > Improve EventTimeWindowCheckpointITCase&LocalRecoveryITCase with parameterized > -- > > Key: FLINK-9807 > URL: https://issues.apache.org/jira/browse/FLINK-9807 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > Currently, `AbstractEventTimeWindowCheckpointingITCase` and > `AbstractLocalRecoveryITCase` need to be re-implemented for every backend; we can > improve this by using JUnit's parameterized runner -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails
[ https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542549#comment-16542549 ] ASF GitHub Bot commented on FLINK-9276: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5954 I think this is not super critical and since we only pass in `Exceptions` at the moment I would say let's close this PR. > Improve error message when TaskManager fails > > > Key: FLINK-9276 > URL: https://issues.apache.org/jira/browse/FLINK-9276 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > When a TaskManager fails, we frequently get a message > {code} > org.apache.flink.util.FlinkException: Releasing TaskManager > container_1524853016208_0001_01_000102 > {code} > This message is misleading in that it sounds like an intended operation, when > it really is a failure of a container that the {{ResourceManager}} reports to > the {{JobManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5954 I think this is not super critical and since we only pass in `Exceptions` at the moment I would say let's close this PR. ---
[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA
[ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542547#comment-16542547 ] ASF GitHub Bot commented on FLINK-9575: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202248523 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) --- End diff -- This call should also be called if the removal of the job from the `SubmittedJobGraphStore` failed because it does not remove any HA files. > Potential race condition when removing JobGraph in HA > - > > Key: FLINK-9575 > URL: https://issues.apache.org/jira/browse/FLINK-9575 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: Dominik Wosiński >Assignee: Dominik Wosiński >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > When we are removing the _JobGraph_ from _JobManager_ for example after > invoking _cancel()_, the following code is executed : > {noformat} > > val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val > result = if (removeJobFromStateBackend) { val futureOption = Some(future { > try { // ...otherwise, we can have lingering resources when there is a > concurrent shutdown // and the ZooKeeper client is closed. Not removing the > job immediately allow the // shutdown to release all resources. 
> submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => > log.warn(s"Could not remove submitted job graph $jobID.", t) } > }(context.dispatcher)) try { archive ! decorateMessage( > ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch > { case t: Throwable => log.warn(s"Could not archive the execution graph > $eg.", t) } futureOption } else { None } currentJobs.remove(jobID) result > case None => None } // remove all job-related BLOBs from local and HA store > libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, > removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) > futureOption } > val futureOption = currentJobs.get(jobID) match { > case Some((eg, _)) => > val result = if (removeJobFromStateBackend) { > val futureOption = Some(future { > try { > // ...otherwise, we can have lingering resources when there is a concurrent > shutdown > // and the ZooKeeper client is closed. Not removing the job immediately allow > the > // shutdown to release all resources. > submittedJobGraphs.removeJobGraph(jobID) > } catch { > case t: Throwable => log.warn(s"Could not remove submitted job graph > $jobID.", t) > } > }(context.dispatcher)) > try { > archive ! decorateMessage( > ArchiveExecutionGraph( > jobID, > ArchivedExecutionGraph.createFrom(eg))) > } catch { > case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", > t) > } > futureOption > } else { > None > } > currentJobs.remove(jobID) > result > case None => None > } > // remove all job-related BLOBs from local and HA store > libraryCacheManager.unregisterJob(jobID) > blobServer.cleanupJob(jobID, removeJobFromStateBackend) > jobManagerMetricGroup.removeJob(jobID) > futureOption > }{noformat} > This causes the asynchronous removal of the job and synchronous removal of > blob files connected with this jar. 
As far as I understand, this means > there is a potential problem: we can fail to remove the job graph from > _submittedJobGraphs_. If the JobManager fails and a new leader is elected, it > may try to recover the job, but it will fail with an exception since the > associated blob was already removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA
[ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542546#comment-16542546 ] ASF GitHub Bot commented on FLINK-9575: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202248449 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID, removeJobFromStateBackend) + jobManagerMetricGroup.removeJob(jobID) --- End diff -- I think we could always execute this call independent of whether the removal from the `SubmittedJobGraphStore` was successful or not.
> Potential race condition when removing JobGraph in HA
> -
>
> Key: FLINK-9575
> URL: https://issues.apache.org/jira/browse/FLINK-9575
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.5.0
> Reporter: Dominik Wosiński
> Assignee: Dominik Wosiński
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
> When we are removing the _JobGraph_ from the _JobManager_, for example after invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allows the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
>
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This causes the asynchronous removal of the job and the synchronous removal of the blob files connected with this jar. 
This means, as far as I understand, that there is a potential problem: we can fail to remove the job graph from _submittedJobGraphs_. If the JobManager fails and a new leader is elected, it can try to recover such a job, but recovery will fail with an exception since the assigned blob was already removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA
[ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542548#comment-16542548 ] ASF GitHub Bot commented on FLINK-9575: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202248365 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID, removeJobFromStateBackend) --- End diff -- Can't we move this line into the future where we remove the job from the `SubmittedJobGraphStore`?
> Potential race condition when removing JobGraph in HA
> -
>
> Key: FLINK-9575
> URL: https://issues.apache.org/jira/browse/FLINK-9575
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.5.0
> Reporter: Dominik Wosiński
> Assignee: Dominik Wosiński
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
> When we are removing the _JobGraph_ from the _JobManager_, for example after invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allows the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
>
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This causes the asynchronous removal of the job and the synchronous removal of the blob files connected with this jar. 
This means, as far as I understand, that there is a potential problem: we can fail to remove the job graph from _submittedJobGraphs_. If the JobManager fails and a new leader is elected, it can try to recover such a job, but recovery will fail with an exception since the assigned blob was already removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202248523 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) --- End diff -- This call should also be called if the removal of the job from the `SubmittedJobGraphStore` failed because it does not remove any HA files. ---
[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202248449 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID, removeJobFromStateBackend) + jobManagerMetricGroup.removeJob(jobID) --- End diff -- I think we could always execute this call independent of whether the removal from the `SubmittedJobGraphStore` was successful or not. ---
[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202248365 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } -// remove all job-related BLOBs from local and HA store -libraryCacheManager.unregisterJob(jobID) -blobServer.cleanupJob(jobID, removeJobFromStateBackend) +// remove all job-related BLOBs from local and HA store, only if the job was removed correctly +futureOption match { + case Some(future) => future.onComplete{ +case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID, removeJobFromStateBackend) --- End diff -- Can't we move this line into the future where we remove the job from the `SubmittedJobGraphStore`? ---
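The ordering the reviewers converge on above can be sketched outside Flink. The actual `JobManager` is Scala, but the same "clean up only after the asynchronous store removal completes" pattern is easy to show in plain Java with `CompletableFuture`; the helper names (`removeFromStore`, `cleanupBlobs`, `removeMetrics`) are hypothetical stand-ins for `submittedJobGraphs.removeJobGraph`, the BLOB cleanup, and `jobManagerMetricGroup.removeJob`, not the real API:

```java
import java.util.concurrent.CompletableFuture;

public class JobRemovalSketch {

    /**
     * Sketch of the review suggestion: the job graph is removed from the HA
     * store asynchronously; the metric group is removed regardless of the
     * outcome (per the review, it is safe to run unconditionally), while the
     * HA BLOBs are only cleaned up on success, so a recovering leader never
     * finds a job graph whose BLOBs are already gone.
     */
    static CompletableFuture<Void> removeJob(Runnable removeFromStore,
                                             Runnable cleanupBlobs,
                                             Runnable removeMetrics) {
        return CompletableFuture
            .runAsync(removeFromStore)                 // async store removal
            .whenComplete((ignored, failure) -> {
                removeMetrics.run();                   // always executed
                if (failure == null) {
                    cleanupBlobs.run();                // only after success
                }
            });
    }
}
```

Calling `removeJob(...).join()` with a failing `removeFromStore` still removes the metrics but leaves the BLOBs in place, which is exactly the race the ticket describes being closed.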
[jira] [Commented] (FLINK-9832) Allow commas in job submission query params
[ https://issues.apache.org/jira/browse/FLINK-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542526#comment-16542526 ] Chesnay Schepler commented on FLINK-9832: - HA, it's not a parsing error but the _handler_ fails the request, so we're good. > Allow commas in job submission query params > --- > > Key: FLINK-9832 > URL: https://issues.apache.org/jira/browse/FLINK-9832 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.5.1 >Reporter: Ufuk Celebi >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > As reported on the user mailing list in the thread "Run programs w/ params > including comma via REST api" [1], submitting a job with mainArgs that > include a comma results in an exception. > To reproduce submit a job with the following mainArgs: > {code} > --servers 10.100.98.9:9092,10.100.98.237:9092 > {code} > The request fails with > {code} > Expected only one value [--servers 10.100.98.9:9092, 10.100.98.237:9092]. > {code} > As a work around, users have to use a different delimiter such as {{;}}. > The proper fix of this API would make these params part of the {{POST}} > request instead of relying on query params (as noted in FLINK-9499). I think > it's still valuable to fix this as part of a bug fix release for 1.5. > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Run-programs-w-params-including-comma-via-REST-api-td19662.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9832) Allow commas in job submission query params
[ https://issues.apache.org/jira/browse/FLINK-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542522#comment-16542522 ] Chesnay Schepler commented on FLINK-9832: - hold on, that doesn't quite work since a parameter parsing error fails the request... > Allow commas in job submission query params > --- > > Key: FLINK-9832 > URL: https://issues.apache.org/jira/browse/FLINK-9832 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.5.1 >Reporter: Ufuk Celebi >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > As reported on the user mailing list in the thread "Run programs w/ params > including comma via REST api" [1], submitting a job with mainArgs that > include a comma results in an exception. > To reproduce submit a job with the following mainArgs: > {code} > --servers 10.100.98.9:9092,10.100.98.237:9092 > {code} > The request fails with > {code} > Expected only one value [--servers 10.100.98.9:9092, 10.100.98.237:9092]. > {code} > As a work around, users have to use a different delimiter such as {{;}}. > The proper fix of this API would make these params part of the {{POST}} > request instead of relying on query params (as noted in FLINK-9499). I think > it's still valuable to fix this as part of a bug fix release for 1.5. > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Run-programs-w-params-including-comma-via-REST-api-td19662.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9832) Allow commas in job submission query params
[ https://issues.apache.org/jira/browse/FLINK-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542520#comment-16542520 ] Chesnay Schepler commented on FLINK-9832: - I have an idea to fix this in a better way for now. The core issue with switching to POST is that legacy handlers do not have access to the request body. We can avoid this problem by sending the parameters both in the POST body and as query parameters. The legacy handlers will continue to use the query parameters and won't even be aware that anything has changed; the new handlers will read the (optional!) POST body and prioritize it over the query parameters. > Allow commas in job submission query params > --- > > Key: FLINK-9832 > URL: https://issues.apache.org/jira/browse/FLINK-9832 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.5.1 >Reporter: Ufuk Celebi >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > As reported on the user mailing list in the thread "Run programs w/ params > including comma via REST api" [1], submitting a job with mainArgs that > include a comma results in an exception. > To reproduce submit a job with the following mainArgs: > {code} > --servers 10.100.98.9:9092,10.100.98.237:9092 > {code} > The request fails with > {code} > Expected only one value [--servers 10.100.98.9:9092, 10.100.98.237:9092]. > {code} > As a work around, users have to use a different delimiter such as {{;}}. > The proper fix of this API would make these params part of the {{POST}} > request instead of relying on query params (as noted in FLINK-9499). I think > it's still valuable to fix this as part of a bug fix release for 1.5. > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Run-programs-w-params-including-comma-via-REST-api-td19662.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
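The dual-channel scheme described in the comment above can be sketched as follows; `resolveProgramArgs` and the `"program-args"` key are hypothetical illustrations, not the actual Flink handler API. The client sends the program arguments both as a query parameter (which the legacy handlers keep using, unchanged) and in the POST body; a new handler prefers the body, so commas in arguments survive intact:

```java
import java.util.Map;
import java.util.Optional;

public class ProgramArgsResolver {

    /**
     * Prefer the (optional) POST body over the query parameter. Query
     * parameter values are comma-split by the REST machinery, so only the
     * body can carry a value such as "--servers host1:9092,host2:9092"
     * without being mangled; legacy handlers simply never look at the body.
     */
    static String resolveProgramArgs(Optional<String> postBodyArgs,
                                     Map<String, String> queryParams) {
        return postBodyArgs.orElseGet(
            () -> queryParams.getOrDefault("program-args", ""));
    }
}
```

The design choice is backward compatibility: because the body is optional and the query parameters are still sent, neither old clients nor old handlers need to change.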
[jira] [Assigned] (FLINK-9832) Allow commas in job submission query params
[ https://issues.apache.org/jira/browse/FLINK-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-9832: --- Assignee: Chesnay Schepler > Allow commas in job submission query params > --- > > Key: FLINK-9832 > URL: https://issues.apache.org/jira/browse/FLINK-9832 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.5.1 >Reporter: Ufuk Celebi >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.2, 1.6.0 > > > As reported on the user mailing list in the thread "Run programs w/ params > including comma via REST api" [1], submitting a job with mainArgs that > include a comma results in an exception. > To reproduce submit a job with the following mainArgs: > {code} > --servers 10.100.98.9:9092,10.100.98.237:9092 > {code} > The request fails with > {code} > Expected only one value [--servers 10.100.98.9:9092, 10.100.98.237:9092]. > {code} > As a work around, users have to use a different delimiter such as {{;}}. > The proper fix of this API would make these params part of the {{POST}} > request instead of relying on query params (as noted in FLINK-9499). I think > it's still valuable to fix this as part of a bug fix release for 1.5. > [1] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Run-programs-w-params-including-comma-via-REST-api-td19662.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long
[ https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542478#comment-16542478 ] ASF GitHub Bot commented on FLINK-9413: --- Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/6103 @tillrohrmann already did that; it seems to alleviate the issue, though not fix it. I'm upgrading from 1.2.0 to 1.4.2. The major change I can see is that the TM now connects to HDFS instead of only talking to the JobManager; could this increase the likelihood of this issue? > Tasks can fail with PartitionNotFoundException if consumer deployment takes > too long > > > Key: FLINK-9413 > URL: https://issues.apache.org/jira/browse/FLINK-9413 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.4.0, 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: zhangminglei >Priority: Critical > Labels: pull-request-available > > {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of > the producer takes too long. More specifically, if it takes longer than the > {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up > and fail. > The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a > consuming task once the producer has been assigned a slot but we do not wait > until it is actually running. The problem should be fixed if we wait until > the task is in state {{RUNNING}} before assigning the result partition to the > consumer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...
Github user RalphSu commented on the issue: https://github.com/apache/flink/pull/6103 @tillrohrmann already did that; it seems to alleviate the issue, though not fix it. I'm upgrading from 1.2.0 to 1.4.2. The major change I can see is that the TM now connects to HDFS instead of only talking to the JobManager; could this increase the likelihood of this issue? ---
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542428#comment-16542428 ] ASF GitHub Bot commented on FLINK-9692: --- Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/6300 The idea here is that `maxNumberOfRecordsPerFetch` should never be set to a value that fetches records exceeding the read limit (2 Mb/sec), per the math here. ``` 2 Mb/sec / (averageRecordSizeBytes * # reads/sec) ``` At least, that's the intent. Let me know if that makes sense or if there is something amiss about the approach here. If `maxNumberOfRecordsPerFetch` is somehow set such that the limit is exceeded, then yes, it will still be throttled by Kinesis. > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/6300 The idea here is that `maxNumberOfRecordsPerFetch` should never be set to a value that fetches records exceeding the read limit (2 Mb/sec), per the math here. ``` 2 Mb/sec / (averageRecordSizeBytes * # reads/sec) ``` At least, that's the intent. Let me know if that makes sense or if there is something amiss about the approach here. If `maxNumberOfRecordsPerFetch` is somehow set such that the limit is exceeded, then yes, it will still be throttled by Kinesis. ---
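The adaptive-fetch math discussed above can be sketched as a standalone Java method. This mirrors the formula in the PR's `getAdaptiveMaxRecordsPerFetch` (bytes consumed per second is the average record size scaled by the fetch frequency), with the reviewers' `Math.min` suggestion applied; the method name and the `configuredMax` parameter are stand-ins for the real connector code, and in Flink the cap would be `ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX`:

```java
public class AdaptiveFetchSketch {

    /** Kinesis per-shard read limit: 2 MB per second. */
    static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;

    /**
     * maxRecords = readLimit / (bytes consumed per second), where
     * bytes/sec = averageRecordSizeBytes * (1000 / fetchIntervalMillis),
     * capped at the configured getRecords maximum.
     */
    static int adaptiveMaxRecordsPerFetch(long averageRecordSizeBytes,
                                          long fetchIntervalMillis,
                                          int configuredMax) {
        if (averageRecordSizeBytes == 0 || fetchIntervalMillis == 0) {
            return configuredMax; // no observations yet; keep the configured value
        }
        long bytesPerSecond = averageRecordSizeBytes * 1000L / fetchIntervalMillis;
        int adapted = (int) (SHARD_BYTES_PER_SECOND_LIMIT / bytesPerSecond);
        return Math.min(adapted, configuredMax); // never exceed the configured cap
    }
}
```

For example, 10 KB records fetched every 200 ms consume roughly 51,200 bytes/sec, so the fetch size adapts down to about 40 records per call, well under the limit.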
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202227845 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr protected static List deaggregateRecords(List records, String startingHashKey, String endingHashKey) { return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); } + + /** +* Adapts the maxNumberOfRecordsPerFetch based on the current average record size +* to optimize 2 Mb / sec read limits. +* +* @param averageRecordSizeBytes +* @return adaptedMaxRecordsPerFetch +*/ + + protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) { + int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch; + if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) { + adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis)); + + // Ensure the value is not more than 1L + adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ? --- End diff -- Changed. ---
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202227834 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The interval between each attempt to discover new shards. */ public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis"; + /** The config to turn on adaptive reads from a shard. */ + public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads"; --- End diff -- Changed. ---
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542423#comment-16542423 ] ASF GitHub Bot commented on FLINK-9692: --- Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202227834 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The interval between each attempt to discover new shards. */ public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis"; + /** The config to turn on adaptive reads from a shard. */ + public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads"; --- End diff -- Changed. > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. 
you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542424#comment-16542424 ] ASF GitHub Bot commented on FLINK-9692: --- Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202227845 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr protected static List deaggregateRecords(List records, String startingHashKey, String endingHashKey) { return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); } + + /** +* Adapts the maxNumberOfRecordsPerFetch based on the current average record size +* to optimize 2 Mb / sec read limits. +* +* @param averageRecordSizeBytes +* @return adaptedMaxRecordsPerFetch +*/ + + protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) { + int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch; + if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) { + adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis)); + + // Ensure the value is not more than 1L + adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ? --- End diff -- Changed. 
> Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542293#comment-16542293 ] ASF GitHub Bot commented on FLINK-9692: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202201507 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr protected static List deaggregateRecords(List records, String startingHashKey, String endingHashKey) { return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); } + + /** +* Adapts the maxNumberOfRecordsPerFetch based on the current average record size +* to optimize 2 Mb / sec read limits. +* +* @param averageRecordSizeBytes +* @return adaptedMaxRecordsPerFetch +*/ + + protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) { + int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch; + if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) { + adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis)); + + // Ensure the value is not more than 1L + adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ? 
--- End diff -- adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX); > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542292#comment-16542292 ] ASF GitHub Bot commented on FLINK-9692: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202199865 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The interval between each attempt to discover new shards. */ public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis"; + /** The config to turn on adaptive reads from a shard. */ + public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads"; --- End diff -- [most Flink's feature flags](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html) are named `xx.enabled`, I'd suggest rename it to something like `flink.shard.adaptive.read.records.enabled` > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. 
you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202199865 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The interval between each attempt to discover new shards. */ public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis"; + /** The config to turn on adaptive reads from a shard. */ + public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads"; --- End diff -- [most Flink's feature flags](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html) are named `xx.enabled`, I'd suggest rename it to something like `flink.shard.adaptive.read.records.enabled` ---
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202201507 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr protected static List deaggregateRecords(List records, String startingHashKey, String endingHashKey) { return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); } + + /** +* Adapts the maxNumberOfRecordsPerFetch based on the current average record size +* to optimize 2 Mb / sec read limits. +* +* @param averageRecordSizeBytes +* @return adaptedMaxRecordsPerFetch +*/ + + protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) { + int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch; + if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) { + adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis)); + + // Ensure the value is not more than 1L + adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ? --- End diff -- adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX); ---
[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image
[ https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542273#comment-16542273 ] ASF GitHub Bot commented on FLINK-9822: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202198679 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose --- End diff -- Does this apply to both standalone and cluster mode? I'd like to get this clarified, since the PR title says it's for standaloneJobCluster > Add Dockerfile for StandaloneJobClusterEntryPoint image > --- > > Key: FLINK-9822 > URL: https://issues.apache.org/jira/browse/FLINK-9822 > Project: Flink > Issue Type: New Feature > Components: Docker >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Add a {{Dockerfile}} to create an image which contains the > {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The > entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} > with the added user code jar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image
[ https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542272#comment-16542272 ] ASF GitHub Bot commented on FLINK-9822: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202197686 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose + +## Installation + +Install the most recent stable version of docker +https://docs.docker.com/installation/ + +## Build + +Images are based on the official Java Alpine (OpenJDK 8) image. If you want to +build the flink image run: + +sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job + +or + +docker build -t flink . + +If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: + +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink --- End diff -- Is FLINK_VERSION 1.0.3 only for demo purpose? Can we use a more recent version for demoing? > Add Dockerfile for StandaloneJobClusterEntryPoint image > --- > > Key: FLINK-9822 > URL: https://issues.apache.org/jira/browse/FLINK-9822 > Project: Flink > Issue Type: New Feature > Components: Docker >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Add a {{Dockerfile}} to create an image which contains the > {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The > entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} > with the added user code jar. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202198679 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose --- End diff -- Does this apply to both standalone and cluster mode? I'd like to get this clarified, since the PR title says it's for standaloneJobCluster ---
[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/6319#discussion_r202197686 --- Diff: flink-container/docker/README.md --- @@ -0,0 +1,44 @@ +# Apache Flink cluster deployment on docker using docker-compose + +## Installation + +Install the most recent stable version of docker +https://docs.docker.com/installation/ + +## Build + +Images are based on the official Java Alpine (OpenJDK 8) image. If you want to +build the flink image run: + +sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job + +or + +docker build -t flink . + +If you want to build the container for a specific version of flink/hadoop/scala +you can configure it in the respective args: + +docker build --build-arg FLINK_VERSION=1.0.3 --build-arg HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t "flink:1.0.3-hadoop2.6-scala_2.10" flink --- End diff -- Is FLINK_VERSION 1.0.3 only for demo purpose? Can we use a more recent version for demoing? ---
[jira] [Assigned] (FLINK-9703) Mesos does not expose TM Prometheus port
[ https://issues.apache.org/jira/browse/FLINK-9703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-9703: Assignee: Rune Skou Larsen > Mesos does not expose TM Prometheus port > > > Key: FLINK-9703 > URL: https://issues.apache.org/jira/browse/FLINK-9703 > Project: Flink > Issue Type: Bug > Components: Mesos >Reporter: Rune Skou Larsen >Assignee: Rune Skou Larsen >Priority: Major > Labels: pull-request-available > > LaunchableMesosWorker makes Mesos expose these ports for a Task Manager: > {{private static final String[] TM_PORT_KEYS = {}} > {{ "taskmanager.rpc.port",}} > {{ "taskmanager.data.port"};}} > But when running Prometheus Exporter on a TM, another port needs to be > exposed to make Flink's Prometheus endpoint externally scrapable by the > Prometheus server. By default this is port 9249, but it is configurable > according to: > [https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter] > > My plan is to make a PR, that just adds another config option for mesos, to > enable custom ports to be exposed in the provisioned TMs. > I considered carrying parts of the Metrics config into the Mesos code to > automatically map metrics ports in mesos. But making such a "shortcut" > between Flink's metrics and mesos modules would probably need some sort of > integration testing, so I prefer the simple solution of just adding another > Mesos config option. But comments are welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9703) Mesos does not expose TM Prometheus port
[ https://issues.apache.org/jira/browse/FLINK-9703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542188#comment-16542188 ] ASF GitHub Bot commented on FLINK-9703: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6288#discussion_r202171601 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -332,6 +334,22 @@ public String toString() { return taskInfo.build(); } + /** +* Get port keys representing the TM's configured endpoints. This includes mandatory TM endpoints such as +* data and rpc as well as optionally configured endpoints for services such as prometheus reporter +* +* @return A deterministically ordered Set of port keys to expose from the TM container +*/ + private Set getPortKeys() { + LinkedHashSet tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS)); + containerSpec.getDynamicConfiguration().keySet().stream() + .filter(key -> key.endsWith(".port") || key.endsWith(".ports")) // This matches property naming convention --- End diff -- I agree that simply taking all configuration values which end with `port` and `ports` is problematic. What about configuration values whose value is needed, e.g. remote ports, and should not be overwritten? For example, `jobmanager.rpc.port` is one of these configuration values. I would rather prefer a Mesos-specific configuration value which we can use to define which ports need to be dynamically assigned. For example `mesos.dynamic-port-assignment: "metrics.prom.port, taskmanager.rpc.port, taskmanager.data.port"`. What do you think? 
> Mesos does not expose TM Prometheus port > > > Key: FLINK-9703 > URL: https://issues.apache.org/jira/browse/FLINK-9703 > Project: Flink > Issue Type: Bug > Components: Mesos >Reporter: Rune Skou Larsen >Priority: Major > Labels: pull-request-available > > LaunchableMesosWorker makes Mesos expose these ports for a Task Manager: > {{private static final String[] TM_PORT_KEYS = {}} > {{ "taskmanager.rpc.port",}} > {{ "taskmanager.data.port"};}} > But when running Prometheus Exporter on a TM, another port needs to be > exposed to make Flink's Prometheus endpoint externally scrapable by the > Prometheus server. By default this is port 9249, but it is configurable > according to: > [https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter] > > My plan is to make a PR, that just adds another config option for mesos, to > enable custom ports to be exposed in the provisioned TMs. > I considered carrying parts of the Metrics config into the Mesos code to > automatically map metrics ports in mesos. But making such a "shortcut" > between Flink's metrics and mesos modules would probably need some sort of > integration testing, so I prefer the simple solution of just adding another > Mesos config option. But comments are welcome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6288: [FLINK-9703] [flink-mesos] Expose Prometheus port ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6288#discussion_r202171601 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -332,6 +334,22 @@ public String toString() { return taskInfo.build(); } + /** +* Get port keys representing the TM's configured endpoints. This includes mandatory TM endpoints such as +* data and rpc as well as optionally configured endpoints for services such as prometheus reporter +* +* @return A deterministically ordered Set of port keys to expose from the TM container +*/ + private Set getPortKeys() { + LinkedHashSet tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS)); + containerSpec.getDynamicConfiguration().keySet().stream() + .filter(key -> key.endsWith(".port") || key.endsWith(".ports")) // This matches property naming convention --- End diff -- I agree that simply taking all configuration values which end with `port` and `ports` is problematic. What about configuration values whose value is needed, e.g. remote ports, and should not be overwritten? For example, `jobmanager.rpc.port` is one of these configuration values. I would rather prefer a Mesos-specific configuration value which we can use to define which ports need to be dynamically assigned. For example `mesos.dynamic-port-assignment: "metrics.prom.port, taskmanager.rpc.port, taskmanager.data.port"`. What do you think? ---
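The explicit `mesos.dynamic-port-assignment` list proposed in the review above could be parsed roughly as follows. The helper and its parsing rules are a hypothetical sketch, not what the PR implements; only `TM_PORT_KEYS` mirrors the existing `LaunchableMesosWorker` constant:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class PortKeysSketch {

    // Mandatory TM endpoints, as in LaunchableMesosWorker.
    static final String[] TM_PORT_KEYS = {"taskmanager.rpc.port", "taskmanager.data.port"};

    // Hypothetical parsing of the proposed 'mesos.dynamic-port-assignment' value.
    // An explicit, deterministically ordered list avoids accidentally rebinding
    // remote ports such as 'jobmanager.rpc.port', which a '.port'/'.ports'
    // suffix filter would also catch.
    static Set<String> portKeys(String dynamicPortAssignment) {
        LinkedHashSet<String> keys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
        if (dynamicPortAssignment != null && !dynamicPortAssignment.trim().isEmpty()) {
            Arrays.stream(dynamicPortAssignment.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .forEach(keys::add); // LinkedHashSet keeps order and drops duplicates
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(portKeys("metrics.prom.port, taskmanager.rpc.port"));
        // → [taskmanager.rpc.port, taskmanager.data.port, metrics.prom.port]
    }
}
```

A `LinkedHashSet` gives the deterministic ordering the javadoc promises while silently absorbing keys that are listed both as mandatory and in the config value.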
[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis
[ https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542132#comment-16542132 ] ASF GitHub Bot commented on FLINK-9692: --- Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202156901 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr protected static List deaggregateRecords(List records, String startingHashKey, String endingHashKey) { return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); } + + /** +* Adapts the maxNumberOfRecordsPerFetch based on the current average record size +* to optimize 2 Mb / sec read limits. +* +* @param averageRecordSizeBytes +* @return adaptedMaxRecordsPerFetch +*/ + + private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) { --- End diff -- Makes sense. Done. > Adapt maxRecords parameter in the getRecords call to optimize bytes read from > Kinesis > -- > > Key: FLINK-9692 > URL: https://issues.apache.org/jira/browse/FLINK-9692 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: performance, pull-request-available > > The Kinesis connector currently has a [constant > value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213] > set for maxRecords that it can fetch from a single Kinesis getRecords call. > However, in most realtime scenarios, the average size of the Kinesis record > (in bytes) changes depending on the situation i.e. 
you could be in a > transient scenario where you are reading large sized records and would hence > like to fetch fewer records in each getRecords call (so as to not exceed the > 2 Mb/sec [per shard > limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] > on the getRecords call). > The idea here is to adapt the Kinesis connector to identify an average batch > size prior to making the getRecords call, so that the maxRecords parameter > can be appropriately tuned before making the call. > This feature can be behind a > [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java] > flag that defaults to false. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202156901 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr protected static List deaggregateRecords(List records, String startingHashKey, String endingHashKey) { return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); } + + /** +* Adapts the maxNumberOfRecordsPerFetch based on the current average record size +* to optimize 2 Mb / sec read limits. +* +* @param averageRecordSizeBytes +* @return adaptedMaxRecordsPerFetch +*/ + + private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) { --- End diff -- Makes sense. Done. ---
[jira] [Closed] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8480. - Resolution: Fixed Merged on master with 42ada8ad9ca28f94d0a0355658330198bbc2b577. > Implement Java API to expose join functionality of > TimeBoundedStreamJoinOperator > > > Key: FLINK-8480 > URL: https://issues.apache.org/jira/browse/FLINK-8480 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8480: -- Labels: pull-request-available (was: ) > Implement Java API to expose join functionality of > TimeBoundedStreamJoinOperator > > > Key: FLINK-8480 > URL: https://issues.apache.org/jira/browse/FLINK-8480 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator
[ https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542120#comment-16542120 ] ASF GitHub Bot commented on FLINK-8480: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5482 > Implement Java API to expose join functionality of > TimeBoundedStreamJoinOperator > > > Key: FLINK-8480 > URL: https://issues.apache.org/jira/browse/FLINK-8480 > Project: Flink > Issue Type: Sub-task >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for Interval...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5482 ---
[jira] [Closed] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-9701. - Resolution: Implemented Merged in: master: f45b7f7ff2 > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542045#comment-16542045 ] ASF GitHub Bot commented on FLINK-9701: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6313 > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6313 ---
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542040#comment-16542040 ] ASF GitHub Bot commented on FLINK-9701: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6313 LGTM, nice work! 👍 Besides one comment about closing the backends after tests, the PR is ready. This is no big thing so I will just fix it myself before merging now. > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6313 LGTM, nice work! 👍 Besides one comment about closing the backends after tests, the PR is ready. This is no big thing so I will just fix it myself before merging now. ---
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542039#comment-16542039 ] ASF GitHub Bot commented on FLINK-9701: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202130806 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.RunnableFuture; + +/** Base class for state backend test context. 
*/ +public abstract class StateBackendTestContext { + private final StateBackend stateBackend; + private final CheckpointStorageLocation checkpointStorageLocation; + private final TtlTimeProvider timeProvider; + + private AbstractKeyedStateBackend keyedStateBackend; + + protected StateBackendTestContext(TtlTimeProvider timeProvider) { + this.timeProvider = Preconditions.checkNotNull(timeProvider); + this.stateBackend = Preconditions.checkNotNull(createStateBackend()); + this.checkpointStorageLocation = createCheckpointStorageLocation(); + } + + protected abstract StateBackend createStateBackend(); + + private CheckpointStorageLocation createCheckpointStorageLocation() { + try { + return stateBackend + .createCheckpointStorage(new JobID()) + .initializeLocationForCheckpoint(2L); + } catch (IOException e) { + throw new RuntimeException("unexpected"); + } + } + + void createAndRestoreKeyedStateBackend() { + Environment env = new DummyEnvironment(); + try { + if (keyedStateBackend != null) { + keyedStateBackend.dispose(); --- End diff -- There is a problem that the backend is only disposed here and not after each test, this leads to some native errors when I run the test. I suggest to give this context a `dispose` method and call it in a `@After` method. > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202130806 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.RunnableFuture; + +/** Base class for state backend test context. 
*/ +public abstract class StateBackendTestContext { + private final StateBackend stateBackend; + private final CheckpointStorageLocation checkpointStorageLocation; + private final TtlTimeProvider timeProvider; + + private AbstractKeyedStateBackend keyedStateBackend; + + protected StateBackendTestContext(TtlTimeProvider timeProvider) { + this.timeProvider = Preconditions.checkNotNull(timeProvider); + this.stateBackend = Preconditions.checkNotNull(createStateBackend()); + this.checkpointStorageLocation = createCheckpointStorageLocation(); + } + + protected abstract StateBackend createStateBackend(); + + private CheckpointStorageLocation createCheckpointStorageLocation() { + try { + return stateBackend + .createCheckpointStorage(new JobID()) + .initializeLocationForCheckpoint(2L); + } catch (IOException e) { + throw new RuntimeException("unexpected"); + } + } + + void createAndRestoreKeyedStateBackend() { + Environment env = new DummyEnvironment(); + try { + if (keyedStateBackend != null) { + keyedStateBackend.dispose(); --- End diff -- There is a problem that the backend is only disposed here and not after each test, this leads to some native errors when I run the test. I suggest to give this context a `dispose` method and call it in a `@After` method. ---
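The reviewer's suggestion can be sketched without any Flink or JUnit dependencies (all names below are hypothetical stand-ins for the classes in the diff): the context owns its backend resource and exposes an idempotent `dispose()` that a JUnit `@After` method would call after every test, so native resources are released even when no new backend is created.

```java
// Hypothetical stand-in for StateBackendTestContext: the context owns the
// backend resource and releases it both when a new backend replaces it and
// when dispose() is called explicitly (e.g. from a JUnit @After method).
class DisposableTestContext {
    private AutoCloseable backend;

    void createAndRestoreBackend(AutoCloseable newBackend) {
        dispose();             // release any previous backend before replacing it
        backend = newBackend;
    }

    // Idempotent: safe to call from @After even if no backend was created
    // or dispose() already ran.
    void dispose() {
        if (backend == null) {
            return;
        }
        try {
            backend.close();   // frees the underlying (e.g. native) resources
        } catch (Exception e) {
            throw new RuntimeException("Failed to dispose backend", e);
        } finally {
            backend = null;
        }
    }
}
```

In the real test base class the hook would then be a one-liner, e.g. `@After public void tearDown() { context.dispose(); }`, guaranteeing cleanup after each test rather than only on the next `createAndRestoreKeyedStateBackend()` call.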
[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java
[ https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542013#comment-16542013 ] ASF GitHub Bot commented on FLINK-9829: --- Github user lamber-ken commented on the issue: https://github.com/apache/flink/pull/6321 @zentol, hi, there are two ways to contribute to the Flink project: one is to create a `jira FLINK-` , another is a `[hotfix] XXX`. For example, to fix some checkstyle issues or do a code refactor, how should I choose the best way to contribute? > The wrapper classes be compared by symbol of '==' directly in > BigDecSerializer.java > --- > > Key: FLINK-9829 > URL: https://issues.apache.org/jira/browse/FLINK-9829 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > The wrapper classes should be compared by the equals method rather than by the > '==' operator directly in BigDecSerializer.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
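The pitfall behind this issue can be shown with a minimal, self-contained example (plain `Integer` here, standing in for the wrapper fields in `BigDecSerializer`): `==` on boxed wrapper types compares object references, and only appears to work for values the JVM happens to cache.

```java
public class WrapperCompare {
    public static void main(String[] args) {
        Integer small1 = 127, small2 = 127;   // autoboxing reuses the Integer cache (-128..127)
        Integer big1 = 1000, big2 = 1000;     // outside the cache: typically distinct objects

        System.out.println(small1 == small2);   // true, but only because of the cache
        System.out.println(big1 == big2);       // usually false: reference comparison, not guaranteed
        System.out.println(big1.equals(big2));  // true: value comparison, always correct
    }
}
```

The `equals` call is the only comparison of the three whose result is guaranteed by the language specification for all values, which is why the fix replaces `==` with `equals`.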
[jira] [Commented] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null
[ https://issues.apache.org/jira/browse/FLINK-8544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541946#comment-16541946 ] ASF GitHub Bot commented on FLINK-8544: --- Github user BillLeecn commented on the issue: https://github.com/apache/flink/pull/5516 @dawidwys Of course not. The patch has been updated. > JSONKeyValueDeserializationSchema throws NPE when message key is null > - > > Key: FLINK-8544 > URL: https://issues.apache.org/jira/browse/FLINK-8544 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Bill Lee >Priority: Major > Labels: pull-request-available > Original Estimate: 1h > Remaining Estimate: 1h > > JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key > without validation. > If a message with key == null is read, Flink throws an NPE. > {code:java} > @Override > public ObjectNode deserialize(byte[] messageKey, byte[] message, String > topic, int partition, long offset) throws IOException { > if (mapper == null) { > mapper = new ObjectMapper(); > } > ObjectNode node = mapper.createObjectNode(); > node.set("key", mapper.readValue(messageKey, JsonNode.class)); > // messageKey is not validated against null. > node.set("value", mapper.readValue(message, JsonNode.class)); > {code} > The fix is very straightforward. > {code:java} > if (messageKey == null) { > node.set("key", null); > } else { > node.set("key", mapper.readValue(messageKey, > JsonNode.class)); > } > {code} > If it is appreciated, I would send a pull request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
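The proposed null-guard can be sketched self-contained (a plain `Map` and `String` decoding stand in for Jackson's `ObjectNode` and `readValue`; `KeyValueDecoder` and `parseOrNull` are illustrative names, not Flink API): parse the key only when bytes are actually present, and store an explicit null otherwise.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class KeyValueDecoder {
    // Stand-in for mapper.readValue(bytes, JsonNode.class).
    static String parseOrNull(byte[] bytes) {
        // The guard from the proposed fix: a Kafka message key may be null,
        // so skip parsing entirely instead of dereferencing the array.
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    static Map<String, String> deserialize(byte[] messageKey, byte[] message) {
        Map<String, String> node = new HashMap<>();
        node.put("key", parseOrNull(messageKey));   // no NPE for key-less messages
        node.put("value", parseOrNull(message));
        return node;
    }
}
```

The same shape applies to the real schema: the only change is wrapping the `readValue` call for the key (and arguably the value) in the null check.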
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541926#comment-16541926 ] ASF GitHub Bot commented on FLINK-9143: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6283#discussion_r202103545 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java --- @@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() { CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true), null)); + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart()); --- End diff -- You're right. I'm just wondering whether you would ever want to enable checkpointing without a restart strategy. So to speak: if you set `FallbackRestartStrategy`, enable checkpointing, and set `NoRestartStrategy` as the server-side `RestartStrategy`, then do you want `FixedRestartStrategy` or `NoRestartStrategy`? On the other hand, you might want to disable restarting for all jobs running on your cluster by setting the restart strategy to `NoRestartStrategy`. Maybe the proper solution would be to set `ExecutionConfig#restartStrategy` to `FallbackRestartStrategy` and introduce a new default server-side restart strategy `NoOrFixedIfCheckpointingEnabled`, which resolves to `FixedRestartStrategy` if checkpointing is enabled and to `NoRestartStrategy` otherwise. What do you think? 
> Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: yuqi >Priority: Major > Labels: pull-request-available > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > Restart strategy defined in flink-conf.yaml is disregarded, when user enables > checkpointing. > Steps to reproduce: > 1. Download flink distribution (1.4.2), update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-metadata] > state.backend.rocksdb.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb] > > 2. create new java project as described at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html] > here's the code: > public class FailedJob > { > static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class); > public static void main( String[] args ) throws Exception > { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > DataStream<String> stream = > env.fromCollection(Arrays.asList("test")); > stream.map(new MapFunction<String, String>(){ > @Override > public String map(String obj) > { throw new NullPointerException("NPE"); } > > }); > env.execute("Failed job"); > } > } > > 3. Compile: mvn clean package; submit it to the cluster > > 4. Go to Job Manager configuration in WebUI, ensure settings from > flink-conf.yaml is there (screenshot attached) > > 5. Go to Job's configuration, see Execution Configuration section > > *Expected result*: restart strategy as defined in flink-conf.yaml > > *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart > attempts. 
> > > see attached screenshots and jobmanager log (line 1 and 31) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
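The resolution scheme discussed in this thread can be sketched as a small pure function. Everything here mirrors the proposal in the comments rather than existing Flink API: `NO_OR_FIXED_IF_CHECKPOINTING` is the *proposed* new server-side default, and the enum collapses the real strategy classes into labels.

```java
class RestartStrategyResolver {
    enum Strategy { NO_RESTART, FIXED_DELAY, FALLBACK, NO_OR_FIXED_IF_CHECKPOINTING }

    // A client-side FALLBACK defers to the server-side strategy; the proposed
    // server-side default then picks FIXED_DELAY only when checkpointing is on.
    static Strategy resolve(Strategy clientSide, Strategy serverSide, boolean checkpointingEnabled) {
        Strategy effective = (clientSide == Strategy.FALLBACK) ? serverSide : clientSide;
        if (effective == Strategy.NO_OR_FIXED_IF_CHECKPOINTING) {
            return checkpointingEnabled ? Strategy.FIXED_DELAY : Strategy.NO_RESTART;
        }
        return effective;
    }
}
```

Under this sketch an explicitly configured server-side `NO_RESTART` still wins even with checkpointing enabled, which preserves an administrator's ability to disable restarts cluster-wide — the point Till raises about the difference from the old null behaviour.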
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541921#comment-16541921 ] ASF GitHub Bot commented on FLINK-9701: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202103083 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -240,7 +243,7 @@ private boolean hasRegisteredState() { } @Override - public IS createState( + public IS createInternalState( --- End diff -- No, timers cannot use a state descriptor, because they cannot extend `State` > Activate TTL in state descriptors > - > > Key: FLINK-9701 > URL: https://issues.apache.org/jira/browse/FLINK-9701 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541899#comment-16541899 ] ASF GitHub Bot commented on FLINK-9143: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6283#discussion_r202098798 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java --- @@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() { CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true), null)); + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart()); --- End diff -- Right now null is a bit different from `FallbackRestartStrategy`. * null - allows falling back to `FixedRestartStrategy` when checkpointing is enabled and `noRestart` was set on the server side * `FallbackRestartStrategy` - the server-side strategy is always used (regardless of checkpointing) If we set `FallbackRestartStrategy` by default, we have two options: * we either always set `FixedRestartStrategy` if checkpointing is enabled and `noRestart` was set on the server side * or we never automatically fall back to `FixedRestartStrategy`, even when checkpointing is enabled. Which do you think would be the better option: keep null, always fall back to `FixedRestartStrategy`, or never fall back to it? > Restart strategy defined in flink-conf.yaml is ignored > -- > > Key: FLINK-9143 > URL: https://issues.apache.org/jira/browse/FLINK-9143 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.4.2 >Reporter: Alex Smirnov >Assignee: yuqi >Priority: Major > Labels: pull-request-available > Attachments: execution_config.png, jobmanager.log, jobmanager.png > > > Restart strategy defined in flink-conf.yaml is disregarded, when user enables > checkpointing. > Steps to reproduce: > 1. 
Download flink distribution (1.4.2), update flink-conf.yaml: > > restart-strategy: none > state.backend: rocksdb > state.backend.fs.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-metadata] > state.backend.rocksdb.checkpointdir: > [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb] > > 2. create new java project as described at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html] > here's the code: > public class FailedJob > { > static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class); > public static void main( String[] args ) throws Exception > { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); > DataStream<String> stream = > env.fromCollection(Arrays.asList("test")); > stream.map(new MapFunction<String, String>(){ > @Override > public String map(String obj) > { throw new NullPointerException("NPE"); } > > }); > env.execute("Failed job"); > } > } > > 3. Compile: mvn clean package; submit it to the cluster > > 4. Go to Job Manager configuration in WebUI, ensure settings from > flink-conf.yaml is there (screenshot attached) > > 5. Go to Job's configuration, see Execution Configuration section > > *Expected result*: restart strategy as defined in flink-conf.yaml > > *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart > attempts. > > > see attached screenshots and jobmanager log (line 1 and 31) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)