[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265563#comment-16265563 ] ASF GitHub Bot commented on FLINK-7449: --- Github user ChrisChinchilla commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r153034462 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. 
+ +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from the pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve.
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``, a full
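The delta idea discussed in the quoted docs (record only the state changes since the previous checkpoint; restore by replaying them on top of a full snapshot) can be sketched in a few lines of plain Java. This is a conceptual illustration only: the class and method names are invented, and it is not how `RocksDBStateBackend` actually organises its files.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Conceptual sketch only: incremental checkpoints modeled as a base (full)
 *  snapshot plus per-checkpoint deltas. Names are invented; this is not how
 *  RocksDBStateBackend stores its data. */
class IncrementalCheckpointSketch {

    /** Rebuild the full state by starting from the base snapshot and replaying
     *  each delta in checkpoint order; later deltas overwrite earlier values. */
    static Map<String, Long> restore(Map<String, Long> base, List<Map<String, Long>> deltas) {
        Map<String, Long> state = new HashMap<>(base);
        for (Map<String, Long> delta : deltas) {
            state.putAll(delta); // a delta holds only keys changed since the previous checkpoint
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, Long> cp1 = Map.of("a", 1L, "b", 2L); // CP 1: full snapshot
        Map<String, Long> cp2Delta = Map.of("b", 5L);     // CP 2: only "b" changed
        System.out.println(restore(cp1, List.of(cp2Delta))); // full state as of CP 2
    }
}
```

The same principle explains the recovery-time tradeoff mentioned in the synopsis: restoring must touch the base snapshot plus every delta, so recovery can take longer than with a single self-contained checkpoint.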
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user ChrisChinchilla commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r153034462 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)).
+ +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from the pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. + + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added.
Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we +have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state +for the job. This is the basic principle of incremental checkpoints, that each checkpoint can build upon a history of +previous checkpoints to avoid writing redundant information. + +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing. + +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``, a full checkpoint is simply a copy of the whole +state. + + + + + +With incremental checkpointing, each checkpoint contains only the state change since the previous checkpoint. + +* For the first
[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints
[ https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1626#comment-1626 ] ASF GitHub Bot commented on FLINK-7449: --- Github user ChrisChinchilla commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r153033430 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. 
+ +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)). + +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from the pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve.
+ + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added. Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we --- End diff -- @StefanRRichter I'm struggling to understand this sentence. > As long as we have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state for the job. Do you mean… > As long as we have the previous checkpoint, if the state changes for the current checkpoint, we can restore the full, current state for the job. Or something different? Explain to me what you're trying to say :) > Improve and enhance
[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Github user ChrisChinchilla commented on a diff in the pull request: https://github.com/apache/flink/pull/4543#discussion_r153033430 --- Diff: docs/ops/state/checkpoints.md --- @@ -99,3 +99,296 @@ above). ```sh $ bin/flink run -s :checkpointMetaDataPath [:runArgs] ``` + +## Incremental Checkpoints + +### Synopsis + +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a +(potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the +previously-completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, +incremental checkpoints can build upon previous checkpoints. + +RocksDBStateBackend is currently the only backend that supports incremental checkpoints. + +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the +incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and +pruned automatically. + +``While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is +a new feature and currently not enabled by default``. + +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the +constructor set to `true`, e.g.: + +```java + RocksDBStateBackend backend = + new RocksDBStateBackend(filebackend, true); +``` + +### Use-case for Incremental Checkpoints + +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and each checkpoint represents a consistent +snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine +failure (see [here]({{ site.baseurl }}/internals/stream_checkpointing.html)).
+ +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those +(hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data +source. The number of events that must be reprocessed has implications for recovery time, and so for fastest recovery, +we want to *take checkpoints as often as possible*. + +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This +overhead can lead to lower throughput and higher latency during the time that checkpoints are created. One reason is +that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the +checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every +single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can obviously create +significant load for the I/O and network subsystems, on top of the normal load from the pipeline’s data processing work. + +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing +overhead. Fast recovery and low checkpointing overhead were conflicting goals. And this is exactly the problem that +incremental checkpoints solve. + + +### Basics of Incremental Checkpoints + +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental +checkpoints in a simplified manner. + +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full +state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically +different, and only a fraction of the state data is modified and some new data added.
Instead of writing the full state +into each checkpoint again and again, we could record only changes in state since the previous checkpoint. As long as we --- End diff -- @StefanRRichter I'm struggling to understand this sentence. > As long as we have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state for the job. Do you mean… > As long as we have the previous checkpoint, if the state changes for the current checkpoint, we can restore the full, current state for the job. Or something different? Explain to me what you're trying to say :) ---
[GitHub] flink issue #5045: [hotfix][docs] Review of concepts docs for grammar and cl...
Github user ChrisChinchilla commented on the issue: https://github.com/apache/flink/pull/5045 @greghogan That said, the whole 'we' vs 'you' discussion is purely a stylistic preference (if that's what you meant) as long as it's consistent and clear who you mean, but the comment about removing passive voice remains :) ---
[GitHub] flink issue #5045: [hotfix][docs] Review of concepts docs for grammar and cl...
Github user ChrisChinchilla commented on the issue: https://github.com/apache/flink/pull/5045 @greghogan @zentol I will get these updates to you asap. But I will have to strongly disagree on the 'you' and passive voice points. I've been working as a technical writer for some time, and this is a point we're all fairly strong on pushing, as in the long run it does make documents much clearer, less ambiguous, and easier to read. Granted, a LOT of the Flink documentation doesn't follow this style right now, and that's something I personally would love to change over time, as I really think it would help people understand better. Ideally I would love to create a more comprehensive style guide and explanation of this (and I have boilerplates I've used with other projects) and change this voicing over time. Thoughts? ---
[jira] [Created] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs
Gary Yao created FLINK-8150: --- Summary: WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs Key: FLINK-8150 URL: https://issues.apache.org/jira/browse/FLINK-8150 Project: Flink Issue Type: Bug Components: Distributed Coordination, REST Affects Versions: 1.5.0 Reporter: Gary Yao Priority: Blocker Fix For: 1.5.0 TaskManager IDs exposed by {{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} cannot be used as input to query TaskManager metrics with method {{MetricStore#getTaskManagerMetricStore(String)}}. *Reason* {{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s whose instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} returns the TaskManager resource IDs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8149) Replace usages of deprecated SerializationSchema
Hai Zhou UTC+8 created FLINK-8149: - Summary: Replace usages of deprecated SerializationSchema Key: FLINK-8149 URL: https://issues.apache.org/jira/browse/FLINK-8149 Project: Flink Issue Type: Improvement Components: Kinesis Connector Affects Versions: 1.4.0 Reporter: Hai Zhou UTC+8 Assignee: Hai Zhou UTC+8 Fix For: 1.5.0 The deprecated {{SerializationSchema}} in {{flink-streaming-java}} has been moved to {{flink-core}}, but the deprecated {{SerializationSchema}} is still used in {{flink-connector-kinesis}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265444#comment-16265444 ] ASF GitHub Bot commented on FLINK-7499: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4581 Rat check did not pass. Could you please fix this @NicoK. > double buffer release in SpillableSubpartitionView > -- > > Key: FLINK-7499 > URL: https://issues.apache.org/jira/browse/FLINK-7499 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, > 1.3.2, 1.3.3 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: > once asynchronously after the write operation and once in > {{SpillableSubpartitionView#releaseMemory()}} after adding the write > operation to the queue. > 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer > is recycled, the memory region may already be reused despite the pending write > 2) If, for some reason (probably only in tests like > {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?), > the buffer is retained and to be used in parallel somewhere else it may also > not be available anymore or contain corrupt data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
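The race described in FLINK-7499 boils down to one buffer being handed back to its pool twice: once by the asynchronous spill write and once by `releaseMemory()`. A minimal reference-counting sketch (plain Java; the class below is hypothetical and far simpler than Flink's actual network buffers) shows why letting the pending writer hold its own reference makes release idempotent:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the FLINK-7499 hazard: a buffer must be recycled
 *  exactly once, even when a view release races with a pending spill write. */
class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private volatile boolean recycled = false;

    /** A pending user (e.g. an in-flight write) takes an extra reference. */
    void retain() {
        refCount.incrementAndGet();
    }

    /** Returns true only for the call that actually recycles the buffer. */
    boolean release() {
        int remaining = refCount.decrementAndGet();
        if (remaining == 0) {
            recycled = true; // memory goes back to the pool exactly once
            return true;
        }
        if (remaining < 0) {
            throw new IllegalStateException("buffer released twice");
        }
        return false; // someone (the pending write) still holds a reference
    }

    boolean isRecycled() {
        return recycled;
    }
}
```

With this scheme, the first scenario in the ticket (memory reused despite a pending write) cannot occur, because the write's own reference keeps the count above zero until the write completes.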
[GitHub] flink issue #4581: [FLINK-7499][io] fix double buffer release in SpillableSu...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4581 Rat check did not pass. Could you please fix this @NicoK. ---
[jira] [Commented] (FLINK-8142) Cleanup reference to deprecated constants in ConfigConstants
[ https://issues.apache.org/jira/browse/FLINK-8142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265443#comment-16265443 ] ASF GitHub Bot commented on FLINK-8142: --- GitHub user yew1eb opened a pull request: https://github.com/apache/flink/pull/5067 [FLINK-8142][config] Cleanup reference to deprecated constants in ConfigConstants ## What is the purpose of the change ConfigConstants contains several deprecated String constants that are used by other Flink modules. Those should be cleaned up. ## Brief change log - *Replace usages of deprecated `TASK_MANAGER_MEMORY_SIZE_KEY` , `EXECUTION_RETRIES_KEY`, `EXECUTION_RETRY_DELAY_KEY`, `TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY`, `JOB_MANAGER_IPC_ADDRESS_KEY`* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yew1eb/flink FLINK-8142 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5067.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5067 commit 0cf6eb6a0bbee829adedc9ddce230c4aa70abe94 Author: yew1eb Date: 2017-11-23T15:52:38Z [FLINK-8142][minor] Cleanup reference to deprecated constants in ConfigConstants > Cleanup reference to deprecated constants in ConfigConstants > > > Key: FLINK-8142 > URL: https://issues.apache.org/jira/browse/FLINK-8142 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Minor > Fix For: 1.5.0 > > > ConfigConstants contains several deprecated String constants that are used by > other Flink modules. Those should be cleaned up. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5067: [FLINK-8142][config] Cleanup reference to deprecat...
GitHub user yew1eb opened a pull request: https://github.com/apache/flink/pull/5067 [FLINK-8142][config] Cleanup reference to deprecated constants in ConfigConstants ## What is the purpose of the change ConfigConstants contains several deprecated String constants that are used by other Flink modules. Those should be cleaned up. ## Brief change log - *Replace usages of deprecated `TASK_MANAGER_MEMORY_SIZE_KEY`, `EXECUTION_RETRIES_KEY`, `EXECUTION_RETRY_DELAY_KEY`, `TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY`, `JOB_MANAGER_IPC_ADDRESS_KEY`* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yew1eb/flink FLINK-8142 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5067.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5067 commit 0cf6eb6a0bbee829adedc9ddce230c4aa70abe94 Author: yew1eb Date: 2017-11-23T15:52:38Z [FLINK-8142][minor] Cleanup reference to deprecated constants in ConfigConstants ---
[jira] [Updated] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-7694: Description: Port {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} to new handler that works with {{RestServerEndpoint}}. Add new handler to {{DispatcherRestEndpoint}}. (was: Port {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}}) > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Bowen Li >Assignee: Gary Yao > Labels: flip6 > Fix For: 1.5.0 > > > Port > {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} to > new handler that works with {{RestServerEndpoint}}. Add new handler to > {{DispatcherRestEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/5032 Hi @aljoscha, I wonder if you could help review this PR when convenient. Thanks, Xingcan ---
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265363#comment-16265363 ] ASF GitHub Bot commented on FLINK-8090: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5032 Hi @aljoscha, I wonder if you could help review this PR when convenient. Thanks, Xingcan
> Improve error message when registering different states under the same name.
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>     at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>     ... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The error message should be something along the lines of "Duplicate state name". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
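The improvement the ticket asks for amounts to checking, at registration time, that a state name is not reused with a different state type, and failing with a clear message instead of a downstream `ClassCastException`. A hedged sketch of that check (the registry class below is invented for illustration; Flink's actual bookkeeping lives inside the state backends):

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only (not Flink's API): detect two different state
 *  types registered under the same name and fail fast with a clear message. */
class StateNameRegistry {
    private final Map<String, String> typeByName = new HashMap<>();

    void register(String name, String stateType) {
        String existing = typeByName.putIfAbsent(name, stateType);
        if (existing != null && !existing.equals(stateType)) {
            // The clear error message FLINK-8090 asks for, instead of a ClassCastException.
            throw new IllegalStateException("Duplicate state name '" + name
                    + "': already registered as " + existing
                    + ", now requested as " + stateType);
        }
    }
}
```

In the ticket's example, registering "timon-one" first as a MapState and then as a ListState would trip this check immediately in `open()`, pointing straight at the duplicate name.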
[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint
[ https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265345#comment-16265345 ] Aljoscha Krettek commented on FLINK-7883: - The problem is that this is harder than it looks. In a Flink pipeline, data does not only originate at sources; other things in the pipeline can produce events "out of thin air". Off the top of my head I can think of:
- Sources: this is obvious, sources emit events.
- Reactions to watermark timers: these can emit events. This is not as easy to control, because watermarks can originate not only at sources but also at any operator in the pipeline.
- Reactions to processing-time timers: these can also emit events at any time.
- "Special" operators that emit data from a separate thread. Currently this would be the Async I/O operator and the {{ContinuousFileReaderOperator}}, which forms the "file source" together with {{ContinuousFileMonitoringFunction}}.
If we want to make sure that we don't emit any unwanted data, we have to _quiesce_ the whole pipeline first. This can be done via special messages (like watermarks) that are injected at the sources, based on a message from the {{JobManager}}, and traverse the topology. All operators would have to report that they are quiesced before we take the savepoint. In case the savepoint fails for some reason, we need to _un-quiesce_ the pipeline again. The above is the hard part. Making sure that we don't emit data from a "source" is as simple as ensuring that {{SourceContext.collect()}} doesn't forward the data that the source wants to emit. We definitely have to solve this problem, but I don't think we can do it for the 1.5 release, because the community decided on a very short release cycle after 1.4.0, since a bunch of important features are almost ready to be released.
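The quiescing protocol described in the comment above can be illustrated with a toy operator chain: a marker is injected at the source, travels through the topology like a watermark, and every operator acks it; the savepoint is only taken once all operators have reported in. This is a standalone sketch of the idea, not Flink's runtime code; all names below are made up.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the "quiesce via special messages" idea: each operator that
// receives the marker stops emitting, forwards the marker downstream, and
// records its own ack. The "JobManager" injects the marker at the source.
public class QuiesceSketch {
    static class Operator {
        final String name;
        Operator downstream; // null for sinks
        Operator(String name) { this.name = name; }

        void onQuiesceMarker(Set<String> acked) {
            acked.add(name);              // report "I am quiesced"
            if (downstream != null) {
                downstream.onQuiesceMarker(acked); // marker traverses the topology
            }
        }
    }

    public static void main(String[] args) {
        Operator source = new Operator("source");
        Operator map = new Operator("map");
        Operator sink = new Operator("sink");
        source.downstream = map;
        map.downstream = sink;

        Set<String> acked = new HashSet<>();
        source.onQuiesceMarker(acked); // injected at the source

        boolean allQuiesced = acked.size() == 3;
        System.out.println(allQuiesced
            ? "all operators quiesced, take savepoint"
            : "savepoint failed, un-quiesce pipeline");
    }
}
```

The real mechanism would also need the un-quiesce path for failed savepoints and must cope with timers and operators that emit from separate threads, which is exactly the hard part the comment points out.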
> Stop fetching source before a cancel with savepoint
> ---
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API, Kafka Connector, State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call once the savepoint is finished, but during the savepoint execution the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.
> A solution could be to stop fetching from the source stream task before triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a method {{stopFetching}} that existing SourceFunction implementations could implement.
> We can add a {{stopFetchingSource}} property to the {{CheckpointOptions}} class to pass the desired behaviour from {{JobManager.handleMessage(CancelJobWithSavepoint)}} to {{SourceStreamTask.triggerCheckpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
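The interface proposed in the issue description can be sketched as follows. Only the names `StoppableFetchingSourceFunction` and `stopFetching` come from the issue; the polling source and its flag-checking loop are an illustrative assumption, not Flink code.

```java
// Hypothetical sketch of the proposed StoppableFetchingSourceFunction: the
// fetch loop checks a volatile flag on every poll, so flipping the flag before
// triggering the savepoint drains the source without emitting further records.
public class StopFetchingSketch {
    interface StoppableFetchingSourceFunction {
        void stopFetching();
    }

    static class PollingSource implements StoppableFetchingSourceFunction {
        private volatile boolean fetching = true;
        private int polled = 0;

        void run() {
            // Bounded here only so the sketch terminates; a real source loops
            // until cancelled or stopped.
            while (fetching && polled < 1_000) {
                polled++;
                if (polled == 3) {
                    stopFetching(); // in reality invoked before the savepoint is triggered
                }
            }
        }

        @Override
        public void stopFetching() {
            fetching = false;
        }

        int polledRecords() { return polled; }
    }

    public static void main(String[] args) {
        PollingSource source = new PollingSource();
        source.run();
        System.out.println("polled " + source.polledRecords() + " records before stop");
    }
}
```

Records polled before `stopFetching()` are still part of the savepoint; the point is that nothing polled afterwards can be lost from it and replayed on restart.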
[GitHub] flink issue #5066: [FLINK-8148][yarn/s3] fix test instability in YarnFileSta...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5066 Makes sense, +1 ---
[jira] [Commented] (FLINK-8148) Test instability in YarnFileStageTest
[ https://issues.apache.org/jira/browse/FLINK-8148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265344#comment-16265344 ] ASF GitHub Bot commented on FLINK-8148: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5066 Makes sense, +1 > Test instability in YarnFileStageTest > - > > Key: FLINK-8148 > URL: https://issues.apache.org/jira/browse/FLINK-8148 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > Labels: test-stability > > {code} > Running org.apache.flink.yarn.YarnFileStageTestS3ITCase > Tests run: 3, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 13.152 sec > <<< FAILURE! - in org.apache.flink.yarn.YarnFileStageTestS3ITCase > testRecursiveUploadForYarnS3(org.apache.flink.yarn.YarnFileStageTestS3ITCase) > Time elapsed: 8.515 sec <<< FAILURE! > java.lang.AssertionError: null > at org.junit.Assert.fail(Assert.java:86) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertFalse(Assert.java:64) > at org.junit.Assert.assertFalse(Assert.java:74) > at > org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarn(YarnFileStageTestS3ITCase.java:171) > at > org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3(YarnFileStageTestS3ITCase.java:192) > {code} > from https://travis-ci.org/apache/flink/jobs/305861539 > {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarn}} verifies that the > test directory used is cleaned up by > {{YarnFileStageTest.testCopyFromLocalRecursive}} which should clean up the > directory (in a {{finally}} block). However, for S3, we may not always see > our own deletes. > Quoting from https://aws.amazon.com/s3/faqs/ here: > {quote}Q: What data consistency model does Amazon S3 employ? 
> Amazon S3 buckets in all Regions provide read-after-write consistency for > PUTS of new objects and eventual consistency for overwrite PUTS and > DELETES.{quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8100) Consider introducing log4j-extras
[ https://issues.apache.org/jira/browse/FLINK-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265322#comment-16265322 ] Aljoscha Krettek commented on FLINK-8100: - We don't use log4j directly but slf4j. Would this still work? > Consider introducing log4j-extras > -- > > Key: FLINK-8100 > URL: https://issues.apache.org/jira/browse/FLINK-8100 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > log4j-extras allows log rotation as well as compression. > https://logging.apache.org/log4j/extras/download.html > We should consider using log4j-extras. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8103) Flink 1.4 not writing to standard out log file
[ https://issues.apache.org/jira/browse/FLINK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265321#comment-16265321 ] Aljoscha Krettek commented on FLINK-8103: - I just ran an example like this on the (just-released) RC1 for Flink 1.4.0. The output was in fact in the .out file of the TaskManager. Could you try it with that as well? RC1: http://people.apache.org/~aljoscha/flink-1.4.0-rc1/ > Flink 1.4 not writing to standard out log file > -- > > Key: FLINK-8103 > URL: https://issues.apache.org/jira/browse/FLINK-8103 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.0 > Environment: macOS 10.13 (High Sierra) >Reporter: Ryan Brideau > > I built the latest snapshot of 1.4 yesterday and tried testing it with a > simple word count example, where StreamUtil is just a helper that checks > input parameters: > {code:java} > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.scala._ > object Words { > def main(args: Array[String]) { > // set up the execution environment > val env = StreamExecutionEnvironment.getExecutionEnvironment > val params = ParameterTool.fromArgs(args) > env.getConfig.setGlobalJobParameters(params) > val dataStream = StreamUtil.getDataStream(env, params) > val wordDataStream = dataStream > .flatMap{ _.split(" ") } > wordDataStream.println > // execute program > env.execute("Words Scala") > } > } > {code} > This runs without an issue on the latest stable version of 1.3 and writes its > results to the _out_ file, which I can tail to see the results. This doesn't > happen in 1.4, however. 
I can modify it to write out to a file, however: > {code:java} > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.core.fs.FileSystem.WriteMode > import org.apache.flink.streaming.api.scala._ > object Words { > def main(args: Array[String]) { > // set up the execution environment > val env = StreamExecutionEnvironment.getExecutionEnvironment > val params = ParameterTool.fromArgs(args) > env.getConfig.setGlobalJobParameters(params) > val dataStream = StreamUtil.getDataStream(env, params) > val wordDataStream = dataStream > .flatMap{ _.split(" ") } > wordDataStream > .writeAsText("file:///somepath/output", WriteMode.OVERWRITE) > .setParallelism(1) > // execute program > env.execute("Words Scala") > } > } > {code} > Any clues as to what might be causing this? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8124) EventTimeTrigger (and other triggers) could have less specific generic types
[ https://issues.apache.org/jira/browse/FLINK-8124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265309#comment-16265309 ] Aljoscha Krettek commented on FLINK-8124: - I think that should be feasible, yes. > EventTimeTrigger (and other triggers) could have less specific generic types > > > Key: FLINK-8124 > URL: https://issues.apache.org/jira/browse/FLINK-8124 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Cristian >Priority: Minor > > When implementing custom WindowAssigners, it is possible to need different > implementations of the {{Window}} class (other than {{TimeWindow}}). In such > cases, it is not possible to use the existing triggers (e.g. > {{EventTimeTrigger}}) because it extends from {{Trigger
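The generics point in FLINK-8124 can be shown with a minimal standalone model: a trigger declared against the concrete `TimeWindow` type cannot be reused with a custom `Window` subclass, while one declared against the base `Window` type can. The class names below mirror Flink's but the code is an illustration, not the Flink API.

```java
// Minimal model of the FLINK-8124 proposal: declare EventTimeTrigger against
// the base Window type so custom WindowAssigners with their own Window
// subclasses can reuse it.
public class TriggerGenericsSketch {
    static abstract class Window {
        abstract long maxTimestamp();
    }

    static class TimeWindow extends Window {
        final long end;
        TimeWindow(long end) { this.end = end; }
        long maxTimestamp() { return end - 1; }
    }

    // A custom assigner's window type, as in the issue's use case.
    static class SessionishWindow extends Window {
        long maxTimestamp() { return 42; }
    }

    interface Trigger<T, W extends Window> {
        boolean fireAt(W window, long watermark);
    }

    // Less specific generic type: works for TimeWindow AND SessionishWindow.
    static class EventTimeTrigger implements Trigger<Object, Window> {
        public boolean fireAt(Window window, long watermark) {
            return watermark >= window.maxTimestamp();
        }
    }

    public static void main(String[] args) {
        EventTimeTrigger trigger = new EventTimeTrigger();
        System.out.println(trigger.fireAt(new TimeWindow(100), 99));    // true: 99 >= 99
        System.out.println(trigger.fireAt(new SessionishWindow(), 10)); // false: 10 < 42
    }
}
```

Had `EventTimeTrigger` implemented `Trigger<Object, TimeWindow>`, the second call would not compile, which is the limitation the issue describes.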
[jira] [Closed] (FLINK-6294) BucketingSink throws NPE while cancelling job
[ https://issues.apache.org/jira/browse/FLINK-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-6294. --- Resolution: Fixed Fixed on release-1.3 in 5aea4917662d80899d3adff0378b97ae58aa2afc Fixed on release-1.4 in f636276dee991160a640442cbfcfdd58bfa57806 Fixed on master in d7911c5a8a6896261c55b61ea4633e706270baa1 > BucketingSink throws NPE while cancelling job > - > > Key: FLINK-6294 > URL: https://issues.apache.org/jira/browse/FLINK-6294 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0 >Reporter: Andrey > Fix For: 1.3.3, 1.4.1 > > > Steps to reproduce: > * configure BucketingSink and run job > * cancel job from UI before processing any messages > * in logs: > {code} > 2017-04-11 10:14:54,681 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Source: Custom > Source (1/2) [Source: Custom Source (1/2)] > 2017-04-11 10:14:54,881 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > Un-registering task and sending final execution state CANCELED to JobManager > for task Source: Custom Source (56d0c9ffe06dc3e4481e7ce530d9894f) > [flink-akka.actor.default-dispatcher-4] > 2017-04-11 10:14:56,584 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask - Error during > disposal of stream operator. 
[Flat Map -> Sink: Unnamed (2/2)] > java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:422) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
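The class of fix needed for an NPE like the one above can be sketched in isolation: when a job is cancelled before any record is processed, `close()` may run before `open()` ever initialized the sink's internal state, so the disposal path must tolerate null fields. This is an illustrative sketch, not the actual `BucketingSink` code.

```java
// Illustrative sketch: guard the disposal path against state that was never
// initialized, because cancel-before-first-record calls close() without a
// preceding successful open().
public class SinkCloseSketch {
    static class BucketState {
        void flushAndClose() { /* flush and close in-progress part files */ }
    }

    static class Sink {
        private BucketState state; // only assigned in open(); may still be null on cancel

        void open() {
            state = new BucketState();
        }

        void close() {
            if (state != null) {   // the missing null check that causes the NPE
                state.flushAndClose();
            }
        }
    }

    public static void main(String[] args) {
        Sink sink = new Sink();
        sink.close(); // cancelled before open() completed: must not throw
        System.out.println("closed without NPE");
    }
}
```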
[jira] [Updated] (FLINK-6294) BucketingSink throws NPE while cancelling job
[ https://issues.apache.org/jira/browse/FLINK-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6294: Fix Version/s: 1.4.1 1.3.3 > BucketingSink throws NPE while cancelling job > - > > Key: FLINK-6294 > URL: https://issues.apache.org/jira/browse/FLINK-6294 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.0 >Reporter: Andrey > Fix For: 1.3.3, 1.4.1 > > > Steps to reproduce: > * configure BucketingSink and run job > * cancel job from UI before processing any messages > * in logs: > {code} > 2017-04-11 10:14:54,681 INFO org.apache.flink.core.fs.FileSystem > - Ensuring all FileSystem streams are closed for Source: Custom > Source (1/2) [Source: Custom Source (1/2)] > 2017-04-11 10:14:54,881 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > Un-registering task and sending final execution state CANCELED to JobManager > for task Source: Custom Source (56d0c9ffe06dc3e4481e7ce530d9894f) > [flink-akka.actor.default-dispatcher-4] > 2017-04-11 10:14:56,584 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask - Error during > disposal of stream operator. [Flat Map -> Sink: Unnamed (2/2)] > java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:422) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-7694: Component/s: Distributed Coordination > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Bowen Li >Assignee: Gary Yao > Labels: flip6 > Fix For: 1.5.0 > > > Port > {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-7694: Labels: flip6 (was: ) > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Gary Yao > Labels: flip6 > Fix For: 1.5.0 > > > Port > {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-7694: Description: Port {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Gary Yao > Fix For: 1.5.0 > > > Port > {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7694) Port JobMetricsHandler to new REST handler
[ https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-7694: --- Assignee: Gary Yao (was: Bowen Li) > Port JobMetricsHandler to new REST handler > -- > > Key: FLINK-7694 > URL: https://issues.apache.org/jira/browse/FLINK-7694 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Bowen Li >Assignee: Gary Yao > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5066: [FLINK-8148][yarn/s3] fix test instability in Yarn...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5066 [FLINK-8148][yarn/s3] fix test instability in YarnFileStageTestS3ITCase ## What is the purpose of the change `YarnFileStageTestS3ITCase.testRecursiveUploadForYarn` verifies that the test directory used is cleaned up by `YarnFileStageTest.testCopyFromLocalRecursive` which should clean up the directory (in a `finally` block). However, for S3, we may not always see our own deletes. Quoting from https://aws.amazon.com/s3/faqs/ here: > Q: What data consistency model does Amazon S3 employ? > Amazon S3 buckets in all Regions provide read-after-write consistency for PUTS of new objects and eventual consistency for overwrite PUTS and DELETES. ## Brief change log - Remove a check for a deleted directory since we may not see our own delete yet with S3. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-8148 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5066.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5066 commit 29d7a8e085a5655c00a3163546e22a5d1331ea60 Author: Nico Kruber, Date: 2017-11-24T13:54:41Z [FLINK-8148][yarn/s3] fix test instability in YarnFileStageTestS3ITCase Remove a check for a deleted directory since we may not see our own delete yet with S3. ---

[jira] [Commented] (FLINK-8148) Test instability in YarnFileStageTest
[ https://issues.apache.org/jira/browse/FLINK-8148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265301#comment-16265301 ] ASF GitHub Bot commented on FLINK-8148: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5066 [FLINK-8148][yarn/s3] fix test instability in YarnFileStageTestS3ITCase ## What is the purpose of the change `YarnFileStageTestS3ITCase.testRecursiveUploadForYarn` verifies that the test directory used is cleaned up by `YarnFileStageTest.testCopyFromLocalRecursive` which should clean up the directory (in a `finally` block). However, for S3, we may not always see our own deletes. Quoting from https://aws.amazon.com/s3/faqs/ here: > Q: What data consistency model does Amazon S3 employ? > Amazon S3 buckets in all Regions provide read-after-write consistency for PUTS of new objects and eventual consistency for overwrite PUTS and DELETES. ## Brief change log - Remove a check for a deleted directory since we may not see our own delete yet with S3. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-8148 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5066.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5066 commit 29d7a8e085a5655c00a3163546e22a5d1331ea60 Author: Nico Kruber, Date: 2017-11-24T13:54:41Z [FLINK-8148][yarn/s3] fix test instability in YarnFileStageTestS3ITCase Remove a check for a deleted directory since we may not see our own delete yet with S3. > Test instability in YarnFileStageTest > - > > Key: FLINK-8148 > URL: https://issues.apache.org/jira/browse/FLINK-8148 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > Labels: test-stability > > {code} > Running org.apache.flink.yarn.YarnFileStageTestS3ITCase > Tests run: 3, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 13.152 sec > <<< FAILURE! - in org.apache.flink.yarn.YarnFileStageTestS3ITCase > testRecursiveUploadForYarnS3(org.apache.flink.yarn.YarnFileStageTestS3ITCase) > Time elapsed: 8.515 sec <<< FAILURE!
> java.lang.AssertionError: null > at org.junit.Assert.fail(Assert.java:86) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertFalse(Assert.java:64) > at org.junit.Assert.assertFalse(Assert.java:74) > at > org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarn(YarnFileStageTestS3ITCase.java:171) > at > org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3(YarnFileStageTestS3ITCase.java:192) > {code} > from https://travis-ci.org/apache/flink/jobs/305861539 > {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarn}} verifies that the > test directory used is cleaned up by > {{YarnFileStageTest.testCopyFromLocalRecursive}} which should clean up the > directory (in a {{finally}} block). However, for S3, we may not always see > our own deletes. > Quoting from https://aws.amazon.com/s3/faqs/ here: > {quote}Q: What data consistency model does Amazon S3 employ? > Amazon S3 buckets in all Regions provide read-after-write consistency for > PUTS of new objects and eventual consistency for overwrite PUTS and > DELETES.{quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8148) Test instability in YarnFileStageTest
[ https://issues.apache.org/jira/browse/FLINK-8148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8148: --- Labels: test-stability (was: ) > Test instability in YarnFileStageTest > - > > Key: FLINK-8148 > URL: https://issues.apache.org/jira/browse/FLINK-8148 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > Labels: test-stability > > {code} > Running org.apache.flink.yarn.YarnFileStageTestS3ITCase > Tests run: 3, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 13.152 sec > <<< FAILURE! - in org.apache.flink.yarn.YarnFileStageTestS3ITCase > testRecursiveUploadForYarnS3(org.apache.flink.yarn.YarnFileStageTestS3ITCase) > Time elapsed: 8.515 sec <<< FAILURE! > java.lang.AssertionError: null > at org.junit.Assert.fail(Assert.java:86) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertFalse(Assert.java:64) > at org.junit.Assert.assertFalse(Assert.java:74) > at > org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarn(YarnFileStageTestS3ITCase.java:171) > at > org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3(YarnFileStageTestS3ITCase.java:192) > {code} > from https://travis-ci.org/apache/flink/jobs/305861539 > {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarn}} verifies that the > test directory used is cleaned up by > {{YarnFileStageTest.testCopyFromLocalRecursive}} which should clean up the > directory (in a {{finally}} block). However, for S3, we may not always see > our own deletes. > Quoting from https://aws.amazon.com/s3/faqs/ here: > {quote}Q: What data consistency model does Amazon S3 employ? > Amazon S3 buckets in all Regions provide read-after-write consistency for > PUTS of new objects and eventual consistency for overwrite PUTS and > DELETES.{quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8148) Test instability in YarnFileStageTest
Nico Kruber created FLINK-8148: -- Summary: Test instability in YarnFileStageTest Key: FLINK-8148 URL: https://issues.apache.org/jira/browse/FLINK-8148 Project: Flink Issue Type: Bug Components: Tests, YARN Affects Versions: 1.4.0 Reporter: Nico Kruber Assignee: Nico Kruber Priority: Critical {code} Running org.apache.flink.yarn.YarnFileStageTestS3ITCase Tests run: 3, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 13.152 sec <<< FAILURE! - in org.apache.flink.yarn.YarnFileStageTestS3ITCase testRecursiveUploadForYarnS3(org.apache.flink.yarn.YarnFileStageTestS3ITCase) Time elapsed: 8.515 sec <<< FAILURE! java.lang.AssertionError: null at org.junit.Assert.fail(Assert.java:86) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertFalse(Assert.java:64) at org.junit.Assert.assertFalse(Assert.java:74) at org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarn(YarnFileStageTestS3ITCase.java:171) at org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3(YarnFileStageTestS3ITCase.java:192) {code} from https://travis-ci.org/apache/flink/jobs/305861539 {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarn}} verifies that the test directory used is cleaned up by {{YarnFileStageTest.testCopyFromLocalRecursive}} which should clean up the directory (in a {{finally}} block). However, for S3, we may not always see our own deletes. Quoting from https://aws.amazon.com/s3/faqs/ here: {quote}Q: What data consistency model does Amazon S3 employ? Amazon S3 buckets in all Regions provide read-after-write consistency for PUTS of new objects and eventual consistency for overwrite PUTS and DELETES.{quote} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
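The reason the removed assertion was flaky can be demonstrated with a toy eventually-consistent store: after a successful delete, an existence check may still report the object for a while, so asserting absence immediately after the delete is unreliable, while a bounded re-check is not. The store below simulates S3's delete semantics; it is hypothetical, not the AWS SDK.

```java
// Simulation of S3's eventual consistency for deletes: the delete only becomes
// visible to reads after a few stale responses, as described in the AWS FAQ
// quoted in the issue.
public class EventualDeleteSketch {
    static class EventuallyConsistentStore {
        private int staleReadsLeft = 2; // delete becomes visible after 2 reads
        private boolean deleted = false;

        void delete() { deleted = true; }

        boolean exists() {
            if (deleted && staleReadsLeft > 0) {
                staleReadsLeft--;
                return true; // stale read: our own delete is not visible yet
            }
            return !deleted;
        }
    }

    // What a robust test cleanup check could look like: re-check a bounded
    // number of times instead of asserting absence once.
    static boolean goneWithin(EventuallyConsistentStore store, int maxChecks) {
        for (int i = 0; i < maxChecks; i++) {
            if (!store.exists()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        EventuallyConsistentStore store = new EventuallyConsistentStore();
        store.delete();
        System.out.println("gone immediately? " + !store.exists()); // false: stale read
        System.out.println("gone within 5 checks? " + goneWithin(store, 5));
    }
}
```

The merged fix simply drops the one-shot check; the retry variant above is an alternative that keeps some coverage, at the cost of a bounded wait.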
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152972315 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -729,6 +804,12 @@ public Void call() throws Exception { }; } + /** +* Submits all the callable tasks to the executor and waits the results. --- End diff -- `waits for the results` ---
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265295#comment-16265295 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152963546 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() throws Exception { // Prepare the exclusive and floating buffers to verify recycle logic later Buffer exclusiveBuffer = inputChannel.requestBuffer(); assertNotNull(exclusiveBuffer); - Buffer floatingBuffer1 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer1); - Buffer floatingBuffer2 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer2); + + final int numRecycleFloatingBuffers = 4; + final ArrayDeque floatingBufferQueue = new ArrayDeque<>(numRecycleFloatingBuffers); + for (int i = 0; i < numRecycleFloatingBuffers; i++) { + Buffer floatingBuffer = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer); + floatingBufferQueue.add(floatingBuffer); + } // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the floating buffers to maintain (backlog + initialCredit) available buffers - verify(bufferPool, times(11)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers. + // One exclusive buffer is taken before, so we should request 13 floating buffers. 
+ verify(bufferPool, times(13)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", + 10, inputChannel.getNumberOfRequiredBuffers()); // Increase the backlog to exceed the number of available floating buffers inputChannel.onSenderBacklog(10); // The channel does not get enough floating buffer and register as buffer listener - verify(bufferPool, times(13)).requestBuffer(); + verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); - assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 12 buffers required in the channel", 12, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertEquals("There should be 11 buffers available in the channel", + 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 12 buffers required in the channel", + 12, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); // Continue increasing the backlog - inputChannel.onSenderBacklog(11); + inputChannel.onSenderBacklog(12); // The channel is already in the status of waiting for buffers and will not request any more - verify(bufferPool, times(13)).requestBuffer(); + verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, 
times(1)).addBufferListener(inputChannel); -
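The buffer accounting exercised by the test in the diff above follows one invariant: on a backlog announcement, the channel tries to keep `backlog + initialCredit` buffers available, requesting floating buffers from the local pool and registering itself as a buffer listener if the pool runs dry. The standalone model below captures that arithmetic; the numbers and class are illustrative, not taken from `RemoteInputChannel`.

```java
// Standalone model of credit-based floating-buffer accounting: maintain
// (backlog + initialCredit) available buffers, falling back to a buffer
// listener registration when the local pool is exhausted.
public class CreditAccountingSketch {
    static class Channel {
        final int initialCredit;
        int available;                     // buffers already held by the channel
        int poolBuffers;                   // floating buffers left in the local pool
        boolean registeredAsListener = false;

        Channel(int initialCredit, int available, int poolBuffers) {
            this.initialCredit = initialCredit;
            this.available = available;
            this.poolBuffers = poolBuffers;
        }

        void onSenderBacklog(int backlog) {
            int required = backlog + initialCredit;
            while (available < required && poolBuffers > 0) {
                poolBuffers--;             // request one floating buffer from the pool
                available++;
            }
            if (available < required) {
                registeredAsListener = true; // wait for recycled buffers
            }
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel(2, 1, 11); // credit 2, 1 buffer held, 11 in pool
        channel.onSenderBacklog(8);  // required 10: takes 9 floating buffers
        System.out.println(channel.available + " available, listener=" + channel.registeredAsListener);
        channel.onSenderBacklog(11); // required 13: only 2 left in the pool
        System.out.println(channel.available + " available, listener=" + channel.registeredAsListener);
    }
}
```

The second call drains the pool (12 available for 13 required), so the channel registers as a listener, mirroring the `addBufferListener` verification in the test.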
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265293#comment-16265293 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152969860 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() throws Exception { // Prepare the exclusive and floating buffers to verify recycle logic later Buffer exclusiveBuffer = inputChannel.requestBuffer(); assertNotNull(exclusiveBuffer); - Buffer floatingBuffer1 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer1); - Buffer floatingBuffer2 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer2); + + final int numRecycleFloatingBuffers = 4; + final ArrayDeque floatingBufferQueue = new ArrayDeque<>(numRecycleFloatingBuffers); + for (int i = 0; i < numRecycleFloatingBuffers; i++) { + Buffer floatingBuffer = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer); + floatingBufferQueue.add(floatingBuffer); + } // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the floating buffers to maintain (backlog + initialCredit) available buffers - verify(bufferPool, times(11)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers. + // One exclusive buffer is taken before, so we should request 13 floating buffers. 
+ verify(bufferPool, times(13)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 10 buffers available in the channel", + 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", + 10, inputChannel.getNumberOfRequiredBuffers()); // Increase the backlog to exceed the number of available floating buffers inputChannel.onSenderBacklog(10); // The channel does not get enough floating buffer and register as buffer listener - verify(bufferPool, times(13)).requestBuffer(); + verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); - assertEquals("There should be 11 buffers available in the channel", 11, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 12 buffers required in the channel", 12, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertEquals("There should be 11 buffers available in the channel", + 11, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 12 buffers required in the channel", + 12, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 0 buffer available in local pool", + 0, bufferPool.getNumberOfAvailableMemorySegments()); // Continue increasing the backlog - inputChannel.onSenderBacklog(11); + inputChannel.onSenderBacklog(12); // The channel is already in the status of waiting for buffers and will not request any more - verify(bufferPool, times(13)).requestBuffer(); + verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, 
times(1)).addBufferListener(inputChannel); -
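The backlog handling the quoted diff exercises boils down to simple arithmetic. A minimal, stdlib-only sketch (class and method names are invented for illustration, not the actual `RemoteInputChannel` API):

```java
// Hypothetical sketch of the accounting the test above exercises: after a
// sender backlog announcement, the channel tries to hold
// (backlog + initialCredit) buffers and requests the missing ones as
// floating buffers from the pool.
public class CreditAccounting {

    /** Buffers the channel should hold after receiving a backlog announcement. */
    static int requiredBuffers(int backlog, int initialCredit) {
        return backlog + initialCredit;
    }

    /** Floating buffers still missing, given what the channel already holds. */
    static int buffersToRequest(int backlog, int initialCredit, int availableBuffers) {
        return Math.max(0, requiredBuffers(backlog, initialCredit) - availableBuffers);
    }

    public static void main(String[] args) {
        // First step of the test: backlog 8, initial credit 2; one of the two
        // exclusive buffers was taken beforehand, so only 1 buffer is held.
        System.out.println(buffersToRequest(8, 2, 1)); // prints 9
    }
}
```

With 4 floating buffers already requested in the preparation loop, those 9 additional requests explain the absolute count of 13 asserted in the diff.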
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152972034 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -306,51 +306,88 @@ public void testProducerFailedException() throws Exception { } /** -* Tests to verify that the input channel requests floating buffers from buffer pool -* in order to maintain backlog + initialCredit buffers available once receiving the -* sender's backlog, and registers as listener if no floating buffers available. +* Tests to verify that the input channel requests floating buffers from buffer pool for +* maintaining (backlog + initialCredit) available buffers once receiving the sender's backlog. +* +* Verifies the logic of recycling floating buffer back into the input channel and the logic +* of returning extra floating buffer into the buffer pool during recycling exclusive buffer. */ @Test - public void testRequestFloatingBufferOnSenderBacklog() throws Exception { + public void testRequestAndReturnFloatingBuffer() throws Exception { // Setup - final NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32, MemoryType.HEAP); + final NetworkBufferPool networkBufferPool = new NetworkBufferPool(14, 32, MemoryType.HEAP); + final int numExclusiveBuffers = 2; + final int numFloatingBuffers = 12; + final SingleInputGate inputGate = createSingleInputGate(); final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); try { - final int numFloatingBuffers = 10; final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers)); inputGate.setBufferPool(bufferPool); - - // Assign exclusive segments to the channel - final int numExclusiveBuffers = 2; - inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel); 
inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers); - assertEquals("There should be " + numExclusiveBuffers + " buffers available in the channel", - numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + // Prepare the exclusive and floating buffers to verify recycle logic later + Buffer exclusiveBuffer = inputChannel.requestBuffer(); + assertNotNull(exclusiveBuffer); + Buffer floatingBuffer1 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer1); + Buffer floatingBuffer2 = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer2); - // Receive the producer's backlog + // Receive the producer's backlog less than the number of available floating buffers inputChannel.onSenderBacklog(8); - // Request the number of floating buffers by the formula of backlog + initialCredit - availableBuffers - verify(bufferPool, times(8)).requestBuffer(); + // Request the floating buffers to maintain (backlog + initialCredit) available buffers + verify(bufferPool, times(11)).requestBuffer(); verify(bufferPool, times(0)).addBufferListener(inputChannel); - assertEquals("There should be 10 buffers available in the channel", - 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers available in the channel", 10, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 10 buffers required in the channel", 10, inputChannel.getNumberOfRequiredBuffers()); - inputChannel.onSenderBacklog(11); + // Increase the backlog to exceed the number of available floating buffers + inputChannel.onSenderBacklog(10); - // Need extra three floating buffers, but only two buffers available in buffer pool, register as listener as a result - verify(bufferPool, times(11)).requestBuffer(); + // The channel does not get enough floating buffer and register as buffer listener + verify(bufferPool,
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265296#comment-16265296 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152962580 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() throws Exception { // Prepare the exclusive and floating buffers to verify recycle logic later Buffer exclusiveBuffer = inputChannel.requestBuffer(); assertNotNull(exclusiveBuffer); - Buffer floatingBuffer1 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer1); - Buffer floatingBuffer2 = bufferPool.requestBuffer(); - assertNotNull(floatingBuffer2); + + final int numRecycleFloatingBuffers = 4; + final ArrayDeque floatingBufferQueue = new ArrayDeque<>(numRecycleFloatingBuffers); + for (int i = 0; i < numRecycleFloatingBuffers; i++) { + Buffer floatingBuffer = bufferPool.requestBuffer(); + assertNotNull(floatingBuffer); + floatingBufferQueue.add(floatingBuffer); + } --- End diff -- if you add `verify(bufferPool, times(4)).requestBuffer();` here, the difference to the 13 below becomes a bit more clear (because the requests are absolute values while as differences would be clear from the start, i.e. that we request 9 additional buffers - unfortunately, Mockito does not provide this as far as I know - but that's not a big deal as soon as everything is clear) > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. 
> Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
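NicoK's point about absolute versus incremental counts can be illustrated without Mockito. A stdlib-only stand-in (names invented) for how `verify(mock, times(n))` behaves:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for the Mockito behaviour described above: a verification count
// such as times(13) is an absolute total of calls since the mock was
// created, not the number of calls since the last verify.
public class CumulativeCounts {

    /** A counting "pool" whose request total accumulates across the test. */
    static class CountingPool {
        final AtomicInteger requests = new AtomicInteger();
        void requestBuffer() { requests.incrementAndGet(); }
    }

    public static void main(String[] args) {
        CountingPool pool = new CountingPool();
        for (int i = 0; i < 4; i++) {
            pool.requestBuffer();          // preparation loop: 4 requests
        }
        // a verify(pool, times(4)) here would document this baseline
        for (int i = 0; i < 9; i++) {
            pool.requestBuffer();          // backlog handling: 9 more requests
        }
        // times(13) in the test therefore means 4 + 9, not 13 new requests
        System.out.println(pool.requests.get()); // prints 13
    }
}
```

That is why adding an intermediate `verify(bufferPool, times(4)).requestBuffer();` makes the later 13 easier to read.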
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265297#comment-16265297 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152972414 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel( initialAndMaxRequestBackoff._2(), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); } + + private Callable recycleExclusiveBufferTask(RemoteInputChannel inputChannel, int numExclusiveSegments) { + final List exclusiveBuffers = new ArrayList<>(numExclusiveSegments); + // Exhaust all the exclusive buffers + for (int i = 0; i < numExclusiveSegments; i++) { + Buffer buffer = inputChannel.requestBuffer(); + assertNotNull(buffer); + exclusiveBuffers.add(buffer); + } + + return new Callable() { + @Override + public Void call() throws Exception { + for (Buffer buffer : exclusiveBuffers) { + buffer.recycle(); + } + + return null; + } + }; + } + + private Callable recycleFloatingBufferTask(BufferPool bufferPool, int numFloatingBuffers) throws Exception { + final List floatingBuffers = new ArrayList<>(numFloatingBuffers); + // Exhaust all the floating buffers + for (int i = 0; i < numFloatingBuffers; i++) { + Buffer buffer = bufferPool.requestBuffer(); + assertNotNull(buffer); + floatingBuffers.add(buffer); + } + + return new Callable() { + @Override + public Void call() throws Exception { + for (Buffer buffer : floatingBuffers) { + buffer.recycle(); + } + + return null; + } + }; + } + + private void submitTasksAndWaitResults(ExecutorService executor, Callable[] tasks) throws Exception { --- End diff -- maybe also rename to `submitTasksAndWaitForResults` > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: 
https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265298#comment-16265298 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152972315 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -729,6 +804,12 @@ public Void call() throws Exception { }; } + /** +* Submits all the callable tasks to the executor and waits the results. --- End diff -- `waits for the results` > Implement Netty receiver incoming pipeline for credit-based > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406
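A hedged sketch of what a helper in the spirit of the renamed `submitTasksAndWaitForResults` could look like: submit every recycling `Callable` and block until all have finished, so that the assertions after it only run once recycling is done. The actual Flink test helper may differ; this only illustrates the submit-then-wait contract.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class TaskRunner {

    static void submitTasksAndWaitForResults(ExecutorService executor,
                                             List<Callable<Void>> tasks) throws Exception {
        // invokeAll blocks until every task has completed
        for (Future<Void> future : executor.invokeAll(tasks)) {
            future.get(); // surfaces any exception thrown inside a task
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger recycled = new AtomicInteger();
        Callable<Void> recycleTask = () -> { recycled.incrementAndGet(); return null; };
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            submitTasksAndWaitForResults(executor, Arrays.asList(recycleTask, recycleTask));
        } finally {
            executor.shutdown();
        }
        System.out.println(recycled.get()); // prints 2: both tasks finished first
    }
}
```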
[jira] [Commented] (FLINK-8093) flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"
[ https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265290#comment-16265290 ] Aljoscha Krettek commented on FLINK-8093: - Do you mean thread safe or *not* thread safe? I think the static variable can be a problem because that would be used by multiple threads and the tasks from the two jobs share the same JVM instances. > flink job fail because of kafka producer create fail of > "javax.management.InstanceAlreadyExistsException" > - > > Key: FLINK-8093 > URL: https://issues.apache.org/jira/browse/FLINK-8093 > Project: Flink > Issue Type: Bug >Affects Versions: 1.3.2 > Environment: flink 1.3.2, kafka 0.9.1 >Reporter: dongtingting >Priority: Critical > > one taskmanager has multiple taskslot, one task fail because of create > kafkaProducer fail,the reason for create kafkaProducer fail is > “javax.management.InstanceAlreadyExistsException: > kafka.producer:type=producer-metrics,client-id=producer-3”。 the detail trace > is : > 2017-11-04 19:41:23,281 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom Source -> Filter -> Map -> Filter -> Sink: > dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from > RUNNING to FAILED. 
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:321) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:181) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.kafka.common.KafkaException: Error registering mbean > kafka.producer:type=producer-metrics,client-id=producer-3 > at > org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159) > at > org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77) > at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288) > at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255) > at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.(RecordAccumulator.java:111) > at > org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:261) > ... 
9 more > Caused by: javax.management.InstanceAlreadyExistsException: > kafka.producer:type=producer-metrics,client-id=producer-3 > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157) > ... 16 more > I doubt that task in different taskslot of one taskmanager use different > classloader, and taskid may be the same in one process。 So this lead to > create kafkaProducer fail in one taskManager。 > Does anybody encountered the same problem? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
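The `InstanceAlreadyExistsException` above comes from two producers registering the same JMX MBean name (`client-id=producer-3`) in one JVM. A common mitigation, sketched below, is to set an explicit, unique `client.id` per producer instead of relying on the client library's auto-generated `producer-N` counter, which is per-classloader and can repeat when tasks from different jobs share a TaskManager JVM. The property keys are standard Kafka producer config; the bootstrap address and naming scheme are assumptions for illustration.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class UniqueClientId {

    private static final AtomicInteger COUNTER = new AtomicInteger();

    static Properties producerConfig(String jobName, int subtaskIndex) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // jobName + subtask index + JVM-wide counter keeps ids distinct even
        // when producers from several jobs end up in the same JVM
        props.setProperty("client.id",
                jobName + "-" + subtaskIndex + "-" + COUNTER.incrementAndGet());
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerConfig("dp_client_log", 7).getProperty("client.id"));
    }
}
```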
[jira] [Updated] (FLINK-8147) Support and to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-8147: -- Issue Type: New Feature (was: Bug) > Support and to CEP's pattern API > > > Key: FLINK-8147 > URL: https://issues.apache.org/jira/browse/FLINK-8147 > Project: Flink > Issue Type: New Feature > Components: CEP >Reporter: Dian Fu > > Adding API such as {{and}} in CEP's pattern API will let us define patterns > like {{(A -> B) and (C -> D)}}. > Its usage looks like > {noformat} > Pattern left = Pattern.begin("A").followedBy("B"); > Pattern right = Pattern.begin("C").followedBy("D"); > Pattern pattern = left.and(right) > {noformat} > If this makes sense, we should also support APIs such as {{or}} and {{not}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8147) Support and to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-8147: -- Priority: Minor (was: Major) > Support and to CEP's pattern API > > > Key: FLINK-8147 > URL: https://issues.apache.org/jira/browse/FLINK-8147 > Project: Flink > Issue Type: New Feature > Components: CEP >Reporter: Dian Fu >Priority: Minor > > Adding API such as {{and}} in CEP's pattern API will let us define patterns > like {{(A -> B) and (C -> D)}}. > Its usage looks like > {noformat} > Pattern left = Pattern.begin("A").followedBy("B"); > Pattern right = Pattern.begin("C").followedBy("D"); > Pattern pattern = left.and(right) > {noformat} > If this makes sense, we should also support APIs such as {{or}} and {{not}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6909) Flink should support Lombok POJO
[ https://issues.apache.org/jira/browse/FLINK-6909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265280#comment-16265280 ] Timo Walther commented on FLINK-6909: - I tested a simple Lombok POJO and it worked as expected. [~mkzaman] Could you check the logs for TypeExtractor entries? Is the class a member class of another class? Do you have the lombok IDE plugin installed and added the Maven dependency correctly? > Flink should support Lombok POJO > > > Key: FLINK-6909 > URL: https://issues.apache.org/jira/browse/FLINK-6909 > Project: Flink > Issue Type: Wish > Components: Type Serialization System >Reporter: Md Kamaruzzaman >Assignee: Timo Walther >Priority: Minor > > Project lombok helps greatly to reduce boilerplate Java Code. > It seems that Flink does not accept a lombok POJO as a valid pojo. > e.g. Here is a POJO defined with lombok: > @Getter > @Setter > @NoArgsConstructor > public class SimplePojo > Using this Pojo class to read from CSV file throws this exception: > Exception in thread "main" java.lang.ClassCastException: > org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to > org.apache.flink.api.java.typeutils.PojoTypeInfo > It would be great if flink supports lombok POJO. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-6909) Flink should support Lombok POJO
[ https://issues.apache.org/jira/browse/FLINK-6909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265280#comment-16265280 ] Timo Walther edited comment on FLINK-6909 at 11/24/17 1:37 PM: --- I tested a simple Lombok POJO and it worked as expected. [~mkzaman] Could you check the logs for TypeExtractor entries? Is the class a member class of another class? Do you have the lombok IDE plugin installed and added the Maven dependency correctly? was (Author: twalthr): I tested a simple Lombok POJO and it worked as expected. [~mkzaman] Could you check the logs for TypeExtractor entries? Is the class a member class of another class. Do you have the lombok IDE plugin installed and added the Maven dependency correctly? > Flink should support Lombok POJO > > > Key: FLINK-6909 > URL: https://issues.apache.org/jira/browse/FLINK-6909 > Project: Flink > Issue Type: Wish > Components: Type Serialization System >Reporter: Md Kamaruzzaman >Assignee: Timo Walther >Priority: Minor > > Project lombok helps greatly to reduce boilerplate Java Code. > It seems that Flink does not accept a lombok POJO as a valid pojo. > e.g. Here is a POJO defined with lombok: > @Getter > @Setter > @NoArgsConstructor > public class SimplePojo > Using this Pojo class to read from CSV file throws this exception: > Exception in thread "main" java.lang.ClassCastException: > org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to > org.apache.flink.api.java.typeutils.PojoTypeInfo > It would be great if flink supports lombok POJO. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
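For context on why a Lombok POJO can work: Flink's POJO rules require a public class, a public no-argument constructor, and a public getter/setter pair (or a public field) per non-transient field, and the class must not be a non-static member class (hence Timo's question above). Lombok's `@Getter`/`@Setter`/`@NoArgsConstructor` generate exactly these members at compile time. The sketch below is a hand-written equivalent of what Lombok would produce for a hypothetical two-field `SimplePojo` (the original issue elides the fields, so these names are invented); if the TypeExtractor still yields `GenericTypeInfo`, Lombok's annotation processing most likely did not run during compilation.

```java
// Hand-written equivalent of a Lombok @Getter/@Setter/@NoArgsConstructor POJO.
// A top-level class shaped like this satisfies Flink's POJO requirements.
public class SimplePojo {
    private String name;
    private int count;

    public SimplePojo() {}  // what @NoArgsConstructor generates

    public String getName() { return name; }                   // @Getter
    public void setName(String name) { this.name = name; }     // @Setter

    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
}
```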
[jira] [Resolved] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner
[ https://issues.apache.org/jira/browse/FLINK-8117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Gevay resolved FLINK-8117. Resolution: Implemented > Eliminate modulo operation from RoundRobinChannelSelector and > RebalancePartitioner > -- > > Key: FLINK-8117 > URL: https://issues.apache.org/jira/browse/FLINK-8117 > Project: Flink > Issue Type: Improvement > Components: Local Runtime, Streaming >Reporter: Gabor Gevay >Assignee: Gabor Gevay >Priority: Minor > Labels: performance > Fix For: 1.5.0 > > > {{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and > {{RescalePartitioner}} use a modulo operation to wrap around when the current > channel counter reaches the number of channels. Using an {{if}} would have > better performance. > A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but > the {{if}} will be only 1-2 cycles on average, since the branch predictor can > most of the time predict the condition to be false. > \[1\] http://www.agner.org/optimize/instruction_tables.pdf -- This message was sent by Atlassian JIRA (v6.4.14#64029)
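The optimization implemented in FLINK-8117 can be illustrated with a simplified sketch (not Flink's actual `RoundRobinChannelSelector`; the counter field mirrors the selector's internal state):

```java
// Simplified sketch of the round-robin change: replace the integer modulo
// (an expensive division, ~25 cycles) with a compare-and-reset branch that
// the CPU's branch predictor handles almost for free, since the branch is
// taken only once every numChannels calls.
public class RoundRobinSketch {
    private int nextChannel = -1;

    // Before: division-based wrap-around on every call.
    public int selectModulo(int numChannels) {
        nextChannel = (nextChannel + 1) % numChannels;
        return nextChannel;
    }

    // After: the branch is almost always predicted "not taken".
    public int selectBranch(int numChannels) {
        nextChannel = nextChannel + 1;
        if (nextChannel >= numChannels) {
            nextChannel = 0;
        }
        return nextChannel;
    }
}
```

Both variants produce the identical channel sequence 0, 1, ..., numChannels-1, 0, ..., so the change is purely a micro-optimization.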
[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver
[ https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265251#comment-16265251 ] ASF GitHub Bot commented on FLINK-8144: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5063 I see. The bad performance is related to the way that you generate the watermarks. Generating so many watermarks is very inefficient because you are processing many more watermarks than actual records (watermarks are always broadcasted). I'm not convinced that this is a scenario that we should optimize for. > Optimize the timer logic in RowTimeUnboundedOver > > > Key: FLINK-8144 > URL: https://issues.apache.org/jira/browse/FLINK-8144 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > Currently the logic of {{RowTimeUnboundedOver}} is as follows: > 1) When an element comes, buffer it in MapState and register a timer at > {{current watermark + 1}} > 2) When the event timer is triggered, scan the MapState and find the elements below > the current watermark and process them. If there are remaining elements to > process, register a new timer at {{current watermark + 1}}. > Let's assume that the watermark comes about 5 seconds later than the event on > average; then we will scan the MapState about 5000 times before actually > processing the events. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5063 I see. The bad performance is related to the way that you generate the watermarks. Generating so many watermarks is very inefficient because you are processing many more watermarks than actual records (watermarks are always broadcasted). I'm not convinced that this is a scenario that we should optimize for. ---
[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265244#comment-16265244 ] ASF GitHub Bot commented on FLINK-8139: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5065 Hi @Aegeaner, thanks for the pull request. I had a brief look at it and noticed a few things: 1. Please fill out the PR template (see #5063 as an example) 2. Please add unit tests that validate that the checks you added are working correctly. 3. Please update the `equals()` and `hashCode()` methods of `Row` as mentioned in the JIRA issue. Thank you, Fabian > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner > > In the current Table API & SQL implementation we compare {{Row}}s at > different positions. E.g., for joining we test rows for equality or put them > into state. A heap state backend requires proper hashCode() and equals() in > order to work correctly. Thus, every type in the Table API needs to have these > methods implemented. > We need to check if all fields of a row implement methods that differ > from {{Object.equals()}} and {{Object.hashCode()}} via reflection, both > coming from TableSource and DataStream/DataSet. > Additionally, for array types, the {{Row}} class should use > {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep > variants. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
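The reflective check described in the issue can be sketched as follows (an illustration of the idea only, not Flink's actual implementation; the class and method names are invented):

```java
// Sketch of the proposed check: a field type is safe for heap-state
// comparisons only if it overrides both equals() and hashCode(), i.e. the
// methods are declared somewhere other than java.lang.Object.
public class EqualityContractCheck {

    public static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            return clazz.getMethod("equals", Object.class).getDeclaringClass() != Object.class
                && clazz.getMethod("hashCode").getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            // equals() and hashCode() exist on every class, so this cannot happen
            throw new IllegalStateException(e);
        }
    }
}
```

Note that this check alone does not help for array-typed fields, which is why the issue additionally asks `Row` to switch to `Arrays.deepEquals()`/`Arrays.deepHashCode()`.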
[GitHub] flink issue #5065: [FLINK-8139][table] Check for proper equals() and hashCod...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5065 Hi @Aegeaner, thanks for the pull request. I had a brief look at it and noticed a few things: 1. Please fill out the PR template (see #5063 as an example) 2. Please add unit tests that validate that the checks you added are working correctly. 3. Please update the `equals()` and `hashCode()` methods of `Row` as mentioned in the JIRA issue. Thank you, Fabian ---
[jira] [Commented] (FLINK-8147) Support and to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265243#comment-16265243 ] Dian Fu commented on FLINK-8147: Any comments are welcome. Do you think it's useful? What do you think the API should look like? > Support and to CEP's pattern API > > > Key: FLINK-8147 > URL: https://issues.apache.org/jira/browse/FLINK-8147 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu > > Adding API such as {{and}} in CEP's pattern API will let us define patterns > like {{(A -> B) and (C -> D)}}. > Its usage looks like > {noformat} > Pattern left = Pattern.begin("A").followedBy("B"); > Pattern right = Pattern.begin("C").followedBy("D"); > Pattern pattern = left.and(right) > {noformat} > If this makes sense, we should also support APIs such as {{or}} and {{not}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265239#comment-16265239 ] ASF GitHub Bot commented on FLINK-7406: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152961281 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** +* Add the exclusive buffer into the queue, and recycle one floating buffer if the +* number of available buffers in queue is more than required amount. +* +* @param buffer The exclusive buffer of this channel. +* @return Whether to recycle one floating buffer. +*/ + boolean maintainTargetSize(Buffer buffer) { --- End diff -- Sorry about the back-and-forth here, but thinking about the bug that you fixed with the latest commit, it would be dangerous if we ever called `maintainTargetSize()` without adding an exclusive buffer. What do you think about my second suggestion instead, i.e. having a ``` /** * Adds an exclusive buffer (back) into the queue and recycles one floating buffer if the * number of available buffers in queue is more than the required amount. 
* * @param buffer * the exclusive buffer to add * @param numRequiredBuffers * the number of required buffers * * @return how many buffers were added to the queue */ int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) { exclusiveBuffers.add(buffer); if (getAvailableBufferSize() > numRequiredBuffers) { Buffer floatingBuffer = floatingBuffers.poll(); floatingBuffer.recycle(); return 0; } else { return 1; } } ``` (please note the changed return type which I think is more obvious than a `boolean`) > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r152961281 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java --- @@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) { exclusiveBuffers.add(buffer); } - Buffer takeExclusiveBuffer() { - return exclusiveBuffers.poll(); - } - void addFloatingBuffer(Buffer buffer) { floatingBuffers.add(buffer); } - Buffer takeFloatingBuffer() { - return floatingBuffers.poll(); + /** +* Add the exclusive buffer into the queue, and recycle one floating buffer if the +* number of available buffers in queue is more than required amount. +* +* @param buffer The exclusive buffer of this channel. +* @return Whether to recycle one floating buffer. +*/ + boolean maintainTargetSize(Buffer buffer) { --- End diff -- Sorry about the back-and-forth here, but thinking about the bug that you fixed with the latest commit, it would be dangerous if we ever called `maintainTargetSize()` without adding an exclusive buffer. What do you think about my second suggestion instead, i.e. having a ``` /** * Adds an exclusive buffer (back) into the queue and recycles one floating buffer if the * number of available buffers in queue is more than the required amount. * * @param buffer * the exclusive buffer to add * @param numRequiredBuffers * the number of required buffers * * @return how many buffers were added to the queue */ int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) { exclusiveBuffers.add(buffer); if (getAvailableBufferSize() > numRequiredBuffers) { Buffer floatingBuffer = floatingBuffers.poll(); floatingBuffer.recycle(); return 0; } else { return 1; } } ``` (please note the changed return type which I think is more obvious than a `boolean`) ---
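The `addExclusiveBuffer(Buffer, int)` behavior suggested in the review can be exercised in isolation with a stripped-down model of the two queues (a sketch with a simplified `Buffer` stand-in, not the real `RemoteInputChannel`; like the reviewed code, it assumes a floating buffer is available whenever the queue is over target):

```java
import java.util.ArrayDeque;

// Minimal model of the queue-maintenance logic from the review comment:
// adding an exclusive buffer recycles one floating buffer whenever the
// combined queue already holds more buffers than required.
public class BufferQueueSketch {
    // stand-in for Flink's Buffer; recycle() just flips a flag here
    static class Buffer {
        boolean recycled;
        void recycle() { recycled = true; }
    }

    final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>();
    final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>();

    int getAvailableBufferSize() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    /** Returns how many buffers were (net) added to the queue: 0 or 1. */
    int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
        exclusiveBuffers.add(buffer);
        if (getAvailableBufferSize() > numRequiredBuffers) {
            Buffer floatingBuffer = floatingBuffers.poll();
            floatingBuffer.recycle();
            return 0;
        } else {
            return 1;
        }
    }
}
```

The `int` return value makes the bookkeeping explicit: callers add it to their available-buffer count instead of interpreting a boolean, which is the readability point made in the review.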
[jira] [Created] (FLINK-8147) Support and to CEP's pattern API
Dian Fu created FLINK-8147: -- Summary: Support and to CEP's pattern API Key: FLINK-8147 URL: https://issues.apache.org/jira/browse/FLINK-8147 Project: Flink Issue Type: Bug Components: CEP Reporter: Dian Fu Adding API such as {{and}} in CEP's pattern API will let us define patterns like {{(A -> B) and (C -> D)}}. Its usage looks like {noformat} Pattern left = Pattern.begin("A").followedBy("B"); Pattern right = Pattern.begin("C").followedBy("D"); Pattern pattern = left.and(right) {noformat} If this makes sense, we should also support APIs such as {{or}} and {{not}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections
[ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265231#comment-16265231 ] ASF GitHub Bot commented on FLINK-8125: --- Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5059 > Support limiting the number of open FileSystem connections > -- > > Key: FLINK-8125 > URL: https://issues.apache.org/jira/browse/FLINK-8125 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.5.0, 1.4.1 > > > We need a way to limit the number of streams that Flink FileSystems > concurrently open. > For example, for very small HDFS clusters with few RPC handlers, a large > Flink job trying to build up many connections during a checkpoint causes > failures due to rejected connections. > I propose to add a file system that can wrap another existing file system. The > file system may track the progress of streams and close streams that have > been inactive for too long, to avoid locked streams taking up the complete > pool. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5059 ---
[jira] [Resolved] (FLINK-8125) Support limiting the number of open FileSystem connections
[ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8125. - Resolution: Fixed Fixed in - 1.5.0 via d7c2c417213502130b1aeab1868313df178555cc - 1.4.1 via a11e2cf0b1f37d3ef22e1978e89928fa374960db > Support limiting the number of open FileSystem connections > -- > > Key: FLINK-8125 > URL: https://issues.apache.org/jira/browse/FLINK-8125 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.5.0, 1.4.1 > > > We need a way to limit the number of streams that Flink FileSystems > concurrently open. > For example, for very small HDFS clusters with few RPC handlers, a large > Flink job trying to build up many connections during a checkpoint causes > failures due to rejected connections. > I propose to add a file system that can wrap another existing file system. The > file system may track the progress of streams and close streams that have > been inactive for too long, to avoid locked streams taking up the complete > pool. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8125) Support limiting the number of open FileSystem connections
[ https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8125. --- > Support limiting the number of open FileSystem connections > -- > > Key: FLINK-8125 > URL: https://issues.apache.org/jira/browse/FLINK-8125 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.5.0, 1.4.1 > > > We need a way to limit the number of streams that Flink FileSystems > concurrently open. > For example, for very small HDFS clusters with few RPC handlers, a large > Flink job trying to build up many connections during a checkpoint causes > failures due to rejected connections. > I propose to add a file system that can wrap another existing file system. The > file system may track the progress of streams and close streams that have > been inactive for too long, to avoid locked streams taking up the complete > pool. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
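The wrapping idea from FLINK-8125 — a decorator file system that caps how many streams may be open at once — can be illustrated with a semaphore-guarded stream factory (a generic sketch only; it does not reflect the details of the implementation merged in the commits above, and the class name is invented):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Semaphore;

// Sketch of the connection-limiting decorator: open() blocks when the limit
// is reached, and the permit is returned when the wrapped stream is closed.
public class LimitedStreamFactory {
    private final Semaphore permits;

    public LimitedStreamFactory(int maxOpenStreams) {
        this.permits = new Semaphore(maxOpenStreams);
    }

    public int availablePermits() {
        return permits.availablePermits();
    }

    public InputStream open(InputStream inner) throws IOException {
        try {
            permits.acquire();  // blocks while maxOpenStreams are already open
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while waiting for a stream permit", e);
        }
        return new InputStream() {
            private boolean closed;

            @Override public int read() throws IOException {
                return inner.read();
            }

            @Override public void close() throws IOException {
                if (!closed) {   // release the permit exactly once
                    closed = true;
                    permits.release();
                }
                inner.close();
            }
        };
    }
}
```

The inactivity tracking mentioned in the issue (force-closing streams that hold a permit too long) would sit on top of this, so a stuck stream cannot exhaust the pool forever.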
[jira] [Created] (FLINK-8146) Potential resource leak in PythonPlanBinder#unzipPythonLibrary
Ted Yu created FLINK-8146: - Summary: Potential resource leak in PythonPlanBinder#unzipPythonLibrary Key: FLINK-8146 URL: https://issues.apache.org/jira/browse/FLINK-8146 Project: Flink Issue Type: Bug Reporter: Ted Yu {code} while (entry != null) { ... } zis.closeEntry(); {code} Looking at the catch block inside the loop, it seems the intention is to close zis upon getting an exception. zis.close() should be called outside the loop. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
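The fix Ted Yu suggests amounts to letting try-with-resources close the `ZipInputStream` on every path (normal completion and exception) instead of calling `close()` from inside the loop's catch block. A self-contained sketch (not the actual `PythonPlanBinder` code; method and path names are illustrative):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Sketch of leak-free unzipping: the ZipInputStream is closed exactly once,
// by try-with-resources, regardless of where an exception is thrown.
public class UnzipSketch {

    static void unzip(InputStream in, File targetDir) throws IOException {
        try (ZipInputStream zis = new ZipInputStream(in)) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                // NOTE: production code should also reject entry names
                // containing ".." to prevent zip-slip path traversal.
                File out = new File(targetDir, entry.getName());
                if (entry.isDirectory()) {
                    out.mkdirs();
                    continue;
                }
                out.getParentFile().mkdirs();
                try (FileOutputStream fos = new FileOutputStream(out)) {
                    byte[] buf = new byte[4096];
                    int n;
                    while ((n = zis.read(buf)) > 0) {
                        fos.write(buf, 0, n);
                    }
                }
                zis.closeEntry();
            }
        } // zis.close() happens here, even if an exception was thrown above
    }
}
```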
[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver
[ https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265213#comment-16265213 ] ASF GitHub Bot commented on FLINK-8144: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5063#discussion_r152955709 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala --- @@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver( // discard late record if (timestamp > curWatermark) { // ensure every key just registers one timer - ctx.timerService.registerEventTimeTimer(curWatermark + 1) + ctx.timerService.registerEventTimeTimer(timestamp) --- End diff -- @fhueske Thanks a lot for your comments. Your concern makes sense to me. I think the current implementation is ok under periodic watermark. But I'm not sure if it's optimal under punctuated watermark. We will perform some performance test for unbounded over under punctuated watermark and share the results. > Optimize the timer logic in RowTimeUnboundedOver > > > Key: FLINK-8144 > URL: https://issues.apache.org/jira/browse/FLINK-8144 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > Currently the logic of {{RowTimeUnboundedOver}} is as follows: > 1) When an element comes, buffer it in MapState and register a timer at > {{current watermark + 1}} > 2) When the event timer is triggered, scan the MapState and find the elements below > the current watermark and process them. If there are remaining elements to > process, register a new timer at {{current watermark + 1}}. > Let's assume that the watermark comes about 5 seconds later than the event on > average; then we will scan the MapState about 5000 times before actually > processing the events. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5063#discussion_r152955709 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala --- @@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver( // discard late record if (timestamp > curWatermark) { // ensure every key just registers one timer - ctx.timerService.registerEventTimeTimer(curWatermark + 1) + ctx.timerService.registerEventTimeTimer(timestamp) --- End diff -- @fhueske Thanks a lot for your comments. Your concern makes sense to me. I think the current implementation is ok under periodic watermark. But I'm not sure if it's optimal under punctuated watermark. We will perform some performance test for unbounded over under punctuated watermark and share the results. ---
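The "scan about 5000 times" estimate in the issue can be checked with a back-of-the-envelope simulation (plain Java, no Flink; watermark granularity of 1 ms is an assumption for illustration). Registering at `curWatermark + 1` fires and re-registers a timer once per millisecond until the watermark passes the element, whereas the one-line change in the diff above registers directly at the element's timestamp and fires once:

```java
// Back-of-the-envelope model of the timer behavior discussed in FLINK-8144.
// The watermark advances 1 ms at a time and lags the element by lagMs.
public class TimerSimulation {

    // Old strategy: the timer at curWatermark + 1 fires, finds the element
    // still above the watermark, and re-registers - once per millisecond.
    static int firingsOldStrategy(long elementTs, long lagMs) {
        int firings = 0;
        for (long wm = elementTs - lagMs; wm < elementTs; wm++) {
            firings++;  // timer fires but cannot process the element yet
        }
        return firings;
    }

    // New strategy: one timer registered directly at the element's timestamp.
    static int firingsNewStrategy() {
        return 1;
    }
}
```

With a 5-second lag this yields 5000 firings per element for the old strategy versus a single firing for the new one, matching the estimate in the issue description.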
[GitHub] flink pull request #5055: [FLINK-7718] [flip6] Add JobVertexMetricsHandler t...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5055#discussion_r152951899 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.Metric; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics. + * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. 
+ * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * @param Type of the concrete MessageParameters + */ +public abstract class AbstractMetricsHandler extends + AbstractRestHandler{ + + private final MetricFetcher metricFetcher; + + public AbstractMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map headers, + MessageHeaders messageHeaders, + MetricFetcher metricFetcher) { + super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders); + this.metricFetcher = requireNonNull(metricFetcher, "metricFetcher must not be null"); + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull HandlerRequest request, + @Nonnull DispatcherGateway gateway) throws RestHandlerException { + metricFetcher.update(); + + final MetricStore.ComponentMetricStore componentMetricStore = getComponentMetricStore( + request, + metricFetcher.getMetricStore()); + + if (componentMetricStore == null || componentMetricStore.metrics == null) { + return
[jira] [Commented] (FLINK-7718) Port JobVertixMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265202#comment-16265202 ] ASF GitHub Bot commented on FLINK-7718: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5055#discussion_r152951899 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.Metric; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * Request handler that returns for a given task a list of all available metrics or the values for a set of metrics. + * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned. + * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. 
+ * {@code /metrics?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + * + * @param Type of the concrete MessageParameters + */ +public abstract class AbstractMetricsHandler extends + AbstractRestHandler{ + + private final MetricFetcher metricFetcher; + + public AbstractMetricsHandler( + CompletableFuture localRestAddress, + GatewayRetriever leaderRetriever, + Time timeout, + Map headers, + MessageHeaders messageHeaders, + MetricFetcher metricFetcher) { + super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders); + this.metricFetcher = requireNonNull(metricFetcher, "metricFetcher must not be null"); + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull HandlerRequest request, + @Nonnull DispatcherGateway gateway) throws RestHandlerException { + metricFetcher.update(); + + final MetricStore.ComponentMetricStore componentMetricStore = getComponentMetricStore( +
[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265184#comment-16265184 ] ASF GitHub Bot commented on FLINK-8139: --- GitHub user Aegeaner opened a pull request: https://github.com/apache/flink/pull/5065 [FLINK-8139][table] Check for proper equals() and hashCode() when reg… https://issues.apache.org/jira/browse/FLINK-8139 Check for proper equals() and hashCode() when registering a table. ## Contribution Checklist ## Verifying this change flink-table unit tests passed. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Aegeaner/flink FLINK-8139 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5065.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5065 commit 41e79898653c656ebca3b4d96bab6f2b4664488d Author: AegeanerDate: 2017-11-24T08:11:28Z [FLINK-8139][table] Check for proper equals() and hashCode() when registering a table > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner > > In the current Table API & SQL implementation we compare {{Row}}s at > different positions. E.g., for joining we test rows for equality or put them > into state. A heap state backend requires proper hashCode() and equals() in > order to work correct. Thus, every type in the Table API needs to have these > methods implemented. > We need to check if all fields of a row have implement methods that differ > from {{Object.equals()}} and {{Object.hashCode()}} via reflections. Both > coming from TableSource and DataStream/DataSet. 
> Additionally, for array types, the {{Row}} class should use > {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep > variants. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
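The reflective check and the deep-array comparison described in the issue can be sketched in plain Java as follows. This is a standalone illustration, not the actual flink-table implementation; the class and method names are hypothetical:

```java
import java.util.Arrays;

public class EqualsHashCodeCheck {

    // A type that overrides neither equals() nor hashCode().
    static class NoOverrides { int x; }

    // A type that overrides both.
    static class WithOverrides {
        int x;
        @Override public boolean equals(Object o) {
            return o instanceof WithOverrides && ((WithOverrides) o).x == x;
        }
        @Override public int hashCode() { return x; }
    }

    // Returns true iff the class (or a superclass other than Object)
    // declares its own equals() and hashCode().
    static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            boolean eq = clazz.getMethod("equals", Object.class).getDeclaringClass() != Object.class;
            boolean hc = clazz.getMethod("hashCode").getDeclaringClass() != Object.class;
            return eq && hc;
        } catch (NoSuchMethodException e) {
            throw new AssertionError(e); // every class has these methods
        }
    }

    public static void main(String[] args) {
        System.out.println(overridesEqualsAndHashCode(NoOverrides.class));   // false
        System.out.println(overridesEqualsAndHashCode(WithOverrides.class)); // true

        // The deep variants matter for array-valued fields: equals() on arrays
        // is reference equality, so Arrays.equals compares nested arrays by
        // identity while Arrays.deepEquals compares their contents.
        Object[] a = { new int[]{1, 2} };
        Object[] b = { new int[]{1, 2} };
        System.out.println(Arrays.equals(a, b));     // false
        System.out.println(Arrays.deepEquals(a, b)); // true
    }
}
```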
[jira] [Commented] (FLINK-8145) IOManagerAsync not properly shut down in various tests
[ https://issues.apache.org/jira/browse/FLINK-8145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265165#comment-16265165 ] ASF GitHub Bot commented on FLINK-8145: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5064 [FLINK-8145][tests] fix various IOManagerAsync instances not being shut down ## What is the purpose of the change Fix several unit tests using `IOManagerAsync` but not shutting it down afterwards (not even in the failure-free case). The only thing cleaning them up seems to be a shutdown handler attached to the JVM. Since each IOManagerAsync is spawning a number of reader and writer threads, this may put a significant burden on the tests. ## Brief change log - add shutdown handlers via `@AfterClass` to `AsynchronousBufferFileWriterTest`, `BufferFileWriterFileSegmentReaderTest`, `BufferFileWriterReaderTest`, `HashTablePerformanceComparison`, `FixedLengthRecordSorterTest` - let `HashTableITCase`, `AbstractSortMergeOuterJoinIteratorITCase` test methods use the static `ioManager` instead of creating new instances in some test methods - clean up manually in `HashTableTest`, `MassiveStringSorting`, `MassiveStringValueSorting` (similar cleanup for the `MemoryManager` there) ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-8145 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5064.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5064 commit 7a0723b90fcefb8bf9c6e448e24db43758d0a1e2 Author: Nico KruberDate: 2017-11-24T10:31:48Z [FLINK-8145][tests] fix various IOManagerAsync instances not being shut down > IOManagerAsync not properly shut down in various tests > -- > > Key: FLINK-8145 > URL: https://issues.apache.org/jira/browse/FLINK-8145 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > In various tests, e.g. {{AsynchronousBufferFileWriterTest}}, > {{BufferFileWriterReaderTest}}, or {{HashTableITCase}}, {{IOManagerAsync}} > instances are used which are not shut down at all. The only thing cleaning > them up seems to be a shutdown handler attached to the JVM. Since each > {{IOManagerAsync}} is spawning a number of reader and writer threads, this > may put a significant burden on the tests. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
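The cleanup pattern the PR applies can be sketched with a standard-library stand-in. `IOManagerAsync` is not on the classpath here, so an `ExecutorService` plays its role (both spawn worker threads that outlive a test unless shut down explicitly); in JUnit the `shutDownClass()` call below would be an `@AfterClass` method:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ResourceCleanupPattern {

    // Stand-in for a shared IOManagerAsync: a static resource whose
    // reader/writer threads persist until it is shut down.
    static final ExecutorService IO_WORKERS = Executors.newFixedThreadPool(4);

    // In JUnit this would be annotated @AfterClass; here it is called directly.
    static void shutDownClass() throws InterruptedException {
        IO_WORKERS.shutdown();
        if (!IO_WORKERS.awaitTermination(5, TimeUnit.SECONDS)) {
            IO_WORKERS.shutdownNow(); // force-stop stragglers
        }
    }

    public static void main(String[] args) throws Exception {
        IO_WORKERS.submit(() -> { /* pretend I/O work */ }).get();
        shutDownClass();
        System.out.println(IO_WORKERS.isShutdown()); // true
    }
}
```

Without the explicit shutdown, the pool's non-daemon threads keep the JVM (and the test run) burdened until a JVM shutdown hook cleans them up, which is exactly the problem the PR describes.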
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265155#comment-16265155 ] ASF GitHub Bot commented on FLINK-7499: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4581#discussion_r152944166 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro } } } + + /** +* An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its +* {@link #createBufferFileWriter(FileIOChannel.ID)} method. +* +* These {@link BufferFileWriter} objects will thus throw an exception when trying to add +* write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}. +*/ + private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync { + @Override + public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) + throws IOException { + BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID); + bufferFileWriter.close(); + return bufferFileWriter; + } + } + + /** +* An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its +* {@link #createBufferFileWriter(FileIOChannel.ID)} method. +* +* These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)} +* requests but never actually perform any write operation (be sure to clean up the buffers +* manually!). 
+*/ + private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync { + @Override + public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) + throws IOException { + BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID)); --- End diff -- I don't have (too) hard feelings about it, so there it is - along with some more minor fixes in the tests and one additional test for `AsynchronousBufferFileWriter` itself (the change there was not covered yet) > double buffer release in SpillableSubpartitionView > -- > > Key: FLINK-7499 > URL: https://issues.apache.org/jira/browse/FLINK-7499 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, > 1.3.2, 1.3.3 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > > {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: > once asynchronously after the write operation and once in > {{SpillableSubpartitionView#releaseMemory()}} after adding the write > operation to the queue. > 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer > is recycled, the memory region may already be reused despite the pending write > 2) If, for some reason (probably only in tests like > {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?), > the buffer is retained and to be used in parallel somewhere else it may also > not be available anymore or contain corrupt data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()
[ https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265143#comment-16265143 ] ASF GitHub Bot commented on FLINK-5465: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5058 I was thinking about where to put those options and after all it was a choice between fragmentation and inconsistency. I considered `TaskManagerOptions` but it sounds like a wrong place, if you look what the other options in this class are about and it also feels like saying `Task` == `StreamTask` because timers are something that currently belongs to `streaming` imo. There are classes for configurations all over the place, so if you think this is a problem, maybe we should fix it in general and not on a per-case basis? I agree that we can change the key string. > RocksDB fails with segfault while calling AbstractRocksDBState.clear() > -- > > Key: FLINK-5465 > URL: https://issues.apache.org/jira/browse/FLINK-5465 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Stefan Richter > Fix For: 1.4.0, 1.5.0 > > Attachments: hs-err-pid26662.log > > > I'm using Flink 699f4b0. > {code} > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576 > # > # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build > 1.7.0_67-b01) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode > linux-amd64 compressed oops) > # Problematic frame: > # C [librocksdbjni-linux64.so+0x1aeb78] > rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8 > # > # Failed to write core dump. Core dumps have been disabled. 
To enable core > dumping, try "ulimit -c unlimited" before starting Java again > # > # An error report file with more information is saved as: > # > /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log > Compiled method (nm) 1869778 903 n org.rocksdb.RocksDB::remove > (native) > total in heap [0x7f91b40b9dd0,0x7f91b40ba150] = 896 > relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88 > main code [0x7f91b40b9f60,0x7f91b40ba150] = 496 > # > # If you would like to submit a bug report, please visit: > # http://bugreport.sun.com/bugreport/crash.jsp > # The crash happened outside the Java Virtual Machine in native code. > # See problematic frame for where to report the bug. > # > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()
[ https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265138#comment-16265138 ] ASF GitHub Bot commented on FLINK-5465: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5058 I would like to make a few comments for followup: - I think `TimerServiceOptions` should not be an own class, we are getting a crazy fragmentation of options into classes (that have sometimes, like here, only one option defined). Let's merge these into the `TaskManagerOptions`. - The config key does not reflect the scheme in which all other config keys are defined. It reads like a name where the dot '.' is a work separator. The scheme in which all other config keys are defined is hierarchical, like a path in a nested config group/object structure. Think that the configuration is one huge JSON object, and the key is the dot path to the entry. Hence a key like `taskmanager.timers.shutdown-timeout` (or `taskmanager.timers.shutdown.timeout`, if we view `shutdown` as a full config group/object) would be in line with the resulting style. > RocksDB fails with segfault while calling AbstractRocksDBState.clear() > -- > > Key: FLINK-5465 > URL: https://issues.apache.org/jira/browse/FLINK-5465 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Stefan Richter > Fix For: 1.4.0, 1.5.0 > > Attachments: hs-err-pid26662.log > > > I'm using Flink 699f4b0. 
> {code} > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576 > # > # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build > 1.7.0_67-b01) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode > linux-amd64 compressed oops) > # Problematic frame: > # C [librocksdbjni-linux64.so+0x1aeb78] > rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8 > # > # Failed to write core dump. Core dumps have been disabled. To enable core > dumping, try "ulimit -c unlimited" before starting Java again > # > # An error report file with more information is saved as: > # > /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log > Compiled method (nm) 1869778 903 n org.rocksdb.RocksDB::remove > (native) > total in heap [0x7f91b40b9dd0,0x7f91b40ba150] = 896 > relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88 > main code [0x7f91b40b9f60,0x7f91b40ba150] = 496 > # > # If you would like to submit a bug report, please visit: > # http://bugreport.sun.com/bugreport/crash.jsp > # The crash happened outside the Java Virtual Machine in native code. > # See problematic frame for where to report the bug. > # > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Issue Comment Deleted] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver
[ https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-8144: --- Comment: was deleted (was: {{AbstractKeyedCEPPatternOperator}} has similar logic as {{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the performance is very bad under the current logic for {{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps to about 3500 tps for one operator in the case of RocksDBStateBackend after optimizing the timer logic. I think the optimization should also apply to {{RowTimeUnboundedOver}}. BTW: the watermark we use in the CEP use case is {{AssignerWithPunctuatedWatermarks}}. It will generate one watermark for every input element.) > Optimize the timer logic in RowTimeUnboundedOver > > > Key: FLINK-8144 > URL: https://issues.apache.org/jira/browse/FLINK-8144 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > Currently the logic of {{RowTimeUnboundedOver}} is as follows: > 1) When element comes, buffer it in MapState and and register a timer at > {{current watermark + 1}} > 2) When event timer triggered, scan the MapState and find the elements below > the current watermark and process it. If there are remaining elements to > process, register a new timer at {{current watermark + 1}}. > Let's assume that watermark comes about 5 seconds later than the event on > average, then we will scan about 5000 times the MapState before actually > processing the events. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5063 {{AbstractKeyedCEPPatternOperator}} has similar logic as {{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the performance is very bad under the current logic for {{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps to about 3500 tps for one operator in the case of RocksDBStateBackend after optimizing the timer logic. I think the optimization should also apply to {{RowTimeUnboundedOver}}. BTW: the watermark we use in the CEP use case is {{AssignerWithPunctuatedWatermarks}}. It will generate one watermark for every input element. ---
[jira] [Comment Edited] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver
[ https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265130#comment-16265130 ] Dian Fu edited comment on FLINK-8144 at 11/24/17 10:19 AM: --- {{AbstractKeyedCEPPatternOperator}} has similar logic as {{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the performance is very bad under the current logic for {{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps to about 3500 tps for one operator in the case of RocksDBStateBackend after optimizing the timer logic. I think the optimization should also apply to {{RowTimeUnboundedOver}}. BTW: the watermark we use in the CEP use case is {{AssignerWithPunctuatedWatermarks}}. It will generate one watermark for every input element. was (Author: dian.fu): {{AbstractKeyedCEPPatternOperator}} has similar logic as {{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the performance is very bad under the current logic for {{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps to about 3500 tps for one operator in the case of RocksDBStateBackend. I think the optimization should also apply to {{RowTimeUnboundedOver}}. > Optimize the timer logic in RowTimeUnboundedOver > > > Key: FLINK-8144 > URL: https://issues.apache.org/jira/browse/FLINK-8144 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > Currently the logic of {{RowTimeUnboundedOver}} is as follows: > 1) When element comes, buffer it in MapState and and register a timer at > {{current watermark + 1}} > 2) When event timer triggered, scan the MapState and find the elements below > the current watermark and process it. If there are remaining elements to > process, register a new timer at {{current watermark + 1}}. 
> Let's assume that watermark comes about 5 seconds later than the event on > average, then we will scan about 5000 times the MapState before actually > processing the events. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver
[ https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265130#comment-16265130 ] Dian Fu commented on FLINK-8144: {{AbstractKeyedCEPPatternOperator}} has similar logic as {{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the performance is very bad under the current logic for {{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps to about 3500 tps for one operator in the case of RocksDBStateBackend. I think the optimization should also apply to {{RowTimeUnboundedOver}}. > Optimize the timer logic in RowTimeUnboundedOver > > > Key: FLINK-8144 > URL: https://issues.apache.org/jira/browse/FLINK-8144 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > Currently the logic of {{RowTimeUnboundedOver}} is as follows: > 1) When element comes, buffer it in MapState and and register a timer at > {{current watermark + 1}} > 2) When event timer triggered, scan the MapState and find the elements below > the current watermark and process it. If there are remaining elements to > process, register a new timer at {{current watermark + 1}}. > Let's assume that watermark comes about 5 seconds later than the event on > average, then we will scan about 5000 times the MapState before actually > processing the events. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8145) IOManagerAsync not properly shut down in various tests
Nico Kruber created FLINK-8145: -- Summary: IOManagerAsync not properly shut down in various tests Key: FLINK-8145 URL: https://issues.apache.org/jira/browse/FLINK-8145 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.4.0 Reporter: Nico Kruber Assignee: Nico Kruber In various tests, e.g. {{AsynchronousBufferFileWriterTest}}, {{BufferFileWriterReaderTest}}, or {{HashTableITCase}}, {{IOManagerAsync}} instances are used which are not shut down at all. The only thing cleaning them up seems to be a shutdown handler attached to the JVM. Since each {{IOManagerAsync}} is spawning a number of reader and writer threads, this may put a significant burden on the tests. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-6909) Flink should support Lombok POJO
[ https://issues.apache.org/jira/browse/FLINK-6909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-6909: --- Assignee: Timo Walther > Flink should support Lombok POJO > > > Key: FLINK-6909 > URL: https://issues.apache.org/jira/browse/FLINK-6909 > Project: Flink > Issue Type: Wish > Components: Type Serialization System >Reporter: Md Kamaruzzaman >Assignee: Timo Walther >Priority: Minor > > Project lombok helps greatly to reduce boilerplate Java Code. > It seems that Flink does not accept a lombok POJO as a valid pojo. > e.g. Here is a POJO defined with lombok: > @Getter > @Setter > @NoArgsConstructor > public class SimplePojo > Using this Pojo class to read from CSV file throws this exception: > Exception in thread "main" java.lang.ClassCastException: > org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to > org.apache.flink.api.java.typeutils.PojoTypeInfo > It would be great if flink supports lombok POJO. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6909) Flink should support Lombok POJO
[ https://issues.apache.org/jira/browse/FLINK-6909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265118#comment-16265118 ] Timo Walther commented on FLINK-6909: - This looks like a bug to me. I will assign the issue to me and have a look at it. > Flink should support Lombok POJO > > > Key: FLINK-6909 > URL: https://issues.apache.org/jira/browse/FLINK-6909 > Project: Flink > Issue Type: Wish > Components: Type Serialization System >Reporter: Md Kamaruzzaman >Priority: Minor > > Project lombok helps greatly to reduce boilerplate Java Code. > It seems that Flink does not accept a lombok POJO as a valid pojo. > e.g. Here is a POJO defined with lombok: > @Getter > @Setter > @NoArgsConstructor > public class SimplePojo > Using this Pojo class to read from CSV file throws this exception: > Exception in thread "main" java.lang.ClassCastException: > org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to > org.apache.flink.api.java.typeutils.PojoTypeInfo > It would be great if flink supports lombok POJO. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
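For context on what the reporter's Lombok annotations produce, here is a plain-Java sketch. Lombok is not available here, so the members that `@Getter`, `@Setter`, and `@NoArgsConstructor` would generate at compile time are written out by hand; these are the same members Flink's POJO analysis looks for reflectively (a public no-argument constructor plus a getter/setter pair per non-public field). The field names are illustrative:

```java
public class SimplePojo {

    // With Lombok, the class body would carry only these fields plus the
    // @Getter, @Setter and @NoArgsConstructor annotations; everything
    // below them is what those annotations generate.
    private String name;
    private int count;

    public SimplePojo() {}  // from @NoArgsConstructor

    public String getName() { return name; }                  // from @Getter
    public void setName(String name) { this.name = name; }    // from @Setter

    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    public static void main(String[] args) throws Exception {
        // A POJO-style type analyzer can discover all of this reflectively.
        SimplePojo p = SimplePojo.class.getConstructor().newInstance();
        p.setName("a");
        System.out.println(p.getName()); // a
        System.out.println(SimplePojo.class.getMethod("getName").getName());
    }
}
```

Since Lombok generates exactly these members in the compiled class file, a Lombok POJO should in principle satisfy the same reflective checks, which is why the fallback to `GenericTypeInfo` reads like a bug.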
[jira] [Closed] (FLINK-7316) always use off-heap network buffers
[ https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-7316. - Resolution: Fixed Fix Version/s: 1.5.0 Merged in 1854a3de19. > always use off-heap network buffers > --- > > Key: FLINK-7316 > URL: https://issues.apache.org/jira/browse/FLINK-7316 > Project: Flink > Issue Type: Sub-task > Components: Core, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > Fix For: 1.5.0 > > > In order to send flink buffers through netty into the network, we need to > make the buffers use off-heap memory. Otherwise, there will be a hidden copy > happening in the NIO stack. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
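The "hidden copy" motivating this change can be illustrated with plain NIO (illustrative only, not Flink code): a heap `ByteBuffer` is backed by a Java `byte[]`, so the JDK channel implementation must copy it into a temporary direct buffer before the OS can read the memory, whereas a direct (off-heap) buffer can be handed to native I/O without that extra copy.

```java
import java.nio.ByteBuffer;

public class BufferDemo {
    public static ByteBuffer heapBuffer(int size) {
        return ByteBuffer.allocate(size);        // backed by a byte[] on the heap
    }

    public static ByteBuffer offHeapBuffer(int size) {
        return ByteBuffer.allocateDirect(size);  // native memory, no backing array
    }
}
```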
[jira] [Commented] (FLINK-7316) always use off-heap network buffers
[ https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265104#comment-16265104 ] ASF GitHub Bot commented on FLINK-7316: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4481 > always use off-heap network buffers > --- > > Key: FLINK-7316 > URL: https://issues.apache.org/jira/browse/FLINK-7316 > Project: Flink > Issue Type: Sub-task > Components: Core, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > In order to send flink buffers through netty into the network, we need to > make the buffers use off-heap memory. Otherwise, there will be a hidden copy > happening in the NIO stack. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver
[ https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265097#comment-16265097 ] ASF GitHub Bot commented on FLINK-8144: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5063#discussion_r152929249
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala ---
@@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver(
       // discard late record
       if (timestamp > curWatermark) {
         // ensure every key just registers one timer
-        ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+        ctx.timerService.registerEventTimeTimer(timestamp)
--- End diff --
Registering multiple timers on the same timestamp (`curWatermark + 1`) means that only one timer exists, which fires exactly once when a watermark is received. By registering timers on different timestamps, we have many timers that will all fire when a watermark is received. I don't think this is what we want. > Optimize the timer logic in RowTimeUnboundedOver > > > Key: FLINK-8144 > URL: https://issues.apache.org/jira/browse/FLINK-8144 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > Currently the logic of {{RowTimeUnboundedOver}} is as follows: > 1) When element comes, buffer it in MapState and register a timer at > {{current watermark + 1}} > 2) When event timer triggered, scan the MapState and find the elements below > the current watermark and process it. If there are remaining elements to > process, register a new timer at {{current watermark + 1}}. > Let's assume that watermark comes about 5 seconds later than the event on > average, then we will scan about 5000 times the MapState before actually > processing the events. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4481: [FLINK-7316][network] always use off-heap network buffers
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4481 Will merge this now. ---
[jira] [Commented] (FLINK-7316) always use off-heap network buffers
[ https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265093#comment-16265093 ] ASF GitHub Bot commented on FLINK-7316: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4481 LGTM > always use off-heap network buffers > --- > > Key: FLINK-7316 > URL: https://issues.apache.org/jira/browse/FLINK-7316 > Project: Flink > Issue Type: Sub-task > Components: Core, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > In order to send flink buffers through netty into the network, we need to > make the buffers use off-heap memory. Otherwise, there will be a hidden copy > happening in the NIO stack. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver
[ https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265088#comment-16265088 ] Fabian Hueske commented on FLINK-8144: -- Did you observe this behavior in practice, i.e., while debugging, or is that an assumption based on reading the code? I'm asking because a timer that is registered on {{current_watermark + 1}} fires just once when the next watermark is received (the logical clock is only advanced by watermarks). Also, by registering multiple timers on the same timestamp, the timer gets overridden, so there will be only one timer that fires. I think the current implementation should behave just as expected, i.e., just go once over the MapState when a watermark is received. > Optimize the timer logic in RowTimeUnboundedOver > > > Key: FLINK-8144 > URL: https://issues.apache.org/jira/browse/FLINK-8144 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > Currently the logic of {{RowTimeUnboundedOver}} is as follows: > 1) When element comes, buffer it in MapState and register a timer at > {{current watermark + 1}} > 2) When event timer triggered, scan the MapState and find the elements below > the current watermark and process it. If there are remaining elements to > process, register a new timer at {{current watermark + 1}}. > Let's assume that watermark comes about 5 seconds later than the event on > average, then we will scan about 5000 times the MapState before actually > processing the events. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
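The timer-deduplication behavior described in this thread can be modeled with a toy timer service (illustrative only, not Flink's actual TimerService API): per-key event-time timers live in a set, so re-registering the same timestamp collapses into a single timer, while registering per-record timestamps accumulates one timer per record.

```java
import java.util.TreeSet;

public class ToyTimerService {
    private final TreeSet<Long> timers = new TreeSet<>();

    public void registerEventTimeTimer(long ts) {
        timers.add(ts); // set semantics: duplicate timestamps collapse into one timer
    }

    // Fire and drop every timer at or below the new watermark;
    // returns the number of timer callbacks that would run.
    public int advanceWatermark(long watermark) {
        int fired = 0;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            fired++;
        }
        return fired;
    }
}
```

With `curWatermark + 1` as the timestamp, five registrations still yield one firing; with per-record timestamps, five registrations yield five firings when the next watermark arrives, which matches the concern raised in the review.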
[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265082#comment-16265082 ] ASF GitHub Bot commented on FLINK-7959: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4940#discussion_r152923905
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala ---
@@ -19,50 +19,47 @@ package org.apache.flink.table.codegen

 import org.apache.flink.api.common.io.GenericInputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenUtils.newName
 import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.types.Row

 /**
  * A code generator for generating Flink [[GenericInputFormat]]s.
- *
- * @param config configuration that determines runtime behavior
  */
-class InputFormatCodeGenerator(
-    config: TableConfig)
-  extends CodeGenerator(config, false, new RowTypeInfo(), None, None) {
-
+object InputFormatCodeGenerator {
   /**
    * Generates a values input format that can be passed to Java compiler.
    *
+   * @param ctx The code generator context
    * @param name Class name of the input format. Must not be unique but has to be a
    *             valid Java class identifier.
    * @param records code for creating records
    * @param returnType expected return type
+   * @param outRecordTerm term of the output
    * @tparam T Return type of the Flink Function.
    * @return instance of GeneratedFunction
    */
   def generateValuesInputFormat[T <: Row](
-      name: String,
-      records: Seq[String],
-      returnType: TypeInformation[T])
-    : GeneratedInput[GenericInputFormat[T], T] = {
+      ctx: CodeGeneratorContext,
+      name: String,
+      records: Seq[String],
+      returnType: TypeInformation[T],
+      outRecordTerm: String = CodeGeneratorContext.DEFAULT_OUT_RECORD_TERM)
--- End diff --
Do we need this parameter? > Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator > --- > > Key: FLINK-7959 > URL: https://issues.apache.org/jira/browse/FLINK-7959 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > > Right now {{CodeGenerator}} actually acts two roles, one is responsible for > generating codes from RexNode, and the other one is keeping lots of reusable > statements. It makes more sense to split these logic into two dedicated > classes. > The new {{CodeGeneratorContext}} will keep all the reusable statements, while > the new {{ExprCodeGenerator}} will only do generating codes from RexNode. > And for classes like {{AggregationCodeGenerator}} or > {{FunctionCodeGenerator}}, I think they should not be the subclasses of the > {{CodeGenerator}}, but should all as standalone classes. They can create > {{ExprCodeGenerator}} when they need to generating codes from RexNode, and > they can also generating codes by themselves. The {{CodeGeneratorContext}} > can be passed around to collect all reusable statements, and list them in the > final generated class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265080#comment-16265080 ] ASF GitHub Bot commented on FLINK-7959: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4940#discussion_r152923370
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala ---
@@ -21,75 +21,49 @@ import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
 import org.apache.flink.table.codegen.Indenter.toISC

 /**
  * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s.
- * Including [[MapFunction]], [[FlatMapFunction]], [[FlatJoinFunction]], [[ProcessFunction]].
- *
- * @param config configuration that determines runtime behavior
- * @param nullableInput input(s) can be null.
- * @param input1 type information about the first input of the Function
- * @param input2 type information about the second input if the Function is binary
- * @param input1FieldMapping additional mapping information for input1
- *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
- * @param input2FieldMapping additional mapping information for input2
- *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
+ * Including [[MapFunction]], [[FlatMapFunction]], [[FlatJoinFunction]], [[ProcessFunction]], and
+ * the corresponding rich version of the functions.
  */
-class FunctionCodeGenerator(
-    config: TableConfig,
-    nullableInput: Boolean,
-    input1: TypeInformation[_ <: Any],
-    input2: Option[TypeInformation[_ <: Any]] = None,
-    input1FieldMapping: Option[Array[Int]] = None,
-    input2FieldMapping: Option[Array[Int]] = None)
-  extends CodeGenerator(
-    config,
-    nullableInput,
-    input1,
-    input2,
-    input1FieldMapping,
-    input2FieldMapping) {
-
-  /**
-   * A code generator for generating unary Flink
-   * [[org.apache.flink.api.common.functions.Function]]s with one input.
-   *
-   * @param config configuration that determines runtime behavior
-   * @param nullableInput input(s) can be null.
-   * @param input type information about the input of the Function
-   * @param inputFieldMapping additional mapping information necessary for input
-   *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
-   */
-  def this(
-      config: TableConfig,
-      nullableInput: Boolean,
-      input: TypeInformation[Any],
-      inputFieldMapping: Array[Int]) =
-    this(config, nullableInput, input, None, Some(inputFieldMapping))
+object FunctionCodeGenerator {

   /**
    * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to Java
    * compiler.
    *
+   * @param ctx The context of the code generator
    * @param name Class name of the Function. Must not be unique but has to be a valid Java class
    *             identifier.
    * @param clazz Flink Function to be generated.
    * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
    *                 output record can be accessed via the given term methods.
    * @param returnType expected return type
+   * @param input1Type the first input type
+   * @param input1Term the first input term
+   * @param input2Type the second input type, optional.
+   * @param input2Term the second input term.
+   * @param collectorTerm the collector term
+   * @param contextTerm the context term
    * @tparam F Flink Function to be generated.
    * @tparam T Return type of the Flink Function.
    * @return instance of GeneratedFunction
    */
   def generateFunction[F <: Function, T <: Any](
-      name: String,
-      clazz: Class[F],
-      bodyCode: String,
-      returnType: TypeInformation[T])
-    : GeneratedFunction[F, T] = {
+      ctx: CodeGeneratorContext,
+      name: String,
+      clazz: Class[F],
+      bodyCode: String,
+      returnType: TypeInformation[T],
+      input1Type: TypeInformation[_ <: Any],
--- End diff --
That's a lot of parameters. We should really think about moving input related information to the
[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265079#comment-16265079 ] ASF GitHub Bot commented on FLINK-7959: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4940#discussion_r152921238
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala ---
@@ -203,6 +210,569 @@ object CodeGenUtils {
       throw new CodeGenException("Integer expression type expected.")
   }

+  def generateNullLiteral(
--- End diff --
Actually these methods should be part of the `ExprCodeGenerator`. They are more than just utils but the actual main logic. > Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator > --- > > Key: FLINK-7959 > URL: https://issues.apache.org/jira/browse/FLINK-7959 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > > Right now {{CodeGenerator}} actually acts two roles, one is responsible for > generating codes from RexNode, and the other one is keeping lots of reusable > statements. It makes more sense to split these logic into two dedicated > classes. > The new {{CodeGeneratorContext}} will keep all the reusable statements, while > the new {{ExprCodeGenerator}} will only do generating codes from RexNode. > And for classes like {{AggregationCodeGenerator}} or > {{FunctionCodeGenerator}}, I think they should not be the subclasses of the > {{CodeGenerator}}, but should all as standalone classes. They can create > {{ExprCodeGenerator}} when they need to generating codes from RexNode, and > they can also generating codes by themselves. The {{CodeGeneratorContext}} > can be passed around to collect all reusable statements, and list them in the > final generated class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265083#comment-16265083 ] ASF GitHub Bot commented on FLINK-7959: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4940#discussion_r152921868
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala ---
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
+
+import scala.collection.mutable
+
+/**
+ * The context for code generator, maintaining various reusable statements that could be insert
+ * into different code sections in the final generated class.
+ */
+class CodeGeneratorContext {
+
+  // set of member statements that will be added only once
+  private val reusableMemberStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of constructor statements that will be added only once
+  private val reusableInitStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of statements that will be added only once per record
+  private val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of open statements for RichFunction that will be added only once
+  private val reusableOpenStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // set of close statements for RichFunction that will be added only once
+  private val reusableCloseStatements: mutable.LinkedHashSet[String] =
+    mutable.LinkedHashSet[String]()
+
+  // map of initial input unboxing expressions that will be added only once
+  // (inputTerm, index) -> expr
+  private val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] =
+    mutable.Map[(String, Int), GeneratedExpression]()
+
+  // set of constructor statements that will be added only once
+  private val reusableConstructorStatements: mutable.LinkedHashSet[(String, String)] =
+    mutable.LinkedHashSet[(String, String)]()
+
+  /**
+   * @return code block of statements that need to be placed in the member area of the class
+   *         (e.g. member variables and their initialization)
+   */
+  def reuseMemberCode(): String = {
+    reusableMemberStatements.mkString("")
+  }
+
+  /**
+   * @return code block of statements that need to be placed in the constructor
+   */
+  def reuseInitCode(): String = {
+    reusableInitStatements.mkString("")
+  }
+
+  /**
+   * @return code block of statements that need to be placed in the per recode process block
+   *         (e.g. Function or StreamOperator's processElement)
+   */
+  def reusePerRecordCode(): String = {
+    reusablePerRecordStatements.mkString("")
+  }
+
+  /**
+   * @return code block of statements that need to be placed in the open() method
+   *         (e.g. RichFunction or StreamOperator)
+   */
+  def reuseOpenCode(): String = {
+    reusableOpenStatements.mkString("")
+  }
+
+  /**
+   * @return code block of statements that need to be placed in the close() method
+   *         (e.g. RichFunction or StreamOperator)
+   */
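The central idea of the CodeGeneratorContext in the diff above can be sketched in a few lines of Java (class and method names here are illustrative stand-ins, not the actual Flink API): each category of reusable statement goes into an insertion-ordered set, so a statement added twice appears only once when the sections are spliced into the final generated class.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class GenContext {
    // Insertion-ordered sets deduplicate statements while preserving order.
    private final Set<String> reusableMemberStatements = new LinkedHashSet<>();
    private final Set<String> reusableOpenStatements = new LinkedHashSet<>();

    public void addReusableMember(String stmt) { reusableMemberStatements.add(stmt); }
    public void addReusableOpen(String stmt)   { reusableOpenStatements.add(stmt); }

    // Splice collected statements into the member area of the generated class.
    public String reuseMemberCode() { return String.join("\n", reusableMemberStatements); }
    // Splice collected statements into the open() method of the generated class.
    public String reuseOpenCode()   { return String.join("\n", reusableOpenStatements); }
}
```

The context can then be passed to any number of independent generators, which collect their reusable statements into it without needing to share a common superclass — the separation the issue proposes.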
[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265081#comment-16265081 ] ASF GitHub Bot commented on FLINK-7959: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4940#discussion_r152924498
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala ---
@@ -31,28 +32,28 @@ class CurrentTimePointCallGen(
   extends CallGenerator {

   override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = targetType match {
+      ctx: CodeGeneratorContext,
--- End diff --
Use `ExprCodeGenerator` here such that we don't need to pass `nullCheck` to every util function, if those functions are part of the `ExprCodeGenerator` class. With this approach we don't need to add `nullCheck` to `CallGenerate.generate`. > Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator > --- > > Key: FLINK-7959 > URL: https://issues.apache.org/jira/browse/FLINK-7959 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > > Right now {{CodeGenerator}} actually acts two roles, one is responsible for > generating codes from RexNode, and the other one is keeping lots of reusable > statements. It makes more sense to split these logic into two dedicated > classes. > The new {{CodeGeneratorContext}} will keep all the reusable statements, while > the new {{ExprCodeGenerator}} will only do generating codes from RexNode. > And for classes like {{AggregationCodeGenerator}} or > {{FunctionCodeGenerator}}, I think they should not be the subclasses of the > {{CodeGenerator}}, but should all as standalone classes. They can create > {{ExprCodeGenerator}} when they need to generating codes from RexNode, and > they can also generating codes by themselves. The {{CodeGeneratorContext}} > can be passed around to collect all reusable statements, and list them in the > final generated class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265077#comment-16265077 ] ASF GitHub Bot commented on FLINK-7959: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4940#discussion_r152918458
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -985,14 +985,11 @@ abstract class TableEnvironment(val config: TableConfig) {
     }

     // code generate MapFunction
-    val generator = new FunctionCodeGenerator(
-      config,
-      false,
-      inputTypeInfo,
-      None,
-      None)
-
-    val conversion = generator.generateConverterResultExpression(
+    val ctx = CodeGeneratorContext()
+    val exprGenerator = new ExprCodeGenerator(ctx, false, config.getNullCheck)
--- End diff --
Should we move the `nullableInput` to `bindInput`? If an input is not required, why should this flag be required? We can also add a default `false` value. > Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator > --- > > Key: FLINK-7959 > URL: https://issues.apache.org/jira/browse/FLINK-7959 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > > Right now {{CodeGenerator}} actually acts two roles, one is responsible for > generating codes from RexNode, and the other one is keeping lots of reusable > statements. It makes more sense to split these logic into two dedicated > classes. > The new {{CodeGeneratorContext}} will keep all the reusable statements, while > the new {{ExprCodeGenerator}} will only do generating codes from RexNode. > And for classes like {{AggregationCodeGenerator}} or > {{FunctionCodeGenerator}}, I think they should not be the subclasses of the > {{CodeGenerator}}, but should all as standalone classes. They can create > {{ExprCodeGenerator}} when they need to generating codes from RexNode, and > they can also generating codes by themselves. The {{CodeGeneratorContext}} > can be passed around to collect all reusable statements, and list them in the > final generated class. -- This message was sent by Atlassian JIRA (v6.4.14#64029)