[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-11-24 Thread ChrisChinchilla
Github user ChrisChinchilla commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r153034462
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
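
For readers of this review, the following is a minimal, self-contained sketch of how the flag above is typically wired into a job (an editor's illustration, not part of the reviewed diff; the checkpoint URI and interval are placeholder assumptions):

```java
// Sketch: enabling incremental checkpoints with the RocksDB state backend.
// The checkpoint URI below is a placeholder; point it at your own durable storage.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (placeholder interval).
        env.enableCheckpointing(60_000);

        // The second constructor argument enables incremental checkpoints.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        env.setStateBackend(backend);

        // ... define sources, transformations, and sinks here ...

        env.execute("Job with incremental checkpoints");
    }
}
```

The string-URI constructor is just one way to create the backend; the snippet in the diff passes an existing `filebackend` instead.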
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html)).
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from the pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, we briefly discuss the idea behind incremental
+checkpoints in a simplified manner, to get the concept across.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
+have the previous checkpoint and the state changes for the current 
checkpoint, we can restore the full, current state
+for the job. This is the basic principle of incremental checkpoints, that 
each checkpoint can build upon a history of
+previous checkpoints to avoid writing redundant information.
+
+Figure 1 illustrates the basic idea of incremental checkpointing in 
comparison to full checkpointing.
+
+The state of the job evolves over time, and for checkpoints ``CP 1`` to 
``CP 2``, a full checkpoint is simply a copy of the whole
+state.
+
+
+   
+
+
+With incremental checkpointing, each checkpoint contains only the state 
change since the previous checkpoint.
+
+* For the first 

[GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...

2017-11-24 Thread ChrisChinchilla
Github user ChrisChinchilla commented on a diff in the pull request:

https://github.com/apache/flink/pull/4543#discussion_r153033430
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -99,3 +99,296 @@ above).
 ```sh
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
 ```
+
+## Incremental Checkpoints
+
+### Synopsis
+
+Incremental checkpoints can significantly reduce checkpointing time in 
comparison to full checkpoints, at the cost of a
+(potentially) longer recovery time. The core idea is that incremental 
checkpoints only record changes in state since the
+previously-completed checkpoint instead of producing a full, 
self-contained backup of the state backend. In this way,
+incremental checkpoints can build upon previous checkpoints.
+
+RocksDBStateBackend is currently the only backend that supports 
incremental checkpoints.
+
+Flink leverages RocksDB's internal backup mechanism in a way that is 
self-consolidating over time. As a result, the
+incremental checkpoint history in Flink does not grow indefinitely, and 
old checkpoints are eventually subsumed and
+pruned automatically.
+
+``While we strongly encourage the use of incremental checkpoints for Flink 
jobs with large state, please note that this is
+a new feature and currently not enabled by default``.
+
+To enable this feature, users can instantiate a `RocksDBStateBackend` with 
the corresponding boolean flag in the
+constructor set to `true`, e.g.:
+
+```java
+   RocksDBStateBackend backend =
+   new RocksDBStateBackend(filebackend, true);
+```
+
+### Use-case for Incremental Checkpoints
+
+Checkpoints are the centrepiece of Flink’s fault tolerance mechanism and 
each checkpoint represents a consistent
+snapshot of the distributed state of a Flink job from which the system can 
recover in case of a software or machine
+failure (see [here]({{ site.baseurl 
}}/internals/stream_checkpointing.html)).
+
+Flink creates checkpoints periodically to track the progress of a job so 
that, in case of failure, only those
+(hopefully few) *events that have been processed after the last completed 
checkpoint* must be reprocessed from the data
+source. The number of events that must be reprocessed has implications for 
recovery time, and so for fastest recovery,
+we want to *take checkpoints as often as possible*.
+
+However, checkpoints are not without performance cost and can introduce 
*considerable overhead* to the system. This
+overhead can lead to lower throughput and higher latency during the time 
that checkpoints are created. One reason is
+that, traditionally, each checkpoint in Flink always represented the 
*complete state* of the job at the time of the
+checkpoint, and all of the state had to be written to stable storage 
(typically some distributed file system) for every
+single checkpoint. Writing multiple terabytes (or more) of state data for 
each checkpoint can obviously create
+significant load for the I/O and network subsystems, on top of the normal 
load from the pipeline’s data processing work.
+
+Before incremental checkpoints, users were stuck with a suboptimal 
tradeoff between recovery time and checkpointing
+overhead. Fast recovery and low checkpointing overhead were conflicting 
goals. And this is exactly the problem that
+incremental checkpoints solve.
+
+
+### Basics of Incremental Checkpoints
+
+In this section, we briefly discuss the idea behind incremental
+checkpoints in a simplified manner, to get the concept across.
+
+Our motivation for incremental checkpointing stemmed from the observation 
that it is often wasteful to write the full
+state of a job for every single checkpoint. In most cases, the state 
between two checkpoints is not drastically
+different, and only a fraction of the state data is modified and some new 
data added. Instead of writing the full state
+into each checkpoint again and again, we could record only changes in 
state since the previous checkpoint. As long as we
--- End diff --

@StefanRRichter I'm struggling to understand this sentence.

> As long as we have the previous checkpoint and the state changes for the 
current checkpoint, we can restore the full, current state for the job.

Do you mean…

> As long as we have the previous checkpoint, if the state changes for the 
current checkpoint, we can restore the full, current state for the job.

Or something different? Explain to me what you're trying to say :)


---


[GitHub] flink issue #5045: [hotfix][docs] Review of concepts docs for grammar and cl...

2017-11-24 Thread ChrisChinchilla
Github user ChrisChinchilla commented on the issue:

https://github.com/apache/flink/pull/5045
  
@greghogan That said, the whole 'we' vs 'you' discussion is purely a 
stylistic preference (if that's what you meant), as long as it's consistent and 
clear who you mean; the comment about removing passive voice still stands :)


---


[GitHub] flink issue #5045: [hotfix][docs] Review of concepts docs for grammar and cl...

2017-11-24 Thread ChrisChinchilla
Github user ChrisChinchilla commented on the issue:

https://github.com/apache/flink/pull/5045
  
@greghogan @zentol I will get these updates to you asap.

But I will have to strongly disagree on the 'you' and passive voice points. 
I've been working as a technical writer for some time, and this is a point we're 
all fairly strong on pushing, as in the long run it does make documents much 
clearer, less ambiguous, and easier to read.

Granted, a LOT of the Flink documentation doesn't follow this style right 
now, and that's something I personally would love to change over time, as I 
really think it would help people understand better.

Ideally I would love to create a more comprehensive style guide and 
explanation of this (and I have boilerplates I've used with other projects) and 
change this voicing over time.

Thoughts?


---


[jira] [Created] (FLINK-8150) WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as TaskManager IDs

2017-11-24 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8150:
---

 Summary: WebUI in FLIP-6 mode exposes TaskManagerConnection IDs as 
TaskManager IDs
 Key: FLINK-8150
 URL: https://issues.apache.org/jira/browse/FLINK-8150
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination, REST
Affects Versions: 1.5.0
Reporter: Gary Yao
Priority: Blocker
 Fix For: 1.5.0


TaskManager IDs exposed by 
{{org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler}} 
cannot be used as input to query TaskManager metrics with method 
{{MetricStore#getTaskManagerMetricStore(String)}}.

*Reason*
{{ResourceManager#requestTaskManagerInfo(Time)}} returns {{TaskManagerInfo}}s 
whose instance IDs are set to the IDs of the {{TaskExecutorConnection}}s, 
while {{ResourceManager#requestTaskManagerMetricQueryServicePaths(Time)}} 
returns the TaskManager resource IDs.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8149) Replace usages of deprecated SerializationSchema

2017-11-24 Thread Hai Zhou UTC+8 (JIRA)
Hai Zhou UTC+8 created FLINK-8149:
-

 Summary: Replace usages of deprecated SerializationSchema
 Key: FLINK-8149
 URL: https://issues.apache.org/jira/browse/FLINK-8149
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Affects Versions: 1.4.0
Reporter: Hai Zhou UTC+8
Assignee: Hai Zhou UTC+8
 Fix For: 1.5.0


The deprecated {{SerializationSchema}} in {{flink-streaming-java}} has been 
moved to {{flink-core}}, but the deprecated {{SerializationSchema}} is still 
used in {{flink-connector-kinesis}}.





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265444#comment-16265444
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4581
  
Rat check did not pass. Could you please fix this @NicoK.


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained to be used in parallel somewhere else, it may also 
> no longer be available or may contain corrupt data.
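
As a loose illustration of the idempotent release such a fix needs (an editor's sketch under the assumption that making the second release a no-op is acceptable; this is not Flink's actual buffer or recycling code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of an idempotent release guard (illustrative only).
class GuardedBuffer {
    private final AtomicBoolean recycled = new AtomicBoolean(false);
    private final Runnable recycler;

    GuardedBuffer(Runnable recycler) {
        this.recycler = recycler;
    }

    void recycle() {
        // Only the first caller actually returns the memory; later calls are ignored.
        if (recycled.compareAndSet(false, true)) {
            recycler.run();
        }
    }
}
```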



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8142) Cleanup reference to deprecated constants in ConfigConstants

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265443#comment-16265443
 ] 

ASF GitHub Bot commented on FLINK-8142:
---

GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/5067

[FLINK-8142][config] Cleanup reference to deprecated constants in 
ConfigConstants

## What is the purpose of the change

ConfigConstants contains several deprecated String constants that are used 
by other Flink modules. Those should be cleaned up.

## Brief change log

  - *Replace usages of deprecated `TASK_MANAGER_MEMORY_SIZE_KEY`, 
`EXECUTION_RETRIES_KEY`, `EXECUTION_RETRY_DELAY_KEY`, 
`TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY`, `JOB_MANAGER_IPC_ADDRESS_KEY`*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-8142

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5067.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5067


commit 0cf6eb6a0bbee829adedc9ddce230c4aa70abe94
Author: yew1eb 
Date:   2017-11-23T15:52:38Z

[FLINK-8142][minor] Cleanup reference to deprecated constants in 
ConfigConstants




> Cleanup reference to deprecated constants in ConfigConstants
> 
>
> Key: FLINK-8142
> URL: https://issues.apache.org/jira/browse/FLINK-8142
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Minor
> Fix For: 1.5.0
>
>
> ConfigConstants contains several deprecated String constants that are used by 
> other Flink modules. Those should be cleaned up.
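
To make the intent of the cleanup concrete, here is a hedged sketch of the kind of before/after replacement described (an editor's illustration; the exact ConfigOptions touched by the PR may differ):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;

// Sketch: moving from deprecated String keys in ConfigConstants to typed ConfigOptions.
public class ConfigMigrationExample {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Before: config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
        config.setString(JobManagerOptions.ADDRESS, "localhost");

        // Before: config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, false);
        config.setBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE, false);
    }
}
```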



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Updated] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-7694:

Description: Port 
{{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} to 
new handler that works with {{RestServerEndpoint}}. Add new handler to 
{{DispatcherRestEndpoint}}.  (was: Port 
{{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}})

> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Gary Yao
>  Labels: flip6
> Fix For: 1.5.0
>
>
> Port 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}} to 
> new handler that works with {{RestServerEndpoint}}. Add new handler to 
> {{DispatcherRestEndpoint}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265363#comment-16265363
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5032
  
Hi @aljoscha, I wonder if you could help review this PR when convenient.

Thanks, Xingcan


> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO,
>   source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>   "timon-one",
>   BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>   private static final long serialVersionUID = -805125545438296619L;
>   private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>   private transient ListState<Integer> secondListState;
>   @Override
>   public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>   secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>   }
>   @Override
>   public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>   Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>   if (v == null) {
>   v = new Tuple2<>(value.f0, 0L);
>   }
>   firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>   }
>   }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
> org.apache.flink.api.common.state.ListState
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>   ... 9 more
> {code}
> This is cryptic, as it does not explain the reason for the problem. The 
> error message should be something along the lines of "Duplicate state name".
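
As a purely hypothetical illustration of a check that would produce the suggested message (an editor's sketch; the class and method names below are invented and this is not the actual fix):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.StateDescriptor;

// Sketch: remember what was registered under each name and fail loudly on a mismatch.
public class StateNameRegistry {
    private final Map<String, StateDescriptor<?, ?>> registered = new HashMap<>();

    public void register(StateDescriptor<?, ?> descriptor) {
        StateDescriptor<?, ?> previous = registered.putIfAbsent(descriptor.getName(), descriptor);
        if (previous != null && previous.getClass() != descriptor.getClass()) {
            throw new IllegalStateException(
                    "Duplicate state name '" + descriptor.getName() + "': already registered as "
                            + previous.getClass().getSimpleName() + ", cannot re-register as "
                            + descriptor.getClass().getSimpleName() + ".");
        }
    }
}
```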



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7883) Stop fetching source before a cancel with savepoint

2017-11-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265345#comment-16265345
 ] 

Aljoscha Krettek commented on FLINK-7883:
-

The problem is that this is harder than it looks. In a Flink pipeline, data can 
not only originate at sources; there can be other things in the pipeline 
that produce events "out of thin air". Off the top of my head I can think of: 
 - Sources: this is obvious, sources emit events
 - Reaction to watermark timers: these can emit events and are not as easy to control 
because watermarks can originate not only at sources but also at any operator 
in the pipeline
 - Reaction to processing-time timers: these can also emit events at any time
 - "Special" operators that emit data from a separate thread. Currently this 
would be the Async I/O operator and the {{ContinuousFileReaderOperator}} which 
forms the "file source" together with {{ContinuousFileMonitoringFunction}}.

If we want to make sure that we don't emit any unwanted data we have to 
_quiesce_ the whole pipeline first. This can be done via special messages (like 
watermarks) that are injected at the sources and traverse the topology based on 
a message from the {{JobManager}}. All operators would have to report that they 
are quiesced before we do the savepoint. In case the savepoint fails for some 
reason we need to _un-quiesce_ the pipeline again.

The above is the hard part. Making sure that we don't emit data from a "source" 
is as simple as ensuring that {{SourceContext.collect()}} doesn't forward the 
data that the source wants to emit.

We definitely have to solve this problem, but I don't think that we can do this 
for the 1.5 release, because the community decided to do a very short release 
cycle after 1.4.0, as a bunch of important features are almost ready to be 
released.

> Stop fetching source before a cancel with savepoint
> ---
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call 
> once the savepoint is finished, but during the savepoint execution, the Kafka 
> source continues to poll new messages, which will not be part of the savepoint 
> and will be replayed on the next application start.
> A solution could be to stop fetching from the source stream task before triggering 
> the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existing SourceFunction implementations could 
> implement.
> We can add a {{stopFetchingSource}} property to the 
> {{CheckpointOptions}} class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}.
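
As a rough sketch of what the proposed {{stopFetching}} behaviour could look like at the source boundary (an editor's illustration; the wrapper class below is hypothetical and not the interface proposed above):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch: a SourceContext wrapper that silently drops records once fetching
// has been "stopped", so nothing emitted after that point ends up downstream.
public class StoppableSourceContext<T> implements SourceFunction.SourceContext<T> {

    private final SourceFunction.SourceContext<T> delegate;
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    public StoppableSourceContext(SourceFunction.SourceContext<T> delegate) {
        this.delegate = delegate;
    }

    /** Called when a cancel-with-savepoint is about to be triggered. */
    public void stopFetching() {
        stopped.set(true);
    }

    @Override
    public void collect(T element) {
        if (!stopped.get()) {           // drop records once stopped
            delegate.collect(element);
        }
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        if (!stopped.get()) {
            delegate.collectWithTimestamp(element, timestamp);
        }
    }

    @Override
    public void emitWatermark(Watermark mark) {
        delegate.emitWatermark(mark);
    }

    @Override
    public void markAsTemporarilyIdle() {
        delegate.markAsTemporarilyIdle();
    }

    @Override
    public Object getCheckpointLock() {
        return delegate.getCheckpointLock();
    }

    @Override
    public void close() {
        delegate.close();
    }
}
```

Quiescing timers and operators that emit from separate threads, as described in the comment above, is the part this sketch does not cover.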



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8148) Test instability in YarnFileStageTest

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265344#comment-16265344
 ] 

ASF GitHub Bot commented on FLINK-8148:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5066
  
Makes sense, +1


> Test instability in YarnFileStageTest
> -
>
> Key: FLINK-8148
> URL: https://issues.apache.org/jira/browse/FLINK-8148
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: test-stability
>
> {code}
> Running org.apache.flink.yarn.YarnFileStageTestS3ITCase
> Tests run: 3, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 13.152 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YarnFileStageTestS3ITCase
> testRecursiveUploadForYarnS3(org.apache.flink.yarn.YarnFileStageTestS3ITCase) 
>  Time elapsed: 8.515 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at org.junit.Assert.assertFalse(Assert.java:74)
>   at 
> org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarn(YarnFileStageTestS3ITCase.java:171)
>   at 
> org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3(YarnFileStageTestS3ITCase.java:192)
> {code}
> from https://travis-ci.org/apache/flink/jobs/305861539
> {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarn}} verifies that the 
> test directory used is cleaned up by 
> {{YarnFileStageTest.testCopyFromLocalRecursive}} which should clean up the 
> directory (in a {{finally}} block). However, for S3, we may not always see 
> our own deletes.
> Quoting from https://aws.amazon.com/s3/faqs/ here:
> {quote}Q: What data consistency model does Amazon S3 employ?
> Amazon S3 buckets in all Regions provide read-after-write consistency for 
> PUTS of new objects and eventual consistency for overwrite PUTS and 
> DELETES.{quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8100) Consider introducing log4j-extras

2017-11-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265322#comment-16265322
 ] 

Aljoscha Krettek commented on FLINK-8100:
-

We don't use log4j directly but slf4j. Would this still work?

> Consider introducing log4j-extras 
> --
>
> Key: FLINK-8100
> URL: https://issues.apache.org/jira/browse/FLINK-8100
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> log4j-extras allows log rotation as well as compression.
> https://logging.apache.org/log4j/extras/download.html
> We should consider using log4j-extras.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8103) Flink 1.4 not writing to standard out log file

2017-11-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265321#comment-16265321
 ] 

Aljoscha Krettek commented on FLINK-8103:
-

I just ran an example like this on the (just-released) RC1 for Flink 1.4.0. The 
output was in fact in the .out file of the TaskManager. Could you try it with 
that as well?

RC1: http://people.apache.org/~aljoscha/flink-1.4.0-rc1/

> Flink 1.4 not writing to standard out log file
> --
>
> Key: FLINK-8103
> URL: https://issues.apache.org/jira/browse/FLINK-8103
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.0
> Environment: macOS 10.13 (High Sierra)
>Reporter: Ryan Brideau
>
> I built the latest snapshot of 1.4 yesterday and tried testing it with a 
> simple word count example, where StreamUtil is just a helper that checks 
> input parameters:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.scala._
> object Words {
>   def main(args: Array[String]) {
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val dataStream = StreamUtil.getDataStream(env, params)
> val wordDataStream = dataStream
>   .flatMap{ _.split(" ") }
> wordDataStream.println
> // execute program
> env.execute("Words Scala")
>   }
> }
> {code}
> This runs without an issue on the latest stable version of 1.3 and writes its 
> results to the _out_ file, which I can tail to see the results. This doesn't 
> happen in 1.4. I can, however, modify it to write out to a file:
> {code:java}
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.core.fs.FileSystem.WriteMode
> import org.apache.flink.streaming.api.scala._
> object Words {
>   def main(args: Array[String]) {
> // set up the execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val params = ParameterTool.fromArgs(args)
> env.getConfig.setGlobalJobParameters(params)
> val dataStream = StreamUtil.getDataStream(env, params)
> val wordDataStream = dataStream
>   .flatMap{ _.split(" ") }
> wordDataStream
>   .writeAsText("file:///somepath/output", WriteMode.OVERWRITE)
>   .setParallelism(1)
> // execute program
> env.execute("Words Scala")
>   }
> }
> {code}
> Any clues as to what might be causing this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8124) EventTimeTrigger (and other triggers) could have less specific generic types

2017-11-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265309#comment-16265309
 ] 

Aljoscha Krettek commented on FLINK-8124:
-

I think that should be feasible, yes.

> EventTimeTrigger (and other triggers) could have less specific generic types
> 
>
> Key: FLINK-8124
> URL: https://issues.apache.org/jira/browse/FLINK-8124
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Cristian
>Priority: Minor
>
> When implementing custom WindowAssigners, it is possible to need different 
> implementations of the {{Window}} class (other than {{TimeWindow}}). In such 
> cases, it is not possible to use the existing triggers (e.g. 
> {{EventTimeTrigger}}) because it extends from {{Trigger<Object, TimeWindow>}}, 
> which is (unnecessarily?) specific.
> It should be possible to make that class more generic by using 
> {{EventTimeTrigger extends Trigger<Object, Window>}} instead.
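
A hedged sketch of the kind of window-generic trigger the reporter is asking for, based on the standard event-time firing logic (an editor's illustration; this is not the built-in {{EventTimeTrigger}}):

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Sketch: an event-time trigger that is generic in the window type, usable
// with custom Window implementations produced by a custom WindowAssigner.
public class GenericEventTimeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // Watermark is already past the window end: fire immediately.
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```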



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6294) BucketingSink throws NPE while cancelling job

2017-11-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-6294.
---
Resolution: Fixed

Fixed on release-1.3 in
5aea4917662d80899d3adff0378b97ae58aa2afc

Fixed on release-1.4 in
f636276dee991160a640442cbfcfdd58bfa57806

Fixed on master in
d7911c5a8a6896261c55b61ea4633e706270baa1

> BucketingSink throws NPE while cancelling job
> -
>
> Key: FLINK-6294
> URL: https://issues.apache.org/jira/browse/FLINK-6294
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: Andrey
> Fix For: 1.3.3, 1.4.1
>
>
> Steps to reproduce:
> * configure BucketingSink and run job
> * cancel job from UI before processing any messages
> * in logs:
> {code}
> 2017-04-11 10:14:54,681 INFO  org.apache.flink.core.fs.FileSystem 
>   - Ensuring all FileSystem streams are closed for Source: Custom 
> Source (1/2) [Source: Custom Source (1/2)]
> 2017-04-11 10:14:54,881 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> Un-registering task and sending final execution state CANCELED to JobManager 
> for task Source: Custom Source (56d0c9ffe06dc3e4481e7ce530d9894f) 
> [flink-akka.actor.default-dispatcher-4]
> 2017-04-11 10:14:56,584 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> disposal of stream operator. [Flat Map -> Sink: Unnamed (2/2)]
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:422)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6294) BucketingSink throws NPE while cancelling job

2017-11-24 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-6294:

Fix Version/s: 1.4.1
   1.3.3

> BucketingSink throws NPE while cancelling job
> -
>
> Key: FLINK-6294
> URL: https://issues.apache.org/jira/browse/FLINK-6294
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.2.0
>Reporter: Andrey
> Fix For: 1.3.3, 1.4.1
>
>
> Steps to reproduce:
> * configure BucketingSink and run job
> * cancel job from UI before processing any messages
> * in logs:
> {code}
> 2017-04-11 10:14:54,681 INFO  org.apache.flink.core.fs.FileSystem 
>   - Ensuring all FileSystem streams are closed for Source: Custom 
> Source (1/2) [Source: Custom Source (1/2)]
> 2017-04-11 10:14:54,881 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> Un-registering task and sending final execution state CANCELED to JobManager 
> for task Source: Custom Source (56d0c9ffe06dc3e4481e7ce530d9894f) 
> [flink-akka.actor.default-dispatcher-4]
> 2017-04-11 10:14:56,584 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> disposal of stream operator. [Flat Map -> Sink: Unnamed (2/2)]
> java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.close(BucketingSink.java:422)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-7694:

Component/s: Distributed Coordination

> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Gary Yao
>  Labels: flip6
> Fix For: 1.5.0
>
>
> Port 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-7694:

Labels: flip6  (was: )

> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Gary Yao
>  Labels: flip6
> Fix For: 1.5.0
>
>
> Port 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-7694:

Description: Port 
{{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}}

> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Gary Yao
> Fix For: 1.5.0
>
>
> Port 
> {{org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-7694:
---

Assignee: Gary Yao  (was: Bowen Li)

> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Gary Yao
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8148) Test instability in YarnFileStageTest

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265301#comment-16265301
 ] 

ASF GitHub Bot commented on FLINK-8148:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5066

[FLINK-8148][yarn/s3] fix test instability in YarnFileStageTestS3ITCase

## What is the purpose of the change

`YarnFileStageTestS3ITCase.testRecursiveUploadForYarn` verifies that the 
test directory used is cleaned up by 
`YarnFileStageTest.testCopyFromLocalRecursive` which should clean up the 
directory (in a `finally` block). However, for S3, we may not always see our 
own deletes.

Quoting from https://aws.amazon.com/s3/faqs/ here:

> Q: What data consistency model does Amazon S3 employ?
> Amazon S3 buckets in all Regions provide read-after-write consistency for 
PUTS of new objects and eventual consistency for overwrite PUTS and DELETES.

## Brief change log

- Remove a check for a deleted directory since we may not see our own delete yet with S3.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8148

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5066.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5066


commit 29d7a8e085a5655c00a3163546e22a5d1331ea60
Author: Nico Kruber 
Date:   2017-11-24T13:54:41Z

[FLINK-8148][yarn/s3] fix test instability in YarnFileStageTestS3ITCase

Remove a check for a deleted directory since we may not see our own delete yet with S3.




> Test instability in YarnFileStageTest
> -
>
> Key: FLINK-8148
> URL: https://issues.apache.org/jira/browse/FLINK-8148
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: test-stability
>
> {code}
> Running org.apache.flink.yarn.YarnFileStageTestS3ITCase
> Tests run: 3, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 13.152 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YarnFileStageTestS3ITCase
> testRecursiveUploadForYarnS3(org.apache.flink.yarn.YarnFileStageTestS3ITCase) 
>  Time elapsed: 8.515 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at org.junit.Assert.assertFalse(Assert.java:74)
>   at 
> org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarn(YarnFileStageTestS3ITCase.java:171)
>   at 
> org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3(YarnFileStageTestS3ITCase.java:192)
> {code}
> from https://travis-ci.org/apache/flink/jobs/305861539
> {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarn}} verifies that the 
> test directory used is cleaned up by 
> {{YarnFileStageTest.testCopyFromLocalRecursive}} which should clean up the 
> directory (in a {{finally}} block). However, for S3, we may not always see 
> our own deletes.
> Quoting from https://aws.amazon.com/s3/faqs/ here:
> {quote}Q: What data consistency model does Amazon S3 employ?
> Amazon S3 buckets in all Regions provide read-after-write consistency for 
> PUTS of new objects and eventual consistency for overwrite PUTS and 
> DELETES.{quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8148) Test instability in YarnFileStageTest

2017-11-24 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8148:
---
Labels: test-stability  (was: )

> Test instability in YarnFileStageTest
> -
>
> Key: FLINK-8148
> URL: https://issues.apache.org/jira/browse/FLINK-8148
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: test-stability
>
> {code}
> Running org.apache.flink.yarn.YarnFileStageTestS3ITCase
> Tests run: 3, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 13.152 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YarnFileStageTestS3ITCase
> testRecursiveUploadForYarnS3(org.apache.flink.yarn.YarnFileStageTestS3ITCase) 
>  Time elapsed: 8.515 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at org.junit.Assert.assertFalse(Assert.java:74)
>   at 
> org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarn(YarnFileStageTestS3ITCase.java:171)
>   at 
> org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3(YarnFileStageTestS3ITCase.java:192)
> {code}
> from https://travis-ci.org/apache/flink/jobs/305861539
> {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarn}} verifies that the 
> test directory used is cleaned up by 
> {{YarnFileStageTest.testCopyFromLocalRecursive}} which should clean up the 
> directory (in a {{finally}} block). However, for S3, we may not always see 
> our own deletes.
> Quoting from https://aws.amazon.com/s3/faqs/ here:
> {quote}Q: What data consistency model does Amazon S3 employ?
> Amazon S3 buckets in all Regions provide read-after-write consistency for 
> PUTS of new objects and eventual consistency for overwrite PUTS and 
> DELETES.{quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8148) Test instability in YarnFileStageTest

2017-11-24 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-8148:
--

 Summary: Test instability in YarnFileStageTest
 Key: FLINK-8148
 URL: https://issues.apache.org/jira/browse/FLINK-8148
 Project: Flink
  Issue Type: Bug
  Components: Tests, YARN
Affects Versions: 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber
Priority: Critical


{code}
Running org.apache.flink.yarn.YarnFileStageTestS3ITCase
Tests run: 3, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 13.152 sec <<< 
FAILURE! - in org.apache.flink.yarn.YarnFileStageTestS3ITCase
testRecursiveUploadForYarnS3(org.apache.flink.yarn.YarnFileStageTestS3ITCase)  
Time elapsed: 8.515 sec  <<< FAILURE!
java.lang.AssertionError: null
at org.junit.Assert.fail(Assert.java:86)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertFalse(Assert.java:64)
at org.junit.Assert.assertFalse(Assert.java:74)
at 
org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarn(YarnFileStageTestS3ITCase.java:171)
at 
org.apache.flink.yarn.YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3(YarnFileStageTestS3ITCase.java:192)
{code}

from https://travis-ci.org/apache/flink/jobs/305861539

{{YarnFileStageTestS3ITCase.testRecursiveUploadForYarn}} verifies that the test 
directory used is cleaned up by 
{{YarnFileStageTest.testCopyFromLocalRecursive}} which should clean up the 
directory (in a {{finally}} block). However, for S3, we may not always see our 
own deletes.

Quoting from https://aws.amazon.com/s3/faqs/ here:
{quote}Q: What data consistency model does Amazon S3 employ?
Amazon S3 buckets in all Regions provide read-after-write consistency for PUTS 
of new objects and eventual consistency for overwrite PUTS and DELETES.{quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-24 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152972315
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -729,6 +804,12 @@ public Void call() throws Exception {
};
}
 
+   /**
+* Submits all the callable tasks to the executor and waits the results.
--- End diff --

`waits for the results`


---


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265295#comment-16265295
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152963546
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() 
throws Exception {
// Prepare the exclusive and floating buffers to verify 
recycle logic later
Buffer exclusiveBuffer = inputChannel.requestBuffer();
assertNotNull(exclusiveBuffer);
-   Buffer floatingBuffer1 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer1);
-   Buffer floatingBuffer2 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer2);
+
+   final int numRecycleFloatingBuffers = 4;
+   final ArrayDeque floatingBufferQueue = new 
ArrayDeque<>(numRecycleFloatingBuffers);
+   for (int i = 0; i < numRecycleFloatingBuffers; i++) {
+   Buffer floatingBuffer = 
bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer);
+   floatingBufferQueue.add(floatingBuffer);
+   }
 
// Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
-   verify(bufferPool, times(11)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers.
+   // One exclusive buffer is taken before, so we should 
request 13 floating buffers.
+   verify(bufferPool, times(13)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel",
+   10, inputChannel.getNumberOfRequiredBuffers());
 
// Increase the backlog to exceed the number of 
available floating buffers
inputChannel.onSenderBacklog(10);
 
// The channel does not get enough floating buffer and 
register as buffer listener
-   verify(bufferPool, times(13)).requestBuffer();
+   verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
-   assertEquals("There should be 11 buffers available in 
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 12 buffers required in 
the channel", 12, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
+   assertEquals("There should be 11 buffers available in 
the channel",
+   11, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 12 buffers required in 
the channel",
+   12, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 0 buffer available in 
local pool",
+   0, 
bufferPool.getNumberOfAvailableMemorySegments());
 
// Continue increasing the backlog
-   inputChannel.onSenderBacklog(11);
+   inputChannel.onSenderBacklog(12);
 
// The channel is already in the status of waiting for 
buffers and will not request any more
-   verify(bufferPool, times(13)).requestBuffer();
+   verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
-   

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265293#comment-16265293
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152969860
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() 
throws Exception {
// Prepare the exclusive and floating buffers to verify 
recycle logic later
Buffer exclusiveBuffer = inputChannel.requestBuffer();
assertNotNull(exclusiveBuffer);
-   Buffer floatingBuffer1 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer1);
-   Buffer floatingBuffer2 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer2);
+
+   final int numRecycleFloatingBuffers = 4;
+   final ArrayDeque floatingBufferQueue = new 
ArrayDeque<>(numRecycleFloatingBuffers);
+   for (int i = 0; i < numRecycleFloatingBuffers; i++) {
+   Buffer floatingBuffer = 
bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer);
+   floatingBufferQueue.add(floatingBuffer);
+   }
 
// Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
-   verify(bufferPool, times(11)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers.
+   // One exclusive buffer is taken before, so we should 
request 13 floating buffers.
+   verify(bufferPool, times(13)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel",
+   10, inputChannel.getNumberOfRequiredBuffers());
 
// Increase the backlog to exceed the number of 
available floating buffers
inputChannel.onSenderBacklog(10);
 
// The channel does not get enough floating buffer and 
register as buffer listener
-   verify(bufferPool, times(13)).requestBuffer();
+   verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
-   assertEquals("There should be 11 buffers available in 
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 12 buffers required in 
the channel", 12, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
+   assertEquals("There should be 11 buffers available in 
the channel",
+   11, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 12 buffers required in 
the channel",
+   12, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 0 buffer available in 
local pool",
+   0, 
bufferPool.getNumberOfAvailableMemorySegments());
 
// Continue increasing the backlog
-   inputChannel.onSenderBacklog(11);
+   inputChannel.onSenderBacklog(12);
 
// The channel is already in the status of waiting for 
buffers and will not request any more
-   verify(bufferPool, times(13)).requestBuffer();
+   verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
-   

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-24 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152972034
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a result
-   verify(bufferPool, times(11)).requestBuffer();
+   // The channel does not get enough floating buffer and 
register as buffer listener
+   verify(bufferPool, 

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265296#comment-16265296
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152962580
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() 
throws Exception {
// Prepare the exclusive and floating buffers to verify 
recycle logic later
Buffer exclusiveBuffer = inputChannel.requestBuffer();
assertNotNull(exclusiveBuffer);
-   Buffer floatingBuffer1 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer1);
-   Buffer floatingBuffer2 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer2);
+
+   final int numRecycleFloatingBuffers = 4;
+   final ArrayDeque floatingBufferQueue = new 
ArrayDeque<>(numRecycleFloatingBuffers);
+   for (int i = 0; i < numRecycleFloatingBuffers; i++) {
+   Buffer floatingBuffer = 
bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer);
+   floatingBufferQueue.add(floatingBuffer);
+   }
--- End diff --

if you add `verify(bufferPool, times(4)).requestBuffer();` here, the 
difference to the 13 below becomes a bit clearer (because the requests are 
absolute values, whereas differences would be clear from the start, i.e. that 
we request 9 additional buffers - unfortunately, Mockito does not provide this 
as far as I know - but that's not a big deal as long as everything is clear)


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265297#comment-16265297
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152972414
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+   private Callable recycleExclusiveBufferTask(RemoteInputChannel 
inputChannel, int numExclusiveSegments) {
+   final List exclusiveBuffers = new 
ArrayList<>(numExclusiveSegments);
+   // Exhaust all the exclusive buffers
+   for (int i = 0; i < numExclusiveSegments; i++) {
+   Buffer buffer = inputChannel.requestBuffer();
+   assertNotNull(buffer);
+   exclusiveBuffers.add(buffer);
+   }
+
+   return new Callable() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : exclusiveBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private Callable recycleFloatingBufferTask(BufferPool bufferPool, int 
numFloatingBuffers) throws Exception {
+   final List floatingBuffers = new 
ArrayList<>(numFloatingBuffers);
+   // Exhaust all the floating buffers
+   for (int i = 0; i < numFloatingBuffers; i++) {
+   Buffer buffer = bufferPool.requestBuffer();
+   assertNotNull(buffer);
+   floatingBuffers.add(buffer);
+   }
+
+   return new Callable() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : floatingBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private void submitTasksAndWaitResults(ExecutorService executor, 
Callable[] tasks) throws Exception {
--- End diff --

maybe also rename to `submitTasksAndWaitForResults`


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-24 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152962580
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() 
throws Exception {
// Prepare the exclusive and floating buffers to verify 
recycle logic later
Buffer exclusiveBuffer = inputChannel.requestBuffer();
assertNotNull(exclusiveBuffer);
-   Buffer floatingBuffer1 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer1);
-   Buffer floatingBuffer2 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer2);
+
+   final int numRecycleFloatingBuffers = 4;
+   final ArrayDeque floatingBufferQueue = new 
ArrayDeque<>(numRecycleFloatingBuffers);
+   for (int i = 0; i < numRecycleFloatingBuffers; i++) {
+   Buffer floatingBuffer = 
bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer);
+   floatingBufferQueue.add(floatingBuffer);
+   }
--- End diff --

if you add `verify(bufferPool, times(4)).requestBuffer();` here, the 
difference to the 13 below becomes a bit clearer (because the requests are 
absolute values, whereas differences would be clear from the start, i.e. that 
we request 9 additional buffers - unfortunately, Mockito does not provide this 
as far as I know - but that's not a big deal as long as everything is clear)


---


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-24 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152969860
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() 
throws Exception {
// Prepare the exclusive and floating buffers to verify 
recycle logic later
Buffer exclusiveBuffer = inputChannel.requestBuffer();
assertNotNull(exclusiveBuffer);
-   Buffer floatingBuffer1 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer1);
-   Buffer floatingBuffer2 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer2);
+
+   final int numRecycleFloatingBuffers = 4;
+   final ArrayDeque floatingBufferQueue = new 
ArrayDeque<>(numRecycleFloatingBuffers);
+   for (int i = 0; i < numRecycleFloatingBuffers; i++) {
+   Buffer floatingBuffer = 
bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer);
+   floatingBufferQueue.add(floatingBuffer);
+   }
 
// Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
-   verify(bufferPool, times(11)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers.
+   // One exclusive buffer is taken before, so we should 
request 13 floating buffers.
+   verify(bufferPool, times(13)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel",
+   10, inputChannel.getNumberOfRequiredBuffers());
 
// Increase the backlog to exceed the number of 
available floating buffers
inputChannel.onSenderBacklog(10);
 
// The channel does not get enough floating buffer and 
register as buffer listener
-   verify(bufferPool, times(13)).requestBuffer();
+   verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
-   assertEquals("There should be 11 buffers available in 
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 12 buffers required in 
the channel", 12, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
+   assertEquals("There should be 11 buffers available in 
the channel",
+   11, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 12 buffers required in 
the channel",
+   12, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 0 buffer available in 
local pool",
+   0, 
bufferPool.getNumberOfAvailableMemorySegments());
 
// Continue increasing the backlog
-   inputChannel.onSenderBacklog(11);
+   inputChannel.onSenderBacklog(12);
 
// The channel is already in the status of waiting for 
buffers and will not request any more
-   verify(bufferPool, times(13)).requestBuffer();
+   verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
-   assertEquals("There should be 11 buffers available in 
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 13 buffers required in 
the channel", 13, 

[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-24 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152963546
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -330,64 +332,120 @@ public void testRequestAndReturnFloatingBuffer() 
throws Exception {
// Prepare the exclusive and floating buffers to verify 
recycle logic later
Buffer exclusiveBuffer = inputChannel.requestBuffer();
assertNotNull(exclusiveBuffer);
-   Buffer floatingBuffer1 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer1);
-   Buffer floatingBuffer2 = bufferPool.requestBuffer();
-   assertNotNull(floatingBuffer2);
+
+   final int numRecycleFloatingBuffers = 4;
+   final ArrayDeque floatingBufferQueue = new 
ArrayDeque<>(numRecycleFloatingBuffers);
+   for (int i = 0; i < numRecycleFloatingBuffers; i++) {
+   Buffer floatingBuffer = 
bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer);
+   floatingBufferQueue.add(floatingBuffer);
+   }
 
// Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
-   verify(bufferPool, times(11)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers.
+   // One exclusive buffer is taken before, so we should 
request 13 floating buffers.
+   verify(bufferPool, times(13)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel",
+   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel",
+   10, inputChannel.getNumberOfRequiredBuffers());
 
// Increase the backlog to exceed the number of 
available floating buffers
inputChannel.onSenderBacklog(10);
 
// The channel does not get enough floating buffer and 
register as buffer listener
-   verify(bufferPool, times(13)).requestBuffer();
+   verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
-   assertEquals("There should be 11 buffers available in 
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 12 buffers required in 
the channel", 12, inputChannel.getNumberOfRequiredBuffers());
-   assertEquals("There should be 0 buffer available in 
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
+   assertEquals("There should be 11 buffers available in 
the channel",
+   11, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 12 buffers required in 
the channel",
+   12, inputChannel.getNumberOfRequiredBuffers());
+   assertEquals("There should be 0 buffer available in 
local pool",
+   0, 
bufferPool.getNumberOfAvailableMemorySegments());
 
// Continue increasing the backlog
-   inputChannel.onSenderBacklog(11);
+   inputChannel.onSenderBacklog(12);
 
// The channel is already in the status of waiting for 
buffers and will not request any more
-   verify(bufferPool, times(13)).requestBuffer();
+   verify(bufferPool, times(15)).requestBuffer();
verify(bufferPool, 
times(1)).addBufferListener(inputChannel);
-   assertEquals("There should be 11 buffers available in 
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
-   assertEquals("There should be 13 buffers required in 
the channel", 13, 

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265294#comment-16265294
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152972034
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws 
Exception {
}
 
/**
-* Tests to verify that the input channel requests floating buffers 
from buffer pool
-* in order to maintain backlog + initialCredit buffers available once 
receiving the
-* sender's backlog, and registers as listener if no floating buffers 
available.
+* Tests to verify that the input channel requests floating buffers 
from buffer pool for
+* maintaining (backlog + initialCredit) available buffers once 
receiving the sender's backlog.
+*
+* Verifies the logic of recycling floating buffer back into the 
input channel and the logic
+* of returning extra floating buffer into the buffer pool during 
recycling exclusive buffer.
 */
@Test
-   public void testRequestFloatingBufferOnSenderBacklog() throws Exception 
{
+   public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(12, 32, MemoryType.HEAP);
+   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(14, 32, MemoryType.HEAP);
+   final int numExclusiveBuffers = 2;
+   final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
try {
-   final int numFloatingBuffers = 10;
final BufferPool bufferPool = 
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
-   // Assign exclusive segments to the channel
-   final int numExclusiveBuffers = 2;
-   
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool, 
numExclusiveBuffers);
 
-   assertEquals("There should be " + numExclusiveBuffers + 
" buffers available in the channel",
-   numExclusiveBuffers, 
inputChannel.getNumberOfAvailableBuffers());
+   // Prepare the exclusive and floating buffers to verify 
recycle logic later
+   Buffer exclusiveBuffer = inputChannel.requestBuffer();
+   assertNotNull(exclusiveBuffer);
+   Buffer floatingBuffer1 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer1);
+   Buffer floatingBuffer2 = bufferPool.requestBuffer();
+   assertNotNull(floatingBuffer2);
 
-   // Receive the producer's backlog
+   // Receive the producer's backlog less than the number 
of available floating buffers
inputChannel.onSenderBacklog(8);
 
-   // Request the number of floating buffers by the 
formula of backlog + initialCredit - availableBuffers
-   verify(bufferPool, times(8)).requestBuffer();
+   // Request the floating buffers to maintain (backlog + 
initialCredit) available buffers
+   verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool, 
times(0)).addBufferListener(inputChannel);
-   assertEquals("There should be 10 buffers available in 
the channel",
-   10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers available in 
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+   assertEquals("There should be 10 buffers required in 
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
 
-   inputChannel.onSenderBacklog(11);
+   // Increase the backlog to exceed the number of 
available floating buffers
+   inputChannel.onSenderBacklog(10);
 
-   // Need extra three floating buffers, but only two 
buffers available in buffer pool, register as listener as a 

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265298#comment-16265298
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152972315
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -729,6 +804,12 @@ public Void call() throws Exception {
};
}
 
+   /**
+* Submits all the callable tasks to the executor and waits the results.
--- End diff --

`waits for the results`


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} 
> for holding the message. If not got, the message is staged temporarily and 
> {{autoread}} for channel is set false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get 
> buffer from {{RemoteInputChannel}} for reading messages from producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8093) flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"

2017-11-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265290#comment-16265290
 ] 

Aljoscha Krettek commented on FLINK-8093:
-

Do you mean thread safe or *not* thread safe? I think the static variable can 
be a problem because that would be used by multiple threads and the tasks from 
the two jobs share the same JVM instances.
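
One common mitigation (offered here only as a hedged sketch, not as a confirmed fix for this report) is to give every producer an explicit, unique {{client.id}} so that the JMX MBean names cannot collide even when several producers live in the same JVM:

{code:java}
import java.util.Properties;
import java.util.UUID;

public class UniqueClientId {

    // Illustrative helper (names assumed): build producer properties with a
    // client.id that is unique per producer instance. Without an explicit
    // client.id, Kafka falls back to "producer-<n>" from a static counter,
    // and separate classloaders in one JVM can hand out the same number,
    // which triggers the InstanceAlreadyExistsException above.
    static Properties withUniqueClientId(String bootstrapServers, String jobName, int subtaskIndex) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("client.id", jobName + "-" + subtaskIndex + "-" + UUID.randomUUID());
        return props;
    }
}
{code}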

> flink job fail because of kafka producer create fail of 
> "javax.management.InstanceAlreadyExistsException"
> -
>
> Key: FLINK-8093
> URL: https://issues.apache.org/jira/browse/FLINK-8093
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
> Environment: flink 1.3.2, kafka 0.9.1
>Reporter: dongtingting
>Priority: Critical
>
> One TaskManager has multiple task slots; one task failed because creating the 
> KafkaProducer failed. The reason for the failure is 
> "javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3". The detailed trace 
> is:
> 2017-11-04 19:41:23,281 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from 
> RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:321)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:181)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: Error registering mbean 
> kafka.producer:type=producer-metrics,client-id=producer-3
> at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
> at 
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
> at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255)
> at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.(RecordAccumulator.java:111)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:261)
> ... 9 more
> Caused by: javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
> ... 16 more
> I suspect that tasks in different task slots of one TaskManager use different 
> classloaders, and the task ID may be the same in one process. This leads to the 
> KafkaProducer creation failing in one TaskManager.
> Has anybody encountered the same problem?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-24 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152972414
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+   private Callable recycleExclusiveBufferTask(RemoteInputChannel 
inputChannel, int numExclusiveSegments) {
+   final List exclusiveBuffers = new 
ArrayList<>(numExclusiveSegments);
+   // Exhaust all the exclusive buffers
+   for (int i = 0; i < numExclusiveSegments; i++) {
+   Buffer buffer = inputChannel.requestBuffer();
+   assertNotNull(buffer);
+   exclusiveBuffers.add(buffer);
+   }
+
+   return new Callable() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : exclusiveBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private Callable recycleFloatingBufferTask(BufferPool bufferPool, int 
numFloatingBuffers) throws Exception {
+   final List floatingBuffers = new 
ArrayList<>(numFloatingBuffers);
+   // Exhaust all the floating buffers
+   for (int i = 0; i < numFloatingBuffers; i++) {
+   Buffer buffer = bufferPool.requestBuffer();
+   assertNotNull(buffer);
+   floatingBuffers.add(buffer);
+   }
+
+   return new Callable() {
+   @Override
+   public Void call() throws Exception {
+   for (Buffer buffer : floatingBuffers) {
+   buffer.recycle();
+   }
+
+   return null;
+   }
+   };
+   }
+
+   private void submitTasksAndWaitResults(ExecutorService executor, 
Callable[] tasks) throws Exception {
--- End diff --

maybe also rename to `submitTasksAndWaitForResults`


---


[jira] [Updated] (FLINK-8147) Support and to CEP's pattern API

2017-11-24 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-8147:
--
Issue Type: New Feature  (was: Bug)

> Support and to CEP's pattern API
> 
>
> Key: FLINK-8147
> URL: https://issues.apache.org/jira/browse/FLINK-8147
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dian Fu
>
> Adding API such as {{and}} in CEP's pattern API will let us define patterns 
> like {{(A -> B) and (C -> D)}}.
> Its usage looks like
> {noformat}
> Pattern left = Pattern.begin("A").followedBy("B");
> Pattern right = Pattern.begin("C").followedBy("D");
> Pattern pattern = left.and(right)
> {noformat}
> If this makes sense, we should also support APIs such as {{or}} and {{not}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8147) Support and to CEP's pattern API

2017-11-24 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-8147:
--
Priority: Minor  (was: Major)

> Support and to CEP's pattern API
> 
>
> Key: FLINK-8147
> URL: https://issues.apache.org/jira/browse/FLINK-8147
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dian Fu
>Priority: Minor
>
> Adding API such as {{and}} in CEP's pattern API will let us define patterns 
> like {{(A -> B) and (C -> D)}}.
> Its usage looks like
> {noformat}
> Pattern left = Pattern.begin("A").followedBy("B");
> Pattern right = Pattern.begin("C").followedBy("D");
> Pattern pattern = left.and(right)
> {noformat}
> If this makes sense, we should also support APIs such as {{or}} and {{not}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6909) Flink should support Lombok POJO

2017-11-24 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265280#comment-16265280
 ] 

Timo Walther commented on FLINK-6909:
-

I tested a simple Lombok POJO and it worked as expected. [~mkzaman] Could you 
check the logs for TypeExtractor entries? Is the class a member class of 
another class. Do you have the lombok IDE plugin installed and added the Maven 
dependency correctly?
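
For reference, a minimal sketch of the kind of Lombok class that was tested (field names are assumed); it only behaves as a Flink POJO if Lombok's annotation processing actually ran, so that the compiled class contains a public no-arg constructor plus a getter/setter pair for every field:

{code:java}
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

// Minimal sketch, assuming Lombok generates getName()/setName() etc. at compile
// time. If the generated accessors are missing (e.g. the annotation processor
// was not applied in the Maven build), the TypeExtractor falls back to
// GenericTypeInfo, which matches the ClassCastException in the report.
@Getter
@Setter
@NoArgsConstructor
public class SimplePojo {
    private String name;
    private int count;
}
{code}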

> Flink should support Lombok POJO
> 
>
> Key: FLINK-6909
> URL: https://issues.apache.org/jira/browse/FLINK-6909
> Project: Flink
>  Issue Type: Wish
>  Components: Type Serialization System
>Reporter: Md Kamaruzzaman
>Assignee: Timo Walther
>Priority: Minor
>
> Project lombok helps greatly to reduce boilerplate Java Code. 
> It seems that Flink does not accept a lombok POJO as a valid pojo. 
> e.g. Here is a POJO defined with lombok:
> @Getter
> @Setter
> @NoArgsConstructor
> public class SimplePojo
> Using this Pojo class to read from CSV file throws this exception:
> Exception in thread "main" java.lang.ClassCastException: 
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> org.apache.flink.api.java.typeutils.PojoTypeInfo
> It would be great if flink supports lombok POJO.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-6909) Flink should support Lombok POJO

2017-11-24 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265280#comment-16265280
 ] 

Timo Walther edited comment on FLINK-6909 at 11/24/17 1:37 PM:
---

I tested a simple Lombok POJO and it worked as expected. [~mkzaman] Could you 
check the logs for TypeExtractor entries? Is the class a member class of 
another class? Do you have the lombok IDE plugin installed and added the Maven 
dependency correctly?


was (Author: twalthr):
I tested a simple Lombok POJO and it worked as expected. [~mkzaman] Could you 
check the logs for TypeExtractor entries? Is the class a member class of 
another class. Do you have the lombok IDE plugin installed and added the Maven 
dependency correctly?

> Flink should support Lombok POJO
> 
>
> Key: FLINK-6909
> URL: https://issues.apache.org/jira/browse/FLINK-6909
> Project: Flink
>  Issue Type: Wish
>  Components: Type Serialization System
>Reporter: Md Kamaruzzaman
>Assignee: Timo Walther
>Priority: Minor
>
> Project lombok helps greatly to reduce boilerplate Java Code. 
> It seems that Flink does not accept a lombok POJO as a valid pojo. 
> e.g. Here is a POJO defined with lombok:
> @Getter
> @Setter
> @NoArgsConstructor
> public class SimplePojo
> Using this Pojo class to read from CSV file throws this exception:
> Exception in thread "main" java.lang.ClassCastException: 
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> org.apache.flink.api.java.typeutils.PojoTypeInfo
> It would be great if flink supports lombok POJO.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8117) Eliminate modulo operation from RoundRobinChannelSelector and RebalancePartitioner

2017-11-24 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay resolved FLINK-8117.

Resolution: Implemented

> Eliminate modulo operation from RoundRobinChannelSelector and 
> RebalancePartitioner
> --
>
> Key: FLINK-8117
> URL: https://issues.apache.org/jira/browse/FLINK-8117
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
> Fix For: 1.5.0
>
>
> {{RoundRobinChannelSelector}}, {{RebalancePartitioner}}, and 
> {{RescalePartitioner}} use a modulo operation to wrap around when the current 
> channel counter reaches the number of channels. Using an {{if}} would have 
> better performance.
> A division with 32 bit operands is ~25 cycles on modern Intel CPUs \[1\], but 
> the {{if}} will be only 1-2 cycles on average, since the branch predictor can 
> most of the time predict the condition to be false.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
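
A hedged sketch of the difference (illustrative classes only, not the actual selectors):

{code:java}
public class RoundRobinSketch {

    private int nextChannel = 0;

    // Modulo variant: pays for an integer division on every record.
    int selectWithModulo(int numberOfChannels) {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }

    // Branch variant: the comparison is almost always predicted correctly,
    // so on average it costs only a cycle or two.
    int selectWithIf(int numberOfChannels) {
        nextChannel++;
        if (nextChannel >= numberOfChannels) {
            nextChannel = 0;
        }
        return nextChannel;
    }
}
{code}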



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265251#comment-16265251
 ] 

ASF GitHub Bot commented on FLINK-8144:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5063
  
I see. The bad performance is related to the way that you generate the 
watermarks. Generating so many watermarks is very inefficient because you are 
processing many more watermarks than actual records (watermarks are always 
broadcast).
I'm not convinced that this is a scenario that we should optimize for.
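
As a hedged sketch of the alternative (the event type and field below are assumed, not taken from the PR), a periodic assigner emits watermarks only at the configured auto-watermark interval instead of once per record, which keeps the watermark-to-record ratio low:

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class BoundedLatenessAssigner
        implements AssignerWithPeriodicWatermarks<BoundedLatenessAssigner.Event> {

    /** Minimal event type assumed for this sketch. */
    public static class Event {
        public long timestamp;
    }

    private static final long MAX_OUT_OF_ORDERNESS = 5_000L; // 5 seconds of allowed lateness

    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.timestamp);
        return element.timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Called at the auto-watermark interval (ExecutionConfig#setAutoWatermarkInterval),
        // not per record, so far fewer watermarks are broadcast downstream.
        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
    }
}
```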


> Optimize the timer logic in RowTimeUnboundedOver
> 
>
> Key: FLINK-8144
> URL: https://issues.apache.org/jira/browse/FLINK-8144
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> Currently the logic of {{RowTimeUnboundedOver}} is as follows:
> 1) When an element comes in, buffer it in MapState and register a timer at 
> {{current watermark + 1}}.
> 2) When the event-time timer fires, scan the MapState, find the elements below 
> the current watermark and process them. If there are remaining elements to 
> process, register a new timer at {{current watermark + 1}}.
> Assuming that the watermark comes about 5 seconds later than the event on 
> average, we will scan the MapState about 5000 times before actually 
> processing the events.
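
A hedged sketch of this buffering/rescanning pattern, written against the keyed {{ProcessFunction}} API purely for illustration (this is not the actual {{RowTimeUnboundedOver}} code, and it keeps one value per timestamp for brevity):

{code:java}
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilWatermark extends ProcessFunction<Long, Long> {

    // event timestamp -> buffered value (must run on a keyed stream)
    private transient MapState<Long, Long> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buffer", Long.class, Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        // 1) buffer the element and register a timer at (current watermark + 1)
        buffer.put(ctx.timestamp(), value);
        ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        long watermark = ctx.timerService().currentWatermark();
        boolean remaining = false;

        // 2) scan the whole MapState and emit everything at or below the watermark
        Iterator<Map.Entry<Long, Long>> it = buffer.iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getKey() <= watermark) {
                out.collect(entry.getValue());
                it.remove();
            } else {
                remaining = true;
            }
        }

        // 3) re-register at (watermark + 1) while elements remain; if the watermark
        //    lags far behind the events, this rescans the state many times before
        //    anything is actually emitted.
        if (remaining) {
            ctx.timerService().registerEventTimeTimer(watermark + 1);
        }
    }
}
{code}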



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...

2017-11-24 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5063
  
I see. The bad performance is related to the way that you generate the 
watermarks. Generating so many watermarks is very inefficient because you are 
processing many more watermarks than actual records (watermarks are always 
broadcast).
I'm not convinced that this is a scenario that we should optimize for.


---


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265244#comment-16265244
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5065
  
Hi @Aegeaner, thanks for the pull request. 
I had a brief look at it and noticed a few things:

1. Please fill out the PR template (see #5063 as an example)
2. Please add unit tests that validate that the checks you added are 
working correctly. 
3. Please update the `equals()` and `hashCode()` methods of `Row` as 
mentioned in the JIRA issue.

Thank you, Fabian


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection whether all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}. This applies to 
> types coming from both TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.
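
A hedged sketch of such a reflective check (names are illustrative, not the actual validation code):

{code:java}
public final class EqualityCheckSketch {

    // A type "properly" overrides equals()/hashCode() if the declaring class of
    // those methods is something other than Object.
    static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            boolean equalsOverridden =
                    clazz.getMethod("equals", Object.class).getDeclaringClass() != Object.class;
            boolean hashCodeOverridden =
                    clazz.getMethod("hashCode").getDeclaringClass() != Object.class;
            return equalsOverridden && hashCodeOverridden;
        } catch (NoSuchMethodException e) {
            // unreachable: every class inherits both methods from Object
            throw new AssertionError(e);
        }
    }

    private EqualityCheckSketch() {
    }
}
{code}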



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5065: [FLINK-8139][table] Check for proper equals() and hashCod...

2017-11-24 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5065
  
Hi @Aegeaner, thanks for the pull request. 
I had a brief look at it and noticed a few things:

1. Please fill out the PR template (see #5063 as an example)
2. Please add unit tests that validate that the checks you added are 
working correctly. 
3. Please update the `equals()` and `hashCode()` methods of `Row` as 
mentioned in the JIRA issue.

Thank you, Fabian


---


[jira] [Commented] (FLINK-8147) Support and to CEP's pattern API

2017-11-24 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265243#comment-16265243
 ] 

Dian Fu commented on FLINK-8147:


Any comments are welcome. Do you think it's useful? What do you think the API 
should look like?

> Support and to CEP's pattern API
> 
>
> Key: FLINK-8147
> URL: https://issues.apache.org/jira/browse/FLINK-8147
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>
> Adding an API such as {{and}} to CEP's pattern API will let us define patterns 
> like {{(A -> B) and (C -> D)}}.
> Its usage looks like
> {noformat}
> Pattern left = Pattern.begin("A").followedBy("B");
> Pattern right = Pattern.begin("C").followedBy("D");
> Pattern pattern = left.and(right)
> {noformat}
> If this makes sense, we should also support APIs such as {{or}} and {{not}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265239#comment-16265239
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152961281
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
--- End diff --

Sorry about the back-and-forth here, but thinking about the bug that you 
fixed with the latest commit, it would be dangerous if we ever called 
`maintainTargetSize()` without adding an exclusive buffer. What do you think 
about my second suggestion instead, i.e. having a
```
/**
 * Adds an exclusive buffer (back) into the queue and recycles 
one floating buffer if the
 * number of available buffers in queue is more than the 
required amount.
 *
 * @param buffer
 *  the exclusive buffer to add
 * @param numRequiredBuffers
 *  the number of required buffers
 *
 * @return how many buffers were added to the queue
 */
int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
exclusiveBuffers.add(buffer);
if (getAvailableBufferSize() > numRequiredBuffers) {
Buffer floatingBuffer = floatingBuffers.poll();
floatingBuffer.recycle();
return 0;
} else {
return 1;
}
}
```
(please note the changed return type which I think is more obvious than a 
`boolean`)


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from the producer. It will request a buffer from {{BufferPool}} 
> to hold the message. If no buffer is available, the message is staged temporarily 
> and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a 
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-11-24 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r152961281
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
@@ -572,20 +560,53 @@ void addExclusiveBuffer(Buffer buffer) {
exclusiveBuffers.add(buffer);
}
 
-   Buffer takeExclusiveBuffer() {
-   return exclusiveBuffers.poll();
-   }
-
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
 
-   Buffer takeFloatingBuffer() {
-   return floatingBuffers.poll();
+   /**
+* Add the exclusive buffer into the queue, and recycle one 
floating buffer if the
+* number of available buffers in queue is more than required 
amount.
+*
+* @param buffer The exclusive buffer of this channel.
+* @return Whether to recycle one floating buffer.
+*/
+   boolean maintainTargetSize(Buffer buffer) {
--- End diff --

Sorry about the back-and-forth here, but thinking about the bug that you 
fixed with the latest commit, it would be dangerous if we ever called 
`maintainTargetSize()` without adding an exclusive buffer. What do you think 
about my second suggestion instead, i.e. having a
```
/**
 * Adds an exclusive buffer (back) into the queue and recycles 
one floating buffer if the
 * number of available buffers in queue is more than the 
required amount.
 *
 * @param buffer
 *  the exclusive buffer to add
 * @param numRequiredBuffers
 *  the number of required buffers
 *
 * @return how many buffers were added to the queue
 */
int addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
exclusiveBuffers.add(buffer);
if (getAvailableBufferSize() > numRequiredBuffers) {
Buffer floatingBuffer = floatingBuffers.poll();
floatingBuffer.recycle();
return 0;
} else {
return 1;
}
}
```
(please note the changed return type which I think is more obvious than a 
`boolean`)


---


[jira] [Created] (FLINK-8147) Support and to CEP's pattern API

2017-11-24 Thread Dian Fu (JIRA)
Dian Fu created FLINK-8147:
--

 Summary: Support and to CEP's pattern API
 Key: FLINK-8147
 URL: https://issues.apache.org/jira/browse/FLINK-8147
 Project: Flink
  Issue Type: Bug
  Components: CEP
Reporter: Dian Fu


Adding an API such as {{and}} to CEP's pattern API will let us define patterns 
like {{(A -> B) and (C -> D)}}.
Its usage looks like
{noformat}
Pattern left = Pattern.begin("A").followedBy("B");
Pattern right = Pattern.begin("C").followedBy("D");
Pattern pattern = left.and(right)
{noformat}
If this makes sense, we should also support APIs such as {{or}} and {{not}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265231#comment-16265231
 ] 

ASF GitHub Bot commented on FLINK-8125:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5059


> Support limiting the number of open FileSystem connections
> --
>
> Key: FLINK-8125
> URL: https://issues.apache.org/jira/browse/FLINK-8125
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> We need a way to limit the number of streams that Flink FileSystems 
> concurrently open.
> For example, for very small HDFS clusters with few RPC handlers, a large 
> Flink job trying to build up many connections during a checkpoint causes 
> failures due to rejected connections. 
> I propose to add a file system that can wrap another existing file system. The 
> file system may track the progress of streams and close streams that have 
> been inactive for too long, to avoid locked streams from taking up the complete 
> pool.
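
As a rough illustration of the wrapping idea (not Flink's actual FileSystem API; the interface and names below are assumed), a semaphore can bound how many streams are open at once:

{code}
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the wrapped file system, not Flink's FileSystem class.
interface SimpleFileSystem {
    InputStream open(String path) throws IOException;
}

// Wraps another file system and limits the number of concurrently open streams.
class LimitedFileSystem implements SimpleFileSystem {

    private final SimpleFileSystem delegate;
    private final Semaphore openStreams;

    LimitedFileSystem(SimpleFileSystem delegate, int maxOpenStreams) {
        this.delegate = delegate;
        this.openStreams = new Semaphore(maxOpenStreams);
    }

    @Override
    public InputStream open(String path) throws IOException {
        openStreams.acquireUninterruptibly(); // block until a slot is free
        try {
            InputStream in = delegate.open(path);
            return new FilterInputStream(in) {
                @Override
                public void close() throws IOException {
                    try {
                        super.close();
                    } finally {
                        openStreams.release(); // free the slot when the stream closes
                    }
                }
            };
        } catch (IOException | RuntimeException e) {
            openStreams.release(); // do not leak the slot on failure
            throw e;
        }
    }
}
{code}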



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5059: [FLINK-8125] [core] Introduce limiting of file sys...

2017-11-24 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5059


---


[jira] [Resolved] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8125.
-
Resolution: Fixed

Fixed in
  - 1.5.0 via d7c2c417213502130b1aeab1868313df178555cc
  - 1.4.1 via a11e2cf0b1f37d3ef22e1978e89928fa374960db

> Support limiting the number of open FileSystem connections
> --
>
> Key: FLINK-8125
> URL: https://issues.apache.org/jira/browse/FLINK-8125
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> We need a way to limit the number of streams that Flink FileSystems 
> concurrently open.
> For example, for very small HDFS clusters with few RPC handlers, a large 
> Flink job trying to build up many connections during a checkpoint causes 
> failures due to rejected connections. 
> I propose to add a file system that can wrap another existing file system. The 
> file system may track the progress of streams and close streams that have 
> been inactive for too long, to avoid locked streams from taking up the complete 
> pool.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8125) Support limiting the number of open FileSystem connections

2017-11-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8125.
---

> Support limiting the number of open FileSystem connections
> --
>
> Key: FLINK-8125
> URL: https://issues.apache.org/jira/browse/FLINK-8125
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.5.0, 1.4.1
>
>
> We need a way to limit the number of streams that Flink FileSystems 
> concurrently open.
> For example, for very small HDFS clusters with few RPC handlers, a large 
> Flink job trying to build up many connections during a checkpoint causes 
> failures due to rejected connections. 
> I propose to add a file system that can wrap another existing file system. The 
> file system may track the progress of streams and close streams that have 
> been inactive for too long, to avoid locked streams from taking up the complete 
> pool.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8146) Potential resource leak in PythonPlanBinder#unzipPythonLibrary

2017-11-24 Thread Ted Yu (JIRA)
Ted Yu created FLINK-8146:
-

 Summary: Potential resource leak in 
PythonPlanBinder#unzipPythonLibrary
 Key: FLINK-8146
 URL: https://issues.apache.org/jira/browse/FLINK-8146
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


{code}
while (entry != null) {
...
}
zis.closeEntry();
{code}
Looking at the catch block inside the loop, it seems the intention is to close 
zis upon getting an exception.
zis.close() should be called outside the loop.
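
For illustration (not the actual PythonPlanBinder code; the method and path are assumed), try-with-resources closes the stream on all paths, including the exceptional one:

{code}
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

class UnzipSketch {
    // Hypothetical sketch: try-with-resources guarantees the stream is closed
    // even when an exception is thrown inside the loop.
    static void unzip(String zipPath) throws IOException {
        try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zipPath))) {
            ZipEntry entry = zis.getNextEntry();
            while (entry != null) {
                // ... write the entry to disk ...
                zis.closeEntry();
                entry = zis.getNextEntry();
            }
        } // zis.close() is called automatically here, even on exceptions
    }
}
{code}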



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265213#comment-16265213
 ] 

ASF GitHub Bot commented on FLINK-8144:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5063#discussion_r152955709
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
 ---
@@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver(
 // discard late record
 if (timestamp > curWatermark) {
   // ensure every key just registers one timer
-  ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+  ctx.timerService.registerEventTimeTimer(timestamp)
--- End diff --

@fhueske Thanks a lot for your comments. Your concern makes sense to me. I 
think the current implementation is ok under periodic watermarks, but I'm not 
sure if it's optimal under punctuated watermarks. We will run some 
performance tests for unbounded over under punctuated watermarks and share the 
results.


> Optimize the timer logic in RowTimeUnboundedOver
> 
>
> Key: FLINK-8144
> URL: https://issues.apache.org/jira/browse/FLINK-8144
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> Currently the logic of {{RowTimeUnboundedOver}} is as follows:
> 1) When an element comes, buffer it in MapState and register a timer at 
> {{current watermark + 1}}.
> 2) When the event timer is triggered, scan the MapState, find the elements below 
> the current watermark, and process them. If there are remaining elements to 
> process, register a new timer at {{current watermark + 1}}.
> Let's assume that the watermark comes about 5 seconds later than the event on 
> average; then we will scan the MapState about 5000 times before actually 
> processing the events.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...

2017-11-24 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5063#discussion_r152955709
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
 ---
@@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver(
 // discard late record
 if (timestamp > curWatermark) {
   // ensure every key just registers one timer
-  ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+  ctx.timerService.registerEventTimeTimer(timestamp)
--- End diff --

@fhueske Thanks a lot for your comments. Your concern makes sense to me. I 
think the current implementation is ok under periodic watermarks, but I'm not 
sure if it's optimal under punctuated watermarks. We will run some 
performance tests for unbounded over under punctuated watermarks and share the 
results.


---


[GitHub] flink pull request #5055: [FLINK-7718] [flip6] Add JobVertexMetricsHandler t...

2017-11-24 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5055#discussion_r152951899
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Request handler that returns for a given task a list of all available 
metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the 
requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
+ *
+ * @param  Type of the concrete MessageParameters
+ */
+public abstract class AbstractMetricsHandler 
extends
+   AbstractRestHandler {
+
+   private final MetricFetcher metricFetcher;
+
+   public AbstractMetricsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever leaderRetriever,
+   Time timeout,
+   Map headers,
+   MessageHeaders messageHeaders,
+   MetricFetcher metricFetcher) {
+   super(localRestAddress, leaderRetriever, timeout, headers, 
messageHeaders);
+   this.metricFetcher = requireNonNull(metricFetcher, 
"metricFetcher must not be null");
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(
+   @Nonnull HandlerRequest request,
+   @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   metricFetcher.update();
+
+   final MetricStore.ComponentMetricStore componentMetricStore = 
getComponentMetricStore(
+   request,
+   metricFetcher.getMetricStore());
+
+   if (componentMetricStore == null || 
componentMetricStore.metrics == null) {
+   return 

[jira] [Commented] (FLINK-7718) Port JobVertixMetricsHandler to new REST endpoint

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265202#comment-16265202
 ] 

ASF GitHub Bot commented on FLINK-7718:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5055#discussion_r152951899
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandler.java
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Request handler that returns for a given task a list of all available 
metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of 
all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
+ * {@code /metrics?get=X,Y}
+ * The handler will then return a list containing the values of the 
requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
+ *
+ * @param  Type of the concrete MessageParameters
+ */
+public abstract class AbstractMetricsHandler 
extends
+   AbstractRestHandler {
+
+   private final MetricFetcher metricFetcher;
+
+   public AbstractMetricsHandler(
+   CompletableFuture localRestAddress,
+   GatewayRetriever leaderRetriever,
+   Time timeout,
+   Map headers,
+   MessageHeaders messageHeaders,
+   MetricFetcher metricFetcher) {
+   super(localRestAddress, leaderRetriever, timeout, headers, 
messageHeaders);
+   this.metricFetcher = requireNonNull(metricFetcher, 
"metricFetcher must not be null");
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(
+   @Nonnull HandlerRequest request,
+   @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
+   metricFetcher.update();
+
+   final MetricStore.ComponentMetricStore componentMetricStore = 
getComponentMetricStore(
+

[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265184#comment-16265184
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

GitHub user Aegeaner opened a pull request:

https://github.com/apache/flink/pull/5065

[FLINK-8139][table] Check for proper equals() and hashCode() when reg…

https://issues.apache.org/jira/browse/FLINK-8139

Check for proper equals() and hashCode() when registering a table.

## Contribution Checklist


## Verifying this change

flink-table unit tests passed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aegeaner/flink FLINK-8139

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5065.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5065


commit 41e79898653c656ebca3b4d96bab6f2b4664488d
Author: Aegeaner 
Date:   2017-11-24T08:11:28Z

[FLINK-8139][table] Check for proper equals() and hashCode() when 
registering a table




> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check, via reflection, whether all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both for rows 
> coming from TableSource and from DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-11-24 Thread Aegeaner
GitHub user Aegeaner opened a pull request:

https://github.com/apache/flink/pull/5065

[FLINK-8139][table] Check for proper equals() and hashCode() when reg…

https://issues.apache.org/jira/browse/FLINK-8139

Check for proper equals() and hashCode() when registering a table.

## Contribution Checklist


## Verifying this change

flink-table unit tests passed.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aegeaner/flink FLINK-8139

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5065.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5065


commit 41e79898653c656ebca3b4d96bab6f2b4664488d
Author: Aegeaner 
Date:   2017-11-24T08:11:28Z

[FLINK-8139][table] Check for proper equals() and hashCode() when 
registering a table




---


[jira] [Commented] (FLINK-8145) IOManagerAsync not properly shut down in various tests

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265165#comment-16265165
 ] 

ASF GitHub Bot commented on FLINK-8145:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5064

[FLINK-8145][tests] fix various IOManagerAsync instances not being shut down

## What is the purpose of the change

Fix several unit tests using `IOManagerAsync` but not shutting it down 
afterwards (not even in the failure-free case). The only thing cleaning them up 
seems to be a shutdown handler attached to the JVM. Since each IOManagerAsync 
is spawning a number of reader and writer threads, this may put a significant 
burden on the tests.
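
For illustration, a minimal sketch (class and test names assumed, relying on IOManagerAsync's no-argument constructor and its shutdown() method) of the cleanup pattern this PR adds to the affected tests:

```
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.junit.AfterClass;

public class SomeIOManagerTest {

    // one shared instance per test class instead of one per test method
    private static final IOManagerAsync ioManager = new IOManagerAsync();

    @AfterClass
    public static void shutdownIOManager() {
        // stops the reader and writer threads spawned by IOManagerAsync
        ioManager.shutdown();
    }

    // ... test methods use ioManager instead of creating their own instances ...
}
```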

## Brief change log

- add shutdown handlers via `@AfterClass` to 
`AsynchronousBufferFileWriterTest`, `BufferFileWriterFileSegmentReaderTest`, 
`BufferFileWriterReaderTest`, `HashTablePerformanceComparison`, 
`FixedLengthRecordSorterTest`
- let `HashTableITCase`, `AbstractSortMergeOuterJoinIteratorITCase` test 
methods use the static `ioManager` instead of creating new instances in some 
test methods
- clean up manually in `HashTableTest`, `MassiveStringSorting`, 
`MassiveStringValueSorting` (similar cleanup for the `MemoryManager` there)

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8145

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5064.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5064


commit 7a0723b90fcefb8bf9c6e448e24db43758d0a1e2
Author: Nico Kruber 
Date:   2017-11-24T10:31:48Z

[FLINK-8145][tests] fix various IOManagerAsync instances not being shut down




> IOManagerAsync not properly shut down in various tests
> --
>
> Key: FLINK-8145
> URL: https://issues.apache.org/jira/browse/FLINK-8145
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> In various tests, e.g. {{AsynchronousBufferFileWriterTest}}, 
> {{BufferFileWriterReaderTest}}, or {{HashTableITCase}}, {{IOManagerAsync}} 
> instances are used which are not shut down at all. The only thing cleaning 
> them up seems to be a shutdown handler attached to the JVM. Since each 
> {{IOManagerAsync}} is spawning a number of reader and writer threads, this 
> may put a significant burden on the tests.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5064: [FLINK-8145][tests] fix various IOManagerAsync ins...

2017-11-24 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5064

[FLINK-8145][tests] fix various IOManagerAsync instances not being shut down

## What is the purpose of the change

Fix several unit tests using `IOManagerAsync` but not shutting it down 
afterwards (not even in the failure-free case). The only thing cleaning them up 
seems to be a shutdown handler attached to the JVM. Since each IOManagerAsync 
is spawning a number of reader and writer threads, this may put a significant 
burden on the tests.

## Brief change log

- add shutdown handlers via `@AfterClass` to 
`AsynchronousBufferFileWriterTest`, `BufferFileWriterFileSegmentReaderTest`, 
`BufferFileWriterReaderTest`, `HashTablePerformanceComparison`, 
`FixedLengthRecordSorterTest`
- let `HashTableITCase`, `AbstractSortMergeOuterJoinIteratorITCase` test 
methods use the static `ioManager` instead of creating new instances in some 
test methods
- clean up manually in `HashTableTest`, `MassiveStringSorting`, 
`MassiveStringValueSorting` (similar cleanup for the `MemoryManager` there)

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8145

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5064.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5064


commit 7a0723b90fcefb8bf9c6e448e24db43758d0a1e2
Author: Nico Kruber 
Date:   2017-11-24T10:31:48Z

[FLINK-8145][tests] fix various IOManagerAsync instances not being shut down




---


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265155#comment-16265155
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152944166
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -320,4 +559,40 @@ void awaitNotifications(long 
awaitedNumNotifiedBuffers, long timeoutMillis) thro
}
}
}
+
+   /**
+* An {@link IOManagerAsync} that creates closed {@link 
BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will thus throw an 
exception when trying to add
+* write requests, e.g. by calling {@link 
BufferFileWriter#writeBlock(Object)}.
+*/
+   private static class IOManagerAsyncWithClosedBufferFileWriter extends 
IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = 
super.createBufferFileWriter(channelID);
+   bufferFileWriter.close();
+   return bufferFileWriter;
+   }
+   }
+
+   /**
+* An {@link IOManagerAsync} that creates stalling {@link 
BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will accept {@link 
BufferFileWriter#writeBlock(Object)}
+* requests but never actually perform any write operation (be sure to 
clean up the buffers
+* manually!).
+*/
+   private static class IOManagerAsyncWithStallingBufferFileWriter extends 
IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = 
spy(super.createBufferFileWriter(channelID));
--- End diff --

I don't have (too) hard feelings about it, so there it is - along with some 
more minor fixes in the tests and one additional test for 
`AsynchronousBufferFileWriter` itself (the change there was not covered yet)


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained and to be used in parallel somewhere else, it may also 
> not be available anymore or contain corrupt data.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4581: [FLINK-7499][io] fix double buffer release in Spil...

2017-11-24 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152944166
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -320,4 +559,40 @@ void awaitNotifications(long 
awaitedNumNotifiedBuffers, long timeoutMillis) thro
}
}
}
+
+   /**
+* An {@link IOManagerAsync} that creates closed {@link 
BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will thus throw an 
exception when trying to add
+* write requests, e.g. by calling {@link 
BufferFileWriter#writeBlock(Object)}.
+*/
+   private static class IOManagerAsyncWithClosedBufferFileWriter extends 
IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = 
super.createBufferFileWriter(channelID);
+   bufferFileWriter.close();
+   return bufferFileWriter;
+   }
+   }
+
+   /**
+* An {@link IOManagerAsync} that creates stalling {@link 
BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will accept {@link 
BufferFileWriter#writeBlock(Object)}
+* requests but never actually perform any write operation (be sure to 
clean up the buffers
+* manually!).
+*/
+   private static class IOManagerAsyncWithStallingBufferFileWriter extends 
IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = 
spy(super.createBufferFileWriter(channelID));
--- End diff --

I don't have (too) hard feelings about it, so there it is - along with some 
more minor fixes in the tests and one additional test for 
`AsynchronousBufferFileWriter` itself (the change there was not covered yet)


---


[jira] [Commented] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265143#comment-16265143
 ] 

ASF GitHub Bot commented on FLINK-5465:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5058
  
I was thinking about where to put those options, and in the end it was a 
choice between fragmentation and inconsistency. I considered 
`TaskManagerOptions`, but it sounds like the wrong place if you look at what the 
other options in this class are about, and it also feels like saying `Task` == 
`StreamTask`, because timers are something that currently belongs to `streaming` 
imo. There are configuration classes all over the place, so if you think 
this is a problem, maybe we should fix it in general and not on a per-case 
basis?

I agree that we can change the key string.


> RocksDB fails with segfault while calling AbstractRocksDBState.clear()
> --
>
> Key: FLINK-5465
> URL: https://issues.apache.org/jira/browse/FLINK-5465
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Stefan Richter
> Fix For: 1.4.0, 1.5.0
>
> Attachments: hs-err-pid26662.log
>
>
> I'm using Flink 699f4b0.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576
> #
> # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 
> 1.7.0_67-b01)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # C  [librocksdbjni-linux64.so+0x1aeb78]  
> rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # 
> /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log
> Compiled method (nm) 1869778  903 n   org.rocksdb.RocksDB::remove 
> (native)
>  total in heap  [0x7f91b40b9dd0,0x7f91b40ba150] = 896
>  relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88
>  main code  [0x7f91b40b9f60,0x7f91b40ba150] = 496
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.sun.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5058: [FLINK-5465] [streaming] Wait for pending timer threads t...

2017-11-24 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5058
  
I was thinking about where to put those options, and in the end it was a 
choice between fragmentation and inconsistency. I considered 
`TaskManagerOptions`, but it sounds like the wrong place if you look at what the 
other options in this class are about, and it also feels like saying `Task` == 
`StreamTask`, because timers are something that currently belongs to `streaming` 
imo. There are configuration classes all over the place, so if you think 
this is a problem, maybe we should fix it in general and not on a per-case 
basis?

I agree that we can change the key string.


---


[jira] [Commented] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265138#comment-16265138
 ] 

ASF GitHub Bot commented on FLINK-5465:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5058
  
I would like to make a few comments for follow-up:

  - I think `TimerServiceOptions` should not be its own class; we are 
getting a crazy fragmentation of options into classes (that sometimes have, 
like here, only one option defined). Let's merge these into the 
`TaskManagerOptions`.

  - The config key does not reflect the scheme in which all other config 
keys are defined. It reads like a name where the dot '.' is a word separator. 
The scheme in which all other config keys are defined is hierarchical, like a 
path in a nested config group/object structure. Think of the configuration as 
one huge JSON object, and the key is the dot path to the entry. Hence a key 
like `taskmanager.timers.shutdown-timeout` (or 
`taskmanager.timers.shutdown.timeout`, if we view `shutdown` as a full config 
group/object) would be in line with the resulting style.
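
For example, a sketch (the 7500 ms default and field name are assumed) of how the option could be declared with the suggested hierarchical key:

```
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class TimerOptionsSketch {
    // Sketch only; intended to live in TaskManagerOptions as proposed above.
    public static final ConfigOption<Long> TIMER_SERVICE_SHUTDOWN_TIMEOUT =
        ConfigOptions.key("taskmanager.timers.shutdown-timeout")
            .defaultValue(7_500L);
}
```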



> RocksDB fails with segfault while calling AbstractRocksDBState.clear()
> --
>
> Key: FLINK-5465
> URL: https://issues.apache.org/jira/browse/FLINK-5465
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Stefan Richter
> Fix For: 1.4.0, 1.5.0
>
> Attachments: hs-err-pid26662.log
>
>
> I'm using Flink 699f4b0.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576
> #
> # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 
> 1.7.0_67-b01)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # C  [librocksdbjni-linux64.so+0x1aeb78]  
> rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # 
> /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log
> Compiled method (nm) 1869778  903 n   org.rocksdb.RocksDB::remove 
> (native)
>  total in heap  [0x7f91b40b9dd0,0x7f91b40ba150] = 896
>  relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88
>  main code  [0x7f91b40b9f60,0x7f91b40ba150] = 496
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.sun.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5058: [FLINK-5465] [streaming] Wait for pending timer threads t...

2017-11-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5058
  
I would like to make a few comments for follow-up:

  - I think `TimerServiceOptions` should not be its own class; we are 
getting a crazy fragmentation of options into classes (that sometimes have, 
like here, only one option defined). Let's merge these into the 
`TaskManagerOptions`.

  - The config key does not reflect the scheme in which all other config 
keys are defined. It reads like a name where the dot '.' is a word separator. 
The scheme in which all other config keys are defined is hierarchical, like a 
path in a nested config group/object structure. Think of the configuration as 
one huge JSON object, and the key is the dot path to the entry. Hence a key 
like `taskmanager.timers.shutdown-timeout` (or 
`taskmanager.timers.shutdown.timeout`, if we view `shutdown` as a full config 
group/object) would be in line with the resulting style.



---


[jira] [Issue Comment Deleted] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver

2017-11-24 Thread Dian Fu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-8144:
---
Comment: was deleted

(was: {{AbstractKeyedCEPPatternOperator}} has logic similar to 
{{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the 
performance is very bad under the current logic for 
{{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps 
to about 3500 tps for one operator in the case of RocksDBStateBackend after 
optimizing the timer logic. I think the optimization should also apply to 
{{RowTimeUnboundedOver}}.

BTW: the watermark we use in the CEP use case is 
{{AssignerWithPunctuatedWatermarks}}. It will generate one watermark for every 
input element.)

> Optimize the timer logic in RowTimeUnboundedOver
> 
>
> Key: FLINK-8144
> URL: https://issues.apache.org/jira/browse/FLINK-8144
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> Currently the logic of {{RowTimeUnboundedOver}} is as follows:
> 1) When an element comes, buffer it in MapState and register a timer at 
> {{current watermark + 1}}.
> 2) When the event timer is triggered, scan the MapState, find the elements below 
> the current watermark, and process them. If there are remaining elements to 
> process, register a new timer at {{current watermark + 1}}.
> Let's assume that the watermark comes about 5 seconds later than the event on 
> average; then we will scan the MapState about 5000 times before actually 
> processing the events.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...

2017-11-24 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5063
  
{{AbstractKeyedCEPPatternOperator}} has logic similar to 
{{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the 
performance is very bad under the current logic for 
{{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps 
to about 3500 tps for one operator in the case of RocksDBStateBackend after 
optimizing the timer logic. I think the optimization should also apply to 
{{RowTimeUnboundedOver}}.

BTW: the watermark we use in the CEP use case is 
{{AssignerWithPunctuatedWatermarks}}. It will generate one watermark for every 
input element.
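
For reference, a minimal sketch (event type assumed) of a punctuated assigner with the behaviour described above, i.e. one watermark per input element:

```
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical event type, for illustration only.
class CepEvent {
    long timestamp;
}

class PerRecordWatermarks implements AssignerWithPunctuatedWatermarks<CepEvent> {

    @Override
    public long extractTimestamp(CepEvent element, long previousElementTimestamp) {
        return element.timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(CepEvent lastElement, long extractedTimestamp) {
        // returning a non-null watermark for every element emits one watermark per record
        return new Watermark(extractedTimestamp);
    }
}
```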


---


[jira] [Comment Edited] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver

2017-11-24 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265130#comment-16265130
 ] 

Dian Fu edited comment on FLINK-8144 at 11/24/17 10:19 AM:
---

{{AbstractKeyedCEPPatternOperator}} has logic similar to 
{{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the 
performance is very bad under the current logic for 
{{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps 
to about 3500 tps for one operator in the case of RocksDBStateBackend after 
optimizing the timer logic. I think the optimization should also apply to 
{{RowTimeUnboundedOver}}.

BTW: the watermark we use in the CEP use case is 
{{AssignerWithPunctuatedWatermarks}}. It will generate one watermark for every 
input element.


was (Author: dian.fu):
{{AbstractKeyedCEPPatternOperator}} has logic similar to 
{{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the 
performance is very bad under the current logic for 
{{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps 
to about 3500 tps for one operator in the case of RocksDBStateBackend. I think 
the optimization should also apply to {{RowTimeUnboundedOver}}.

> Optimize the timer logic in RowTimeUnboundedOver
> 
>
> Key: FLINK-8144
> URL: https://issues.apache.org/jira/browse/FLINK-8144
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> Currently the logic of {{RowTimeUnboundedOver}} is as follows:
> 1) When an element comes, buffer it in MapState and register a timer at 
> {{current watermark + 1}}.
> 2) When the event timer is triggered, scan the MapState, find the elements below 
> the current watermark, and process them. If there are remaining elements to 
> process, register a new timer at {{current watermark + 1}}.
> Let's assume that the watermark comes about 5 seconds later than the event on 
> average; then we will scan the MapState about 5000 times before actually 
> processing the events.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265133#comment-16265133
 ] 

ASF GitHub Bot commented on FLINK-8144:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/5063
  
{{AbstractKeyedCEPPatternOperator}} has logic similar to 
{{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the 
performance is very bad under the current logic for 
{{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps 
to about 3500 tps for one operator in the case of RocksDBStateBackend after 
optimizing the timer logic. I think the optimization should also apply to 
{{RowTimeUnboundedOver}}.

BTW: the watermark we use in the CEP use case is 
{{AssignerWithPunctuatedWatermarks}}. It will generate one watermark for every 
input element.


> Optimize the timer logic in RowTimeUnboundedOver
> 
>
> Key: FLINK-8144
> URL: https://issues.apache.org/jira/browse/FLINK-8144
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> Currently the logic of {{RowTimeUnboundedOver}} is as follows:
> 1) When an element comes, buffer it in MapState and register a timer at 
> {{current watermark + 1}}.
> 2) When the event timer is triggered, scan the MapState, find the elements below 
> the current watermark, and process them. If there are remaining elements to 
> process, register a new timer at {{current watermark + 1}}.
> Let's assume that the watermark comes about 5 seconds later than the event on 
> average; then we will scan the MapState about 5000 times before actually 
> processing the events.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver

2017-11-24 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265130#comment-16265130
 ] 

Dian Fu commented on FLINK-8144:


{{AbstractKeyedCEPPatternOperator}} has logic similar to 
{{RowTimeUnboundedOver}}. As described in FLINK-8106, we find that the 
performance is very bad under the current logic for 
{{AbstractKeyedCEPPatternOperator}}. The throughput can increase from 10+ tps 
to about 3500 tps for one operator in the case of RocksDBStateBackend. I think 
the optimization should also apply to {{RowTimeUnboundedOver}}.

> Optimize the timer logic in RowTimeUnboundedOver
> 
>
> Key: FLINK-8144
> URL: https://issues.apache.org/jira/browse/FLINK-8144
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> Currently the logic of {{RowTimeUnboundedOver}} is as follows:
> 1) When an element comes, buffer it in MapState and register a timer at 
> {{current watermark + 1}}.
> 2) When the event timer is triggered, scan the MapState, find the elements below 
> the current watermark, and process them. If there are remaining elements to 
> process, register a new timer at {{current watermark + 1}}.
> Let's assume that the watermark comes about 5 seconds later than the event on 
> average; then we will scan the MapState about 5000 times before actually 
> processing the events.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8145) IOManagerAsync not properly shut down in various tests

2017-11-24 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-8145:
--

 Summary: IOManagerAsync not properly shut down in various tests
 Key: FLINK-8145
 URL: https://issues.apache.org/jira/browse/FLINK-8145
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber


In various tests, e.g. {{AsynchronousBufferFileWriterTest}}, 
{{BufferFileWriterReaderTest}}, or {{HashTableITCase}}, {{IOManagerAsync}} 
instances are used which are not shut down at all. The only thing cleaning them 
up seems to be a shutdown handler attached to the JVM. Since each 
{{IOManagerAsync}} is spawning a number of reader and writer threads, this may 
put a significant burden on the tests.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6909) Flink should support Lombok POJO

2017-11-24 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-6909:
---

Assignee: Timo Walther

> Flink should support Lombok POJO
> 
>
> Key: FLINK-6909
> URL: https://issues.apache.org/jira/browse/FLINK-6909
> Project: Flink
>  Issue Type: Wish
>  Components: Type Serialization System
>Reporter: Md Kamaruzzaman
>Assignee: Timo Walther
>Priority: Minor
>
> Project Lombok helps greatly to reduce boilerplate Java code. 
> It seems that Flink does not accept a Lombok POJO as a valid POJO. 
> E.g., here is a POJO defined with Lombok:
> @Getter
> @Setter
> @NoArgsConstructor
> public class SimplePojo
> Using this POJO class to read from a CSV file throws this exception:
> Exception in thread "main" java.lang.ClassCastException: 
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> org.apache.flink.api.java.typeutils.PojoTypeInfo
> It would be great if Flink supported Lombok POJOs.
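
For reference, roughly what the Lombok annotations above expand to at compile time (the field is assumed for illustration), i.e. a class that structurally follows the usual POJO conventions:

{code}
// Approximate hand-written equivalent of the Lombok-annotated class above.
public class SimplePojo {

    private String name; // assumed example field

    public SimplePojo() {}

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
{code}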



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6909) Flink should support Lombok POJO

2017-11-24 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265118#comment-16265118
 ] 

Timo Walther commented on FLINK-6909:
-

This looks like a bug to me. I will assign the issue to myself and have a look 
at it.

> Flink should support Lombok POJO
> 
>
> Key: FLINK-6909
> URL: https://issues.apache.org/jira/browse/FLINK-6909
> Project: Flink
>  Issue Type: Wish
>  Components: Type Serialization System
>Reporter: Md Kamaruzzaman
>Priority: Minor
>
> Project Lombok helps greatly to reduce boilerplate Java code. 
> It seems that Flink does not accept a Lombok POJO as a valid POJO. 
> e.g. Here is a POJO defined with Lombok:
> @Getter
> @Setter
> @NoArgsConstructor
> public class SimplePojo
> Using this POJO class to read from a CSV file throws this exception:
> Exception in thread "main" java.lang.ClassCastException: 
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to 
> org.apache.flink.api.java.typeutils.PojoTypeInfo
> It would be great if Flink supported Lombok POJOs.
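For reference, a complete version of such a Lombok POJO might look like the sketch below; the field names are illustrative assumptions, not taken from the reporter's code:

```java
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

// Lombok generates the getters, setters and the no-args constructor at
// compile time, so the source only declares the fields.
@Getter
@Setter
@NoArgsConstructor
public class SimplePojo {
    private String name;
    private int count;
}
```

Since the accessors only exist in the compiled bytecode, the compiled class should in principle satisfy Flink's POJO rules; the reported fallback to {{GenericTypeInfo}} suggests the type extraction does not recognize them in this setup.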



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7316) always use off-heap network buffers

2017-11-24 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-7316.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Merged in 1854a3de19.

> always use off-heap network buffers
> ---
>
> Key: FLINK-7316
> URL: https://issues.apache.org/jira/browse/FLINK-7316
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.5.0
>
>
> In order to send Flink buffers through Netty into the network, we need to 
> make the buffers use off-heap memory. Otherwise, there will be a hidden copy 
> happening in the NIO stack.
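As background, a minimal plain-NIO illustration of the hidden copy (this is not Flink's buffer code, and the class name is arbitrary): writing a heap {{ByteBuffer}} to a channel makes the JDK copy the data into a temporary direct buffer first, while a direct (off-heap) buffer can be handed to the OS as-is.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DirectBufferExample {

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("buffers", ".bin");

        // heap buffer: channel.write() copies its contents into an internal
        // direct buffer before the actual I/O happens
        ByteBuffer heap = ByteBuffer.allocate(4096);

        // direct buffer: backed by off-heap memory, no hidden copy in the
        // NIO stack
        ByteBuffer direct = ByteBuffer.allocateDirect(4096);

        try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
            heap.put("heap".getBytes()).flip();
            channel.write(heap);

            direct.put("direct".getBytes()).flip();
            channel.write(direct);
        }

        Files.delete(tmp);
    }
}
```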



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7316) always use off-heap network buffers

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265104#comment-16265104
 ] 

ASF GitHub Bot commented on FLINK-7316:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4481


> always use off-heap network buffers
> ---
>
> Key: FLINK-7316
> URL: https://issues.apache.org/jira/browse/FLINK-7316
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> In order to send Flink buffers through Netty into the network, we need to 
> make the buffers use off-heap memory. Otherwise, there will be a hidden copy 
> happening in the NIO stack.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4481: [FLINK-7316][network] always use off-heap network ...

2017-11-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4481


---


[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265097#comment-16265097
 ] 

ASF GitHub Bot commented on FLINK-8144:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5063#discussion_r152929249
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
 ---
@@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver(
 // discard late record
 if (timestamp > curWatermark) {
   // ensure every key just registers one timer
-  ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+  ctx.timerService.registerEventTimeTimer(timestamp)
--- End diff --

Registering multiple timers on the same timestamp (`curWatermark + 1`) 
means that only one timer exists, and it fires exactly once when a watermark 
is received.

By registering timers on different timestamps, we end up with many timers 
that will all fire when a watermark is received. I don't think this is what 
we want.


> Optimize the timer logic in RowTimeUnboundedOver
> 
>
> Key: FLINK-8144
> URL: https://issues.apache.org/jira/browse/FLINK-8144
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> Currently the logic of {{RowTimeUnboundedOver}} is as follows:
> 1) When an element arrives, buffer it in MapState and register a timer at 
> {{current watermark + 1}}.
> 2) When the event-time timer fires, scan the MapState, find the elements 
> below the current watermark and process them. If there are remaining 
> elements to process, register a new timer at {{current watermark + 1}}.
> Let's assume that the watermark arrives about 5 seconds after the event on 
> average; then we will scan the MapState about 5000 times before actually 
> processing the events.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...

2017-11-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5063#discussion_r152929249
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
 ---
@@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver(
 // discard late record
 if (timestamp > curWatermark) {
   // ensure every key just registers one timer
-  ctx.timerService.registerEventTimeTimer(curWatermark + 1)
+  ctx.timerService.registerEventTimeTimer(timestamp)
--- End diff --

Registering multiple timers on the same timestamp (`curWatermark + 1`) 
means that only one timer exists, and it fires exactly once when a watermark 
is received.

By registering timers on different timestamps, we end up with many timers 
that will all fire when a watermark is received. I don't think this is what 
we want.


---


[GitHub] flink issue #4481: [FLINK-7316][network] always use off-heap network buffers

2017-11-24 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4481
  
Will merge this now.


---


[jira] [Commented] (FLINK-7316) always use off-heap network buffers

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265093#comment-16265093
 ] 

ASF GitHub Bot commented on FLINK-7316:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4481
  
LGTM  


> always use off-heap network buffers
> ---
>
> Key: FLINK-7316
> URL: https://issues.apache.org/jira/browse/FLINK-7316
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> In order to send Flink buffers through Netty into the network, we need to 
> make the buffers use off-heap memory. Otherwise, there will be a hidden copy 
> happening in the NIO stack.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7316) always use off-heap network buffers

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265094#comment-16265094
 ] 

ASF GitHub Bot commented on FLINK-7316:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4481
  
Will merge this now.


> always use off-heap network buffers
> ---
>
> Key: FLINK-7316
> URL: https://issues.apache.org/jira/browse/FLINK-7316
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> In order to send Flink buffers through Netty into the network, we need to 
> make the buffers use off-heap memory. Otherwise, there will be a hidden copy 
> happening in the NIO stack.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4481: [FLINK-7316][network] always use off-heap network buffers

2017-11-24 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4481
  
LGTM 👍 


---


[jira] [Commented] (FLINK-8144) Optimize the timer logic in RowTimeUnboundedOver

2017-11-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265088#comment-16265088
 ] 

Fabian Hueske commented on FLINK-8144:
--

Did you observe this behavior in practice, i.e., while debugging, or is this an 
assumption based on reading the code?

I'm asking because a timer registered on {{current_watermark + 1}} fires just 
once, when the next watermark is received (the logical clock is only advanced 
by watermarks).
Also, when multiple timers are registered on the same timestamp, the timer gets 
overridden, so there will be only one timer that fires.

I think the current implementation should behave just as expected, i.e., scan 
the MapState only once when a watermark is received.
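A minimal sketch of that semantics with a plain {{ProcessFunction}} (a hypothetical class for illustration, not the actual {{RowTimeUnboundedOver}} implementation): re-registering on {{currentWatermark() + 1}} for every element just overwrites the same timer per key, so {{onTimer}} fires once per watermark advance.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkTriggeredFunction extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // every element registers a timer on the same timestamp; timers are
        // deduplicated per key and timestamp, so only one timer exists
        ctx.timerService().registerEventTimeTimer(
            ctx.timerService().currentWatermark() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // fires once when the next watermark advances past `timestamp`;
        // this is where the buffered elements would be scanned and emitted
        out.collect("watermark advanced past " + timestamp);
    }
}
```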

> Optimize the timer logic in RowTimeUnboundedOver
> 
>
> Key: FLINK-8144
> URL: https://issues.apache.org/jira/browse/FLINK-8144
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> Currently the logic of {{RowTimeUnboundedOver}} is as follows:
> 1) When an element arrives, buffer it in MapState and register a timer at 
> {{current watermark + 1}}.
> 2) When the event-time timer fires, scan the MapState, find the elements 
> below the current watermark and process them. If there are remaining 
> elements to process, register a new timer at {{current watermark + 1}}.
> Let's assume that the watermark arrives about 5 seconds after the event on 
> average; then we will scan the MapState about 5000 times before actually 
> processing the events.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265082#comment-16265082
 ] 

ASF GitHub Bot commented on FLINK-7959:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4940#discussion_r152923905
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
 ---
@@ -19,50 +19,47 @@ package org.apache.flink.table.codegen
 
 import org.apache.flink.api.common.io.GenericInputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenUtils.newName
 import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.types.Row
 
 /**
   * A code generator for generating Flink [[GenericInputFormat]]s.
-  *
-  * @param config configuration that determines runtime behavior
   */
-class InputFormatCodeGenerator(
-config: TableConfig)
-  extends CodeGenerator(config, false, new RowTypeInfo(), None, None) {
-
+object InputFormatCodeGenerator {
 
   /**
 * Generates a values input format that can be passed to Java compiler.
 *
+* @param ctx The code generator context
 * @param name Class name of the input format. Must not be unique but 
has to be a
 * valid Java class identifier.
 * @param records code for creating records
 * @param returnType expected return type
+* @param outRecordTerm term of the output
 * @tparam T Return type of the Flink Function.
 * @return instance of GeneratedFunction
 */
   def generateValuesInputFormat[T <: Row](
-name: String,
-records: Seq[String],
-returnType: TypeInformation[T])
-  : GeneratedInput[GenericInputFormat[T], T] = {
+  ctx: CodeGeneratorContext,
+  name: String,
+  records: Seq[String],
+  returnType: TypeInformation[T],
+  outRecordTerm: String = CodeGeneratorContext.DEFAULT_OUT_RECORD_TERM)
--- End diff --

Do we need this parameter?


> Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
> ---
>
> Key: FLINK-7959
> URL: https://issues.apache.org/jira/browse/FLINK-7959
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> Right now {{CodeGenerator}} actually plays two roles: one is generating code 
> from RexNode, and the other is keeping lots of reusable statements. It makes 
> more sense to split this logic into two dedicated classes. 
> The new {{CodeGeneratorContext}} will keep all the reusable statements, while 
> the new {{ExprCodeGenerator}} will only generate code from RexNode.
> And classes like {{AggregationCodeGenerator}} or {{FunctionCodeGenerator}} 
> should not be subclasses of {{CodeGenerator}} but standalone classes. They 
> can create an {{ExprCodeGenerator}} when they need to generate code from 
> RexNode, and they can also generate code by themselves. The 
> {{CodeGeneratorContext}} can be passed around to collect all reusable 
> statements and list them in the final generated class.
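As a rough, hypothetical illustration of the proposed split (plain Java, names invented for the example, not the actual Table API classes): the context only accumulates reusable statements, while the expression generator produces code and registers whatever it wants reused.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// collects reusable statements; one instance is shared by all generators
// contributing to the same generated class
class GenContext {
    private final Set<String> memberStatements = new LinkedHashSet<>();

    void addReusableMember(String statement) {
        memberStatements.add(statement);
    }

    String reuseMemberCode() {
        return String.join("\n", memberStatements);
    }
}

// generates expression code only; pushes reusable parts into the context
class ExprGen {
    private final GenContext ctx;

    ExprGen(GenContext ctx) {
        this.ctx = ctx;
    }

    String generateLiteral(String term, String value) {
        ctx.addReusableMember("private final String " + term + " = \"" + value + "\";");
        return term;
    }
}
```

The standalone generators ({{AggregationCodeGenerator}}, {{FunctionCodeGenerator}}, ...) would then receive such a context instead of inheriting from a common {{CodeGenerator}}.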



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265080#comment-16265080
 ] 

ASF GitHub Bot commented on FLINK-7959:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4940#discussion_r152923370
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
 ---
@@ -21,75 +21,49 @@ import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
 import 
org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
 import org.apache.flink.table.codegen.Indenter.toISC
 
 /**
   * A code generator for generating Flink 
[[org.apache.flink.api.common.functions.Function]]s.
-  * Including [[MapFunction]], [[FlatMapFunction]], [[FlatJoinFunction]], 
[[ProcessFunction]].
-  *
-  * @param config configuration that determines runtime behavior
-  * @param nullableInput input(s) can be null.
-  * @param input1 type information about the first input of the Function
-  * @param input2 type information about the second input if the Function 
is binary
-  * @param input1FieldMapping additional mapping information for input1
-  *   (e.g. POJO types have no deterministic field order and some input 
fields might not be read)
-  * @param input2FieldMapping additional mapping information for input2
-  *   (e.g. POJO types have no deterministic field order and some input 
fields might not be read)
+  * Including [[MapFunction]], [[FlatMapFunction]], [[FlatJoinFunction]], 
[[ProcessFunction]], and
+  * the corresponding rich version of the functions.
   */
-class FunctionCodeGenerator(
-config: TableConfig,
-nullableInput: Boolean,
-input1: TypeInformation[_ <: Any],
-input2: Option[TypeInformation[_ <: Any]] = None,
-input1FieldMapping: Option[Array[Int]] = None,
-input2FieldMapping: Option[Array[Int]] = None)
-  extends CodeGenerator(
-config,
-nullableInput,
-input1,
-input2,
-input1FieldMapping,
-input2FieldMapping) {
-
-  /**
-* A code generator for generating unary Flink
-* [[org.apache.flink.api.common.functions.Function]]s with one input.
-*
-* @param config configuration that determines runtime behavior
-* @param nullableInput input(s) can be null.
-* @param input type information about the input of the Function
-* @param inputFieldMapping additional mapping information necessary 
for input
-*   (e.g. POJO types have no deterministic field order and some input 
fields might not be read)
-*/
-  def this(
-config: TableConfig,
-nullableInput: Boolean,
-input: TypeInformation[Any],
-inputFieldMapping: Array[Int]) =
-this(config, nullableInput, input, None, Some(inputFieldMapping))
+object FunctionCodeGenerator {
 
   /**
 * Generates a [[org.apache.flink.api.common.functions.Function]] that 
can be passed to Java
 * compiler.
 *
+* @param ctx The context of the code generator
 * @param name Class name of the Function. Must not be unique but has 
to be a valid Java class
 * identifier.
 * @param clazz Flink Function to be generated.
 * @param bodyCode code contents of the SAM (Single Abstract Method). 
Inputs, collector, or
 * output record can be accessed via the given term 
methods.
 * @param returnType expected return type
+* @param input1Type the first input type
+* @param input1Term the first input term
+* @param input2Type the second input type, optional.
+* @param input2Term the second input term.
+* @param collectorTerm the collector term
+* @param contextTerm the context term
 * @tparam F Flink Function to be generated.
 * @tparam T Return type of the Flink Function.
 * @return instance of GeneratedFunction
 */
   def generateFunction[F <: Function, T <: Any](
-name: String,
-clazz: Class[F],
-bodyCode: String,
-returnType: TypeInformation[T])
-  : GeneratedFunction[F, T] = {
+  ctx: CodeGeneratorContext,
+  name: String,
+  clazz: Class[F],
+  bodyCode: String,
+  returnType: TypeInformation[T],
+  input1Type: TypeInformation[_ <: Any],
--- End diff --

That's a lot of parameters. We should really think about moving input-related 
information to the 

[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265079#comment-16265079
 ] 

ASF GitHub Bot commented on FLINK-7959:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4940#discussion_r152921238
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
 ---
@@ -203,6 +210,569 @@ object CodeGenUtils {
   throw new CodeGenException("Integer expression type expected.")
 }
 
+  def generateNullLiteral(
--- End diff --

Actually, these methods should be part of the `ExprCodeGenerator`. They are 
more than just utils; they are the actual main logic.


> Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
> ---
>
> Key: FLINK-7959
> URL: https://issues.apache.org/jira/browse/FLINK-7959
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> Right now {{CodeGenerator}} actually plays two roles: one is generating code 
> from RexNode, and the other is keeping lots of reusable statements. It makes 
> more sense to split this logic into two dedicated classes. 
> The new {{CodeGeneratorContext}} will keep all the reusable statements, while 
> the new {{ExprCodeGenerator}} will only generate code from RexNode.
> And classes like {{AggregationCodeGenerator}} or {{FunctionCodeGenerator}} 
> should not be subclasses of {{CodeGenerator}} but standalone classes. They 
> can create an {{ExprCodeGenerator}} when they need to generate code from 
> RexNode, and they can also generate code by themselves. The 
> {{CodeGeneratorContext}} can be passed around to collect all reusable 
> statements and list them in the final generated class.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265083#comment-16265083
 ] 

ASF GitHub Bot commented on FLINK-7959:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4940#discussion_r152921868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
 ---
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.math.{BigDecimal => JBigDecimal}
+
+import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import 
org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.functions.{FunctionContext, 
UserDefinedFunction}
+
+import scala.collection.mutable
+
+/**
+ * The context for code generator, maintaining various reusable statements 
that could be insert
+ * into different code sections in the final generated class.
+ */
+class CodeGeneratorContext {
+
+  // set of member statements that will be added only once
+  private val reusableMemberStatements: mutable.LinkedHashSet[String] =
+mutable.LinkedHashSet[String]()
+
+  // set of constructor statements that will be added only once
+  private val reusableInitStatements: mutable.LinkedHashSet[String] =
+mutable.LinkedHashSet[String]()
+
+  // set of statements that will be added only once per record
+  private val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
+mutable.LinkedHashSet[String]()
+
+  // set of open statements for RichFunction that will be added only once
+  private val reusableOpenStatements: mutable.LinkedHashSet[String] =
+mutable.LinkedHashSet[String]()
+
+  // set of close statements for RichFunction that will be added only once
+  private val reusableCloseStatements: mutable.LinkedHashSet[String] =
+mutable.LinkedHashSet[String]()
+
+  // map of initial input unboxing expressions that will be added only once
+  // (inputTerm, index) -> expr
+  private val reusableInputUnboxingExprs: mutable.Map[(String, Int), 
GeneratedExpression] =
+mutable.Map[(String, Int), GeneratedExpression]()
+
+  // set of constructor statements that will be added only once
+  private val reusableConstructorStatements: 
mutable.LinkedHashSet[(String, String)] =
+mutable.LinkedHashSet[(String, String)]()
+
+  /**
+   * @return code block of statements that need to be placed in the member 
area of the class
+   * (e.g. member variables and their initialization)
+   */
+  def reuseMemberCode(): String = {
+reusableMemberStatements.mkString("")
+  }
+
+  /**
+   * @return code block of statements that need to be placed in the 
constructor
+   */
+  def reuseInitCode(): String = {
+reusableInitStatements.mkString("")
+  }
+
+  /**
+   * @return code block of statements that need to be placed in the per 
recode process block
+   * (e.g. Function or StreamOperator's processElement)
+   */
+  def reusePerRecordCode(): String = {
+reusablePerRecordStatements.mkString("")
+  }
+
+  /**
+   * @return code block of statements that need to be placed in the open() 
method
+   * (e.g. RichFunction or StreamOperator)
+   */
+  def reuseOpenCode(): String = {
+reusableOpenStatements.mkString("")
+  }
+
+  /**
+   * @return code block of statements that need to be placed in the 
close() method
+   * (e.g. RichFunction or StreamOperator)
+   */
  

[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265081#comment-16265081
 ] 

ASF GitHub Bot commented on FLINK-7959:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4940#discussion_r152924498
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CurrentTimePointCallGen.scala
 ---
@@ -31,28 +32,28 @@ class CurrentTimePointCallGen(
   extends CallGenerator {
 
   override def generate(
-  codeGenerator: CodeGenerator,
-  operands: Seq[GeneratedExpression])
-: GeneratedExpression = targetType match {
+  ctx: CodeGeneratorContext,
--- End diff --

Use `ExprCodeGenerator` here so that we don't need to pass `nullCheck` to 
every util function, if those functions are part of the `ExprCodeGenerator` 
class. With this approach we don't need to add `nullCheck` to 
`CallGenerator.generate`.


> Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
> ---
>
> Key: FLINK-7959
> URL: https://issues.apache.org/jira/browse/FLINK-7959
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> Right now {{CodeGenerator}} actually plays two roles: one is generating code 
> from RexNode, and the other is keeping lots of reusable statements. It makes 
> more sense to split this logic into two dedicated classes. 
> The new {{CodeGeneratorContext}} will keep all the reusable statements, while 
> the new {{ExprCodeGenerator}} will only generate code from RexNode.
> And classes like {{AggregationCodeGenerator}} or {{FunctionCodeGenerator}} 
> should not be subclasses of {{CodeGenerator}} but standalone classes. They 
> can create an {{ExprCodeGenerator}} when they need to generate code from 
> RexNode, and they can also generate code by themselves. The 
> {{CodeGeneratorContext}} can be passed around to collect all reusable 
> statements and list them in the final generated class.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7959) Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265077#comment-16265077
 ] 

ASF GitHub Bot commented on FLINK-7959:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4940#discussion_r152918458
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -985,14 +985,11 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 }
 
 // code generate MapFunction
-val generator = new FunctionCodeGenerator(
-  config,
-  false,
-  inputTypeInfo,
-  None,
-  None)
-
-val conversion = generator.generateConverterResultExpression(
+val ctx = CodeGeneratorContext()
+val exprGenerator = new ExprCodeGenerator(ctx, false, 
config.getNullCheck)
--- End diff --

Should we move `nullableInput` to `bindInput`? If an input is not required, 
why should this flag be required? We could also add a default `false` value.


> Split CodeGenerator into CodeGeneratorContext and ExprCodeGenerator
> ---
>
> Key: FLINK-7959
> URL: https://issues.apache.org/jira/browse/FLINK-7959
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> Right now {{CodeGenerator}} actually plays two roles: one is generating code 
> from RexNode, and the other is keeping lots of reusable statements. It makes 
> more sense to split this logic into two dedicated classes. 
> The new {{CodeGeneratorContext}} will keep all the reusable statements, while 
> the new {{ExprCodeGenerator}} will only generate code from RexNode.
> And classes like {{AggregationCodeGenerator}} or {{FunctionCodeGenerator}} 
> should not be subclasses of {{CodeGenerator}} but standalone classes. They 
> can create an {{ExprCodeGenerator}} when they need to generate code from 
> RexNode, and they can also generate code by themselves. The 
> {{CodeGeneratorContext}} can be passed around to collect all reusable 
> statements and list them in the final generated class.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

