[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326834#comment-16326834
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK, thanks for your reviews! 
I have submitted all the patches you provided offline to address the above 
issues.

1. Remove `FLINK-8425` from this PR.
2. Do you think I should add more tests for `nextBufferIsEvent`? I have 
already verified that in previous related tests.
3. For the issue of adding the switch, I found it difficult to leave messages 
for you offline. We can confirm that further.


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from the receiver. Upon receiving {{PartitionRequest}} and 
> {{AddCredit}} messages, the {{currentCredit}} is increased by the given 
> deltas.
> Each view also maintains an atomic boolean field marking whether it is 
> registered as available for transfer, to make sure it is enqueued in the 
> handler only once. If the {{currentCredit}} increases from zero and there 
> are available buffers in the subpartition, the corresponding view will be 
> enqueued for transferring data.
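
A minimal sketch of the sender-side bookkeeping described above (the class and 
method names are illustrative assumptions, not the actual Flink code):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a subpartition view's credit accounting.
class CreditTrackingView {
    private final AtomicInteger currentCredit = new AtomicInteger(0);
    // Marks whether the view is already registered, so it is enqueued only once.
    private final AtomicBoolean registeredAsAvailable = new AtomicBoolean(false);

    /** Called for the deltas carried by PartitionRequest and AddCredit messages. */
    void addCredit(int delta) {
        int credit = currentCredit.addAndGet(delta);
        // Enqueue only when credit is positive, buffers are available,
        // and the view is not yet registered.
        if (credit > 0 && hasBuffersAvailable()
                && registeredAsAvailable.compareAndSet(false, true)) {
            enqueueForTransfer();
        }
    }

    boolean hasBuffersAvailable() { return true; } // placeholder
    void enqueueForTransfer() { /* hand the view to the Netty handler */ }
}
{code}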



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK, thanks for your reviews! 
I have submitted all the patches you provided offline to address the above 
issues.

1. Remove `FLINK-8425` from this PR.
2. Do you think I should add more tests for `nextBufferIsEvent`? I have 
already verified that in previous related tests.
3. For the issue of adding the switch, I found it difficult to leave messages 
for you offline. We can confirm that further.


---


[jira] [Commented] (FLINK-8401) Allow subclass to override write-failure behavior in CassandraOutputFormat

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326818#comment-16326818
 ] 

ASF GitHub Bot commented on FLINK-8401:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5274
  
Thanks a lot, @tillrohrmann . Added javadoc. Please take another look.


> Allow subclass to override write-failure behavior in CassandraOutputFormat 
> ---
>
> Key: FLINK-8401
> URL: https://issues.apache.org/jira/browse/FLINK-8401
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> Currently it will throw an exception and fail the entire job. We would like 
> to keep the current default behavior but refactor the code to allow 
> subclasses to override and customize the failure handling.
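
For illustration, a subclass using these hooks might look like the sketch 
below (the subclass, its tuple type, and the constructor pass-through are 
assumptions; the {{onWriteSuccess}}/{{onWriteFailure}} signatures are taken 
from the diff quoted later in this thread):

{code:java}
import com.datastax.driver.core.ResultSet;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// Hypothetical subclass that records write failures instead of failing the job.
public class LenientCassandraOutputFormat extends CassandraOutputFormat<Tuple2<String, Integer>> {

    public LenientCassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
        super(insertQuery, builder);
    }

    @Override
    protected void onWriteSuccess(ResultSet ignored) {
        // no-op; a subclass could update success metrics here
    }

    @Override
    protected void onWriteFailure(Throwable t) {
        // custom handling: log and continue instead of rethrowing
        System.err.println("Cassandra write failed: " + t.getMessage());
    }
}
{code}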



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8401) Allow subclass to override write-failure behavior in CassandraOutputFormat

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326815#comment-16326815
 ] 

ASF GitHub Bot commented on FLINK-8401:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5274#discussion_r161672687
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -66,6 +66,13 @@ public void configure(Configuration parameters) {
this.cluster = builder.getCluster();
}
 
+   protected void onWriteSuccess(ResultSet ignored) {
--- End diff --

JavaDoc added.


> Allow subclass to override write-failure behavior in CassandraOutputFormat 
> ---
>
> Key: FLINK-8401
> URL: https://issues.apache.org/jira/browse/FLINK-8401
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> Currently it will throw an exception and fail the entire job. We would like 
> to keep the current default behavior but refactor the code to allow 
> subclasses to override and customize the failure handling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5274: [FLINK-8401][Cassandra Connector]Refactor CassandraOutput...

2018-01-15 Thread suez1224
Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5274
  
Thanks a lot, @tillrohrmann . Added javadoc. Please take another look.


---


[jira] [Commented] (FLINK-8401) Allow subclass to override write-failure behavior in CassandraOutputFormat

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326817#comment-16326817
 ] 

ASF GitHub Bot commented on FLINK-8401:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5274#discussion_r161672700
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -66,6 +66,13 @@ public void configure(Configuration parameters) {
this.cluster = builder.getCluster();
}
 
+   protected void onWriteSuccess(ResultSet ignored) {
+   }
+
+   protected void onWriteFailure(Throwable t) {
--- End diff --

JavaDoc added.


> Allow subclass to override write-failure behavior in CassandraOutputFormat 
> ---
>
> Key: FLINK-8401
> URL: https://issues.apache.org/jira/browse/FLINK-8401
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> Currently it will throw an exception and fail the entire job. We would like 
> to keep the current default behavior but refactor the code to allow 
> subclasses to override and customize the failure handling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5274: [FLINK-8401][Cassandra Connector]Refactor Cassandr...

2018-01-15 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5274#discussion_r161672700
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -66,6 +66,13 @@ public void configure(Configuration parameters) {
this.cluster = builder.getCluster();
}
 
+   protected void onWriteSuccess(ResultSet ignored) {
+   }
+
+   protected void onWriteFailure(Throwable t) {
--- End diff --

JavaDoc added.


---


[GitHub] flink pull request #5274: [FLINK-8401][Cassandra Connector]Refactor Cassandr...

2018-01-15 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5274#discussion_r161672687
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -66,6 +66,13 @@ public void configure(Configuration parameters) {
this.cluster = builder.getCluster();
}
 
+   protected void onWriteSuccess(ResultSet ignored) {
--- End diff --

JavaDoc added.


---


[jira] [Commented] (FLINK-7777) Bump japicmp to 0.11.0

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326804#comment-16326804
 ] 

ASF GitHub Bot commented on FLINK-7777:
---

GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/5302

[FLINK-7777][build] Bump japicmp to 0.11.0

## What is the purpose of the change

Currently, Flink uses japicmp-maven-plugin version 0.7.0, and I observed 
some warning messages; details: https://issues.apache.org/jira/browse/FLINK-7777

japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting 
dependency in order to prevent warnings from SAXParserImpl._

The current stable version is 0.11.0, and it can be built under JDK 9; we 
should consider upgrading to this version.


## Brief change log

- *Bump japicmp-maven-plugin version from 0.7.0 to 0.11.0.*
- *Remove unnecessary profile `skip-japicmp` in the root pom.xml*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-7777

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5302.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5302


commit 57e513f0d1c002ee7c1cf3060a756a157bdb3bb6
Author: yew1eb 
Date:   2017-11-19T03:11:50Z

[FLINK-7777][build] Bump japicmp to 0.11.0




> Bump japicmp to 0.11.0
> --
>
> Key: FLINK-7777
> URL: https://issues.apache.org/jira/browse/FLINK-7777
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Minor
> Fix For: 1.5.0
>
>
> Currently, Flink uses japicmp-maven-plugin version 0.7.0, and I'm getting 
> these warnings from the Maven plugin during a *mvn clean verify*:
> {code:java}
> [INFO] Written file '.../target/japicmp/japicmp.diff'.
> [INFO] Written file '.../target/japicmp/japicmp.xml'.
> [INFO] Written file '.../target/japicmp/japicmp.html'.
> Warning:  org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> Compiler warnings:
>   WARNING:  'org.apache.xerces.jaxp.SAXParserImpl: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.'
> Warning:  org.apache.xerces.parsers.SAXParser: Feature 
> 'http://javax.xml.XMLConstants/feature/secure-processing' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> {code}
> japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting 
> dependency in order to prevent warnings from SAXParserImpl._
> The current stable version is 0.11.0; we can consider upgrading to this 
> version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5302: [FLINK-7777][build] Bump japicmp to 0.11.0

2018-01-15 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/5302

[FLINK-7777][build] Bump japicmp to 0.11.0

## What is the purpose of the change

Currently, Flink uses japicmp-maven-plugin version 0.7.0, and I observed 
some warning messages; details: https://issues.apache.org/jira/browse/FLINK-7777

japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting 
dependency in order to prevent warnings from SAXParserImpl._

The current stable version is 0.11.0, and it can be built under JDK 9; we 
should consider upgrading to this version.


## Brief change log

- *Bump japicmp-maven-plugin version from 0.7.0 to 0.11.0.*
- *Remove unnecessary profile `skip-japicmp` in the root pom.xml*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-7777

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5302.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5302


commit 57e513f0d1c002ee7c1cf3060a756a157bdb3bb6
Author: yew1eb 
Date:   2017-11-19T03:11:50Z

[FLINK-7777][build] Bump japicmp to 0.11.0




---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326793#comment-16326793
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667346
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
80L);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

Yes, the same reason as above.


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from the receiver. Upon receiving {{PartitionRequest}} and 
> {{AddCredit}} messages, the {{currentCredit}} is increased by the given 
> deltas.
> Each view also maintains an atomic boolean field marking whether it is 
> registered as available for transfer, to make sure it is enqueued in the 
> handler only once. If the {{currentCredit}} increases from zero and there 
> are available buffers in the subpartition, the corresponding view will be 
> enqueued for transferring data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326792#comment-16326792
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667234
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 ---
@@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws 
Exception {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
--- End diff --

Yes, I will set it to 9.


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from the receiver. Upon receiving {{PartitionRequest}} and 
> {{AddCredit}} messages, the {{currentCredit}} is increased by the given 
> deltas.
> Each view also maintains an atomic boolean field marking whether it is 
> registered as available for transfer, to make sure it is enqueued in the 
> handler only once. If the {{currentCredit}} increases from zero and there 
> are available buffers in the subpartition, the corresponding view will be 
> enqueued for transferring data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667346
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
80L);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

Yes, the same reason as above.


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161667234
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 ---
@@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws 
Exception {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
--- End diff --

Yes, I will set it to 9.


---


[jira] [Closed] (FLINK-4504) Support user to decide whether the result of an operator is persistent

2018-01-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-4504.
---
Resolution: Won't Fix

> Support user to decide whether the result of an operator is persistent
> --
>
> Key: FLINK-4504
> URL: https://issues.apache.org/jira/browse/FLINK-4504
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Support an API for users to decide whether they need the result of an 
> operator to be pipelined, spilled to a local file, or persisted to a 
> distributed file system.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-4444) Add a DFSInputChannel and DFSSubPartition

2018-01-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-4444.
---
Resolution: Won't Fix

> Add a DFSInputChannel and DFSSubPartition
> -
>
> Key: FLINK-4444
> URL: https://issues.apache.org/jira/browse/FLINK-4444
> Project: Flink
>  Issue Type: Sub-task
>  Components: Batch Connectors and Input/Output Formats
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> Add a new ResultPartitionType and ResultPartitionLocation type for DFS
> Add DFSSubpartition and DFSInputChannel for writing and reading DFS



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8289) The RestServerEndpoint should return the address with the real IP when getRestAddress

2018-01-15 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu closed FLINK-8289.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

> The RestServerEndpoint should return the address with the real IP when 
> getRestAddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Now when RestServerEndpoint.getRestAddress is called, it returns an address 
> equal to the value of the config option rest.address, which defaults to 
> 127.0.0.1:9067, but this address cannot be accessed from another machine. 
> The IPs for the Dispatcher and JobMaster are usually assigned dynamically, 
> so users will configure rest.address to 0.0.0.0, and getRestAddress will 
> then return 0.0.0.0:9067; this address will be registered with YARN or 
> Mesos, but it also cannot be accessed from another machine. So it needs to 
> return the real IP:port for users to access the web monitor from anywhere.
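
A minimal sketch of the intended behavior (hypothetical code, not the actual 
patch):

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: advertise the machine's real IP instead of the
// wildcard bind address when constructing the REST endpoint URL.
public class RestAddressExample {

    static String advertisedRestAddress(String configuredAddress, int port)
            throws UnknownHostException {
        String host = "0.0.0.0".equals(configuredAddress)
                ? InetAddress.getLocalHost().getHostAddress() // real, routable IP
                : configuredAddress;
        return "http://" + host + ":" + port;
    }

    public static void main(String[] args) throws UnknownHostException {
        // With rest.address = 0.0.0.0, prints e.g. http://10.0.0.5:9067
        System.out.println(advertisedRestAddress("0.0.0.0", 9067));
    }
}
{code}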



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...

2018-01-15 Thread shuai-xu
Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/5190


---


[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with the real IP when getRestAddress

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326756#comment-16326756
 ] 

ASF GitHub Bot commented on FLINK-8289:
---

Github user shuai-xu closed the pull request at:

https://github.com/apache/flink/pull/5190


> The RestServerEndpoint should return the address with the real IP when 
> getRestAddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> Now when RestServerEndpoint.getRestAddress is called, it returns an address 
> equal to the value of the config option rest.address, which defaults to 
> 127.0.0.1:9067, but this address cannot be accessed from another machine. 
> The IPs for the Dispatcher and JobMaster are usually assigned dynamically, 
> so users will configure rest.address to 0.0.0.0, and getRestAddress will 
> then return 0.0.0.0:9067; this address will be registered with YARN or 
> Mesos, but it also cannot be accessed from another machine. So it needs to 
> return the real IP:port for users to access the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8322) support getting number of existing timers in TimerService

2018-01-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-8322.
---
Resolution: Won't Fix

> support getting number of existing timers in TimerService
> -
>
> Key: FLINK-8322
> URL: https://issues.apache.org/jira/browse/FLINK-8322
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> There are pretty common use cases where users want to use timers as scheduled 
> threads - e.g., add a timer to wake up x hours later and do something 
> (usually reap old data) only if there are no existing timers; basically, we 
> only want at most one timer to exist for the key at any time.
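
A common workaround sketch for this at-most-one-timer-per-key pattern, 
tracking the pending timer in keyed state (the function and its types are 
illustrative, not from the ticket):

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical function (applied on a KeyedStream) keeping at most one
// pending timer per key by remembering the registered timestamp in state.
public class AtMostOneTimerFunction extends ProcessFunction<String, String> {

    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        if (pendingTimer.value() == null) {
            // register a timer only if none exists yet for this key
            long fireAt = ctx.timerService().currentProcessingTime() + 3_600_000L; // "x hours"
            ctx.timerService().registerProcessingTimeTimer(fireAt);
            pendingTimer.update(fireAt);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        pendingTimer.clear(); // reap old data here, then allow a new timer
    }
}
{code}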



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8267) update Kinesis Producer example for setting Region key

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326639#comment-16326639
 ] 

ASF GitHub Bot commented on FLINK-8267:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5301

[FLINK-8267] [Kinesis Connector] update Kinesis Producer example for 
setting Region key

## What is the purpose of the change

Update the doc to guide users to use KPL's native keys to set regions. 

This originates from a bug, since fixed, where we forgot to set region keys 
explicitly for the Kinesis connector. According to the previous discussion 
between @tzulitai and me, we want to remove AWSConfigConstants in 2.0 because 
it basically copies/translates config keys of KPL/KCL, which doesn't add much 
value. 

Guiding users to use KPL's native keys to set regions can be the first step 
toward that migration.

## Brief change log

- updated doc

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

## Documentation

  - Does this pull request introduce a new feature? (no)


cc @tzulitai 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8267

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5301.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5301


commit b4034f67e727fef68740221e3b31cd131c905df1
Author: Bowen Li 
Date:   2018-01-02T19:21:28Z

update local branch

commit 11e9c255cd51304c5281d55226fbb6fe8360d8a2
Author: Bowen Li 
Date:   2018-01-04T01:35:11Z

remove sh

commit e322a5416b0f4f89c366b98bb3571fbf6b7d460a
Author: Bowen Li 
Date:   2017-12-17T06:18:55Z

update doc

commit 1b447633df4a8bfe7c4c19e7ae91aab6157756d7
Author: Bowen Li 
Date:   2018-01-15T23:48:38Z

format code snippet

commit 4ede4b5a89d4bfcda8dcc845ab1da42177d22358
Author: Bowen Li 
Date:   2018-01-15T23:49:37Z

remove ';' from scala code




> update Kinesis Producer example for setting Region key
> --
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Assignee: Bowen Li
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the Kinesis producer, the region key is set like:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer, you can see the effect of not setting the 
> Region key in a log line:
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL also then assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the Kinesis stream? The documentation for the example states: 
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}
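
For reference, a hedged sketch of a producer configuration that sets both 
keys, in the style of the doc's example (the stream name is illustrative):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties producerConfig = new Properties();
// Flink's own key ...
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
// ... and the KPL-native key that the Kinesis Producer Library itself reads.
producerConfig.put("Region", "us-east-1");

FlinkKinesisProducer<String> producer =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
producer.setDefaultStream("example-stream"); // illustrative stream name
{code}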



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5301: [FLINK-8267] [Kinesis Connector] update Kinesis Pr...

2018-01-15 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5301

[FLINK-8267] [Kinesis Connector] update Kinesis Producer example for 
setting Region key

## What is the purpose of the change

Update the doc to guide users to use KPL's native keys to set regions. 

This originates from a bug, since fixed, where we forgot to set region keys 
explicitly for the Kinesis connector. According to the previous discussion 
between @tzulitai and me, we want to remove AWSConfigConstants in 2.0 because 
it basically copies/translates config keys of KPL/KCL, which doesn't add much 
value. 

Guiding users to use KPL's native keys to set regions can be the first step 
toward that migration.

## Brief change log

- updated doc

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

## Documentation

  - Does this pull request introduce a new feature? (no)


cc @tzulitai 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8267

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5301.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5301


commit b4034f67e727fef68740221e3b31cd131c905df1
Author: Bowen Li 
Date:   2018-01-02T19:21:28Z

update local branch

commit 11e9c255cd51304c5281d55226fbb6fe8360d8a2
Author: Bowen Li 
Date:   2018-01-04T01:35:11Z

remove sh

commit e322a5416b0f4f89c366b98bb3571fbf6b7d460a
Author: Bowen Li 
Date:   2017-12-17T06:18:55Z

update doc

commit 1b447633df4a8bfe7c4c19e7ae91aab6157756d7
Author: Bowen Li 
Date:   2018-01-15T23:48:38Z

format code snippet

commit 4ede4b5a89d4bfcda8dcc845ab1da42177d22358
Author: Bowen Li 
Date:   2018-01-15T23:49:37Z

remove ';' from scala code




---


[jira] [Updated] (FLINK-8267) update Kinesis Producer example for setting Region key

2018-01-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8267:

Summary: update Kinesis Producer example for setting Region key  (was: 
Kinesis Producer example setting Region key)

> update Kinesis Producer example for setting Region key
> --
>
> Key: FLINK-8267
> URL: https://issues.apache.org/jira/browse/FLINK-8267
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Dyana Rose
>Assignee: Bowen Li
>Priority: Minor
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer
> In the example code for the Kinesis producer, the region key is set like:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> {code}
> However, the AWS Kinesis Producer Library requires that the region key be 
> Region 
> (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269)
>  so the setting at this point should be:
> {code:java}
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
> producerConfig.put("Region", "us-east-1");
> {code}
> When you run the Kinesis Producer, you can see the effect of not setting the 
> Region key in a log line:
> {noformat}
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer  - Started 
> Kinesis producer instance for region ''
> {noformat}
> The KPL also then assumes it's running on EC2 and attempts to determine its 
> own region, which fails.
> {noformat}
> (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the 
> config and we were unable to retrieve it from EC2 metadata
> {noformat}
> At the least I'd say the documentation should mention the difference between 
> these two keys and when you are required to also set the Region key.
> On the other hand, is this even the intended behaviour of this connector? Was 
> it intended that the AWSConfigConstants.AWS_REGION key also set the region of 
> the Kinesis stream? The documentation for the example states: 
> {noformat}
> The example demonstrates producing a single Kinesis stream in the AWS region 
> “us-east-1”.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8411:

Summary: HeapListState#add(null) will wipe out entire list state  (was: 
inconsistent behavior between HeapListState#add() and RocksDBListState#add())

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There has never been a unit test for 
> {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable<K, N, ArrayList<V>> map = stateTable;
>   ArrayList<V> list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}
> The fix should correct the behavior to be consistent between the two state 
> backends, as well as add a unit test for {{ListState#add(null)}}. For the 
> correct behavior, I believe adding null with {{add(null)}} should simply be 
> ignored without any consequences.
> cc [~srichter]
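
A minimal sketch of the proposed consistent behavior, assuming {{add(null)}} 
becomes a no-op (a sketch of the idea, not the actual patch):

{code:java}
// HeapListState, sketched fix: ignore nulls instead of clearing the state
@Override
public void add(V value) {
    if (value == null) {
        return;
    }

    final N namespace = currentNamespace;
    final StateTable<K, N, ArrayList<V>> map = stateTable;
    ArrayList<V> list = map.get(namespace);
    if (list == null) {
        list = new ArrayList<>();
        map.put(namespace, list);
    }
    list.add(value);
}
{code}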



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326633#comment-16326633
 ] 

ASF GitHub Bot commented on FLINK-8411:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5300

[FLINK-8411] [State Backends] inconsistent behavior between 
HeapListState#add() and RocksDBListState#add()

## What is the purpose of the change

`HeapListState#add(null)` will result in the whole state being cleared or 
wiped out. There has never been a unit test for `ListState#add(null)` in 
`StateBackendTestBase`.

## Brief change log

- changed ListState impls such that `add(null)` will be explicitly ignored
- added unit tests to test `add(null)`
- updated JavaDoc

## Verifying this change

This change is already covered by existing tests, such as 
`StateBackendTestBase`.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


Note! **This work depends on FLINK-7983 and its PR at 
https://github.com/apache/flink/pull/5281**

cc @StefanRRichter 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8411

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5300


commit b4034f67e727fef68740221e3b31cd131c905df1
Author: Bowen Li 
Date:   2018-01-02T19:21:28Z

update local branch

commit 11e9c255cd51304c5281d55226fbb6fe8360d8a2
Author: Bowen Li 
Date:   2018-01-04T01:35:11Z

remove sh

commit 72edff2a82df625203289e4b5be23db36b03abe3
Author: Bowen Li 
Date:   2018-01-12T08:14:55Z

[FLINK-7938] introduce addAll() to ListState

commit 138d6f63dff7a840b716bf8900f97940f7d61dd8
Author: Bowen Li 
Date:   2018-01-12T18:56:42Z

add unit tests

commit 481a5a98b7b655658855e18023c7c28328b0b47d
Author: Bowen Li 
Date:   2018-01-12T19:03:07Z

add documentation for addAll()

commit cced3bac65660b27e17d258b3fd3880e9571bcf6
Author: Bowen Li 
Date:   2018-01-13T23:06:11Z

add more cases in unit test

commit 18fb3ff2965653dc0bd7c7a2d7a419ce8c7c6e8a
Author: Bowen Li 
Date:   2018-01-15T23:24:19Z

[FLINK-8411] inconsistent behavior between HeapListState#add() and 
RocksDBListState#add()




> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
> 
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There has never been a unit test for 
> {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable<K, N, ArrayList<V>> map = stateTable;
>   ArrayList<V> list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}

[GitHub] flink pull request #5300: [FLINK-8411] [State Backends] inconsistent behavio...

2018-01-15 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5300

[FLINK-8411] [State Backends] inconsistent behavior between 
HeapListState#add() and RocksDBListState#add()

## What is the purpose of the change

`HeapListState#add(null)` will result in the whole state being cleared or 
wiped out. There has never been a unit test for `ListState#add(null)` in 
`StateBackendTestBase`.

## Brief change log

- changed ListState impls such that `add(null)` will be explicitly ignored
- added unit tests to test `add(null)`
- updated JavaDoc

## Verifying this change

This change is already covered by existing tests, such as 
`StateBackendTestBase`.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


Note! **This work depends on FLINK-7983 and its PR at 
https://github.com/apache/flink/pull/5281**

cc @StefanRRichter 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8411

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5300


commit b4034f67e727fef68740221e3b31cd131c905df1
Author: Bowen Li 
Date:   2018-01-02T19:21:28Z

update local branch

commit 11e9c255cd51304c5281d55226fbb6fe8360d8a2
Author: Bowen Li 
Date:   2018-01-04T01:35:11Z

remove sh

commit 72edff2a82df625203289e4b5be23db36b03abe3
Author: Bowen Li 
Date:   2018-01-12T08:14:55Z

[FLINK-7938] introduce addAll() to ListState

commit 138d6f63dff7a840b716bf8900f97940f7d61dd8
Author: Bowen Li 
Date:   2018-01-12T18:56:42Z

add unit tests

commit 481a5a98b7b655658855e18023c7c28328b0b47d
Author: Bowen Li 
Date:   2018-01-12T19:03:07Z

add documentation for addAll()

commit cced3bac65660b27e17d258b3fd3880e9571bcf6
Author: Bowen Li 
Date:   2018-01-13T23:06:11Z

add more cases in unit test

commit 18fb3ff2965653dc0bd7c7a2d7a419ce8c7c6e8a
Author: Bowen Li 
Date:   2018-01-15T23:24:19Z

[FLINK-8411] inconsistent behavior between HeapListState#add() and 
RocksDBListState#add()




---


[jira] [Updated] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()

2018-01-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8411:

Component/s: State Backends, Checkpointing

> inconsistent behavior between HeapListState#add() and RocksDBListState#add()
> 
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There has never been a unit test for 
> {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable<K, N, ArrayList<V>> map = stateTable;
>   ArrayList<V> list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}
> The fix should correct the behavior to be consistent between the two state 
> backends, as well as add a unit test for {{ListState#add(null)}}. For the 
> correct behavior, I believe adding null with {{add(null)}} should simply be 
> ignored without any consequences.
> cc [~srichter]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8188) Clean up flink-contrib

2018-01-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-8188:
---

Assignee: Bowen Li

> Clean up flink-contrib
> --
>
> Key: FLINK-8188
> URL: https://issues.apache.org/jira/browse/FLINK-8188
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>
> This is the umbrella ticket for cleaning up flink-contrib. 
> We argue that flink-contrib should be removed and all its submodules should 
> be migrated to other top-level modules for the following reasons: 
> 1) The whole Apache Flink project is itself a result of contributions from 
> many developers; there's no reason to highlight some contributions in a 
> dedicated module named 'contrib'.
> 2) flink-contrib is already too crowded and noisy. It contains lots of 
> submodules with different purposes, which confuse developers and users, and 
> they lack a proper project hierarchy.
> 3) This will save us quite some build time.
> More details in the discussions at FLINK-8175 and FLINK-8167.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5299: [Hotfix][Doc][DataStream API] Fix Scala code examp...

2018-01-15 Thread elbaulp
GitHub user elbaulp opened a pull request:

https://github.com/apache/flink/pull/5299

[Hotfix][Doc][DataStream API] Fix Scala code example in Controlling Latency 
section



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/elbaulp/flink hotfix-doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5299.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5299


commit 6bb3ced0828c9a3fdf5fc99d17dc8a573eb4163d
Author: Alejandro Alcalde 
Date:   2018-01-15T18:13:27Z

Merge remote-tracking branch 'upstream/master'

commit 7497d756f01aa9b596e11d64a6fa2a24702a53ce
Author: Alejandro Alcalde 
Date:   2018-01-15T18:27:55Z

[Hotfix][doc][DataStream API] Fix Scala code example in Controlling Latency 
section




---


[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326461#comment-16326461
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5281
  
Hi guys, can @StefanRRichter, @aljoscha, or any other committer take a look 
at this PR?

I'll host the Seattle Apache Flink Meetup this Wednesday and give a talk. I 
want to talk about the new APIs `ListState#update()` and `ListState#addAll()`. 
It would be great to get this merged in before then. Thanks!


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> support {{addAll()}} in {{ListState}}, so Flink can be more efficient in 
> adding elements to {{ListState}} in batch. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.
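
A brief usage sketch of the proposed API (the state handle and values are 
illustrative; the signature follows the PR's proposal):

{code:java}
import java.util.List;

import org.apache.flink.api.common.state.ListState;

// One batched call instead of one add() (and, for the RocksDB backend,
// one merge operation) per element.
void drainBuffer(ListState<Long> listState, List<Long> buffered) throws Exception {
    listState.addAll(buffered); // e.g. buffered = Arrays.asList(1L, 2L, 3L)
}
{code}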



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5281: [FLINK-7938] support addAll() in ListState

2018-01-15 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5281
  
Hi guys, can @StefanRRichter, @aljoscha, or any other committer take a look 
at this PR?

I'll host the Seattle Apache Flink Meetup this Wednesday and give a talk. I 
want to talk about the new APIs `ListState#update()` and `ListState#addAll()`. 
It would be great to get this merged in before then. Thanks!


---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326451#comment-16326451
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4552
  
one thing which we talked about offline: as a precaution, we should keep 
the old implementation around and allow the users to basically turn the 
credit-based flow control algorithm on/off (the accounting for the credits 
would mostly stay in that case but would simply not be used by the old 
implementation without flow control)
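
A hypothetical sketch of such a switch (the option key is a placeholder 
assumption, not an actual Flink configuration key):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Illustrative on/off switch for the credit-based flow control code path.
public static final ConfigOption<Boolean> CREDIT_BASED_FLOW_CONTROL_ENABLED =
        ConfigOptions.key("taskmanager.network.credit-based-flow-control.enabled")
                .defaultValue(true);
{code}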


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from the receiver. Upon receiving {{PartitionRequest}} and 
> {{AddCredit}} messages, the {{currentCredit}} is increased by the given 
> deltas.
> Each view also maintains an atomic boolean field marking whether it is 
> registered as available for transfer, to make sure it is enqueued in the 
> handler only once. If the {{currentCredit}} increases from zero and there 
> are available buffers in the subpartition, the corresponding view will be 
> enqueued for transferring data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

2018-01-15 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4552
  
one thing which we talked about offline: as a precaution, we should keep 
the old implementation around and allow the users to basically turn the 
credit-based flow control algorithm on/off (the accounting for the credits 
would mostly stay in that case but would simply not be used by the old 
implementation without flow control)


---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326442#comment-16326442
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559926
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = spy(new 
PartitionRequestQueue());
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
+   // The reader is enqueued in the pipeline because the next 
buffer is event, even though no available credits
+   verify(queue, times(1)).enqueueAvailableReader(reader);
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --

let's remove that mock ...


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from the receiver. Upon receiving {{PartitionRequest}} and 
> {{AddCredit}} messages, the {{currentCredit}} is increased by the given 
> deltas.
> Each view also maintains an atomic boolean field marking whether it is 
> registered as available for transfer, to make sure it is enqueued in the 
> handler only once. If the {{currentCredit}} increases from zero and there 
> are available buffers in the subpartition, the corresponding view will be 
> enqueued for transferring data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326430#comment-16326430
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161485403
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -434,6 +443,29 @@ private RemoteInputChannel 
createRemoteInputChannel(SingleInputGate inputGate) t

UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
}
 
+   /**
+* Creates and returns a remote input channel for the specific input 
gate with specific partition request client.
+*
+* @param inputGate The input gate owns the created input channel.
+* @param client The client is used to send partition request.
+* @return The new created remote input channel.
+*/
+   private RemoteInputChannel createRemoteInputChannel(SingleInputGate 
inputGate, PartitionRequestClient client) throws Exception {
--- End diff --

could you modify 
`PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` 
to rely on this method, i.e. `return createRemoteInputChannel(inputGate, 
mock(PartitionRequestClient.class));`?
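
(spelled out, the suggested delegation would be)

```java
// Delegate the old overload to the new one so that the channel wiring
// lives in a single place and existing callers stay untouched.
private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) throws Exception {
	return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));
}
```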


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326439#comment-16326439
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161565896
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see 
the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+   queue.notifyReaderCreated(reader);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify available 

[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326435#comment-16326435
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559690
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --

let's remove that mock ...


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161568012
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
// ...and one available result
+   assertFalse(view.nextBufferIsEvent());
--- End diff --

we should test this everywhere we access `getNextBuffer()` or add buffers 
via `add()` - also when `getNextBuffer()` is `null`, and even before requesting 
anything
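
(as a sketch, using the `buffer()`/`isBuffer()` accessors that already appear
in this test)

```java
assertFalse(view.nextBufferIsEvent());    // e.g. before requesting anything
BufferAndBacklog read = view.getNextBuffer();
if (read != null) {
	assertTrue(read.buffer().isBuffer()); // a real buffer, not an event
}
```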


---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326448#comment-16326448
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161578518
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 ---
@@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
80L);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
--- End diff --

Is that also the reason here? I see that we otherwise run into an 
`Insufficient number of network buffers` error, but this test does not look as 
if it was configured as tightly...
(just want to rule out a memory leak in the new code)


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161576253
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 ---
@@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws 
Exception {

config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
+   
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
--- End diff --

just a note for the curious: this test can cope with a higher number of 
network buffers and waits for all of them to be blocked - increasing this 
to `9` would have been enough here, though (we now require 2 exclusive buffers 
per incoming channel by default, while 1 was the minimum before)
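
(the arithmetic behind that, as far as I can reconstruct it - the channel
count is an assumption based on the single-TM/single-slot setup)

```java
int oldMinimum = 8;                    // previous NETWORK_NUM_BUFFERS setting
int remoteChannels = 1;                // assumed: one incoming channel here
int extraExclusivePerChannel = 2 - 1;  // new default minus the old minimum
int newMinimum = oldMinimum + remoteChannels * extraExclusivePerChannel; // = 9
```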


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161565565
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
--- End diff --

actually, let's use `assertThat(queue.getAvailableReaders(), 
contains(reader));` here which gives much nicer output in case something is 
wrong
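
(for reference, with the matching static imports)

```java
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertThat;

// contains(...) checks the collection holds exactly the given elements in
// order, and a failure prints both the expected and the actual collection.
assertThat(queue.getAvailableReaders(), contains(reader));
```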


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161570121
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
// ...and one available result
+   assertFalse(view.nextBufferIsEvent());
--- End diff --

also test `read.buffer().isBuffer()` then?


---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326433#comment-16326433
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559642
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
--- End diff --

let's remove that mock ...


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326436#comment-16326436
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161547041
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
--- End diff --

nit: `is an event`


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326440#comment-16326440
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559913
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = spy(new 
PartitionRequestQueue());
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
+   // The reader is enqueued in the pipeline because the next 
buffer is event, even though no available credits
+   verify(queue, times(1)).enqueueAvailableReader(reader);
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
--- End diff --

let's remove that mock ...


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161567305
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see 
the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
--- End diff --

let's end with `assertNull(channel.readOutbound());`
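
(for readers without the test file at hand, `blockChannel(...)` presumably
works along these lines - a hedged sketch, not the actual helper)

```java
// Shrink the write-buffer watermarks, then queue - without flushing - a
// buffer that exceeds them, so the channel reports isWritable() == false
// until that buffer is flushed out.
private ByteBuf blockChannel(EmbeddedChannel channel) {
	final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
	channel.config().setWriteBufferLowWaterMark(1);
	channel.config().setWriteBufferHighWaterMark(2);

	ByteBuf blocker = Unpooled.buffer(highWaterMark).writerIndex(highWaterMark);
	channel.write(blocker);            // queued in the outbound buffer only
	assertFalse(channel.isWritable());
	return blocker;
}
```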


---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326444#comment-16326444
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161569135
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -134,13 +135,22 @@ public void testBasicPipelinedProduceConsumeLogic() 
throws Exception {
assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
 
+   assertFalse(view.nextBufferIsEvent());
+   read = view.getNextBuffer();
+   assertNotNull(read);
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(subpartition.getBuffersInBacklog(), 
read.buffersInBacklog());
+   assertNull(view.getNextBuffer());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+
// Add event to the queue...
Buffer event = createBuffer();
event.tagAsEvent();
subpartition.add(event);
 
+   assertTrue(view.nextBufferIsEvent());
assertEquals(3, subpartition.getTotalNumberOfBuffers());
-   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(0, subpartition.getBuffersInBacklog());
assertEquals(3 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
verify(listener, times(3)).notifyBuffersAvailable(eq(1L));
}
--- End diff --

maybe verify that `nextBufferIsEvent()` returns the right thing after 
adding a real buffer now (with the event being next)
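
(i.e., something like)

```java
subpartition.add(createBuffer());      // a real buffer behind the event
assertTrue(view.nextBufferIsEvent());  // the event must still be next in line
```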


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326445#comment-16326445
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161573976
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -238,6 +238,7 @@ public void testConsumeSpilledPartition() throws 
Exception {
 
verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
+   assertFalse(reader.nextBufferIsEvent());
--- End diff --

also test `read.buffer().isBuffer()`?


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161546145
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() 
throws Exception {
 
assertEquals(2, inputChannel.getUnannouncedCredit());
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel);
-
// Release the input channel
inputGate.releaseAllResources();
 
channel.runPendingTasks();
 
-   // It will not notify credits for released input channel
+   // It should send partition request first, and send 
close request after releasing input channel,
+   // but will not notify credits for released input 
channel.
+   Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
--- End diff --

similar here: verify `PartitionRequest` after 
`inputChannel.requestSubpartition`
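
(sketched out - the subpartition index is assumed for illustration)

```java
inputChannel.requestSubpartition(0);
channel.runPendingTasks();

// assert the PartitionRequest right after the request, before the release
Object readFromOutbound = channel.readOutbound();
assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
```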


---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326431#comment-16326431
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161545040
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws 
Exception {
handler.channelRead(mock(ChannelHandlerContext.class), 
bufferResponse1);
handler.channelRead(mock(ChannelHandlerContext.class), 
bufferResponse2);
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel1);
-   handler.notifyCreditAvailable(inputChannel2);
-
assertEquals(2, inputChannel1.getUnannouncedCredit());
assertEquals(2, inputChannel2.getUnannouncedCredit());
 
channel.runPendingTasks();
 
-   // The two input channels should notify credits via 
writable channel
+   // The two input channels should send partition 
requests and then notify credits via writable channel
assertTrue(channel.isWritable());
Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(inputChannel1.getInputChannelId(), 
((PartitionRequest) readFromOutbound).receiverId);
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
+
+   readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(inputChannel2.getInputChannelId(), 
((PartitionRequest) readFromOutbound).receiverId);
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
--- End diff --

Let's verify those two `PartitionRequest` messages above, since 
`inputChannel1.getUnannouncedCredit()` kind of relies on those being sent (if 
we change the `initialCredit` to be included in the `unannouncedCredit`).


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559926
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = spy(new 
PartitionRequestQueue());
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
+   // The reader is enqueued in the pipeline because the next 
buffer is event, even though no available credits
+   verify(queue, times(1)).enqueueAvailableReader(reader);
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --

let's remove that mock ...


---


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326432#comment-16326432
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161546199
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() 
throws Exception {
 
assertEquals(2, inputChannel.getUnannouncedCredit());
 
-   // The PartitionRequestClient is tied to 
PartitionRequestClientHandler currently, so we
-   // have to notify credit available in 
CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel);
-
// Release the input channel
inputGate.releaseAllResources();
 
channel.runPendingTasks();
 
-   // It will not notify credits for released input channel
+   // It should send partition request first, and send 
close request after releasing input channel,
+   // but will not notify credits for released input 
channel.
+   Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(PartitionRequest.class));
+   assertEquals(2, ((PartitionRequest) 
readFromOutbound).credit);
+   readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, 
instanceOf(CloseRequest.class));
--- End diff --

put these after `inputGate.releaseAllResources()`?


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326438#comment-16326438
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161567331
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws 
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) 
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline if the next 
sending buffer is event, even
+* though it has no available credits.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the 
reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next 
buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see 
the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+* Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+* verifying the reader would be enqueued in the pipeline iff it has 
both available credits and buffers.
+*/
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws 
Exception {
+   // setup
+   final ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new 
BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = 
mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new 
SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 
0);
+   queue.notifyReaderCreated(reader);
+
+   // block the channel so that we see an intermediate state in 
the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify available 

[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326434#comment-16326434
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161546145
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
 
assertEquals(2, inputChannel.getUnannouncedCredit());
 
-   // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
-   // have to notify credit available in CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel);
-
// Release the input channel
inputGate.releaseAllResources();
 
channel.runPendingTasks();
 
-   // It will not notify credits for released input channel
+   // It should send partition request first, and send close request after releasing input channel,
+   // but will not notify credits for released input channel.
+   Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
+   assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
--- End diff --

similar here: verify `PartitionRequest` after `inputChannel.requestSubpartition`


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
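
A minimal sketch of the sender-side credit accounting described above, assuming simplified names (`CreditBasedViewSketch`, `addCredit`, `enqueueForTransfer` are illustrative, not the actual Flink identifiers):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the credit accounting described above; names are
// illustrative only and do not match the Flink sources.
class CreditBasedViewSketch {

    private final AtomicInteger currentCredit = new AtomicInteger(0);
    // Marks the view as registered available, so it is enqueued only once.
    private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);

    // Stand-in for "the subpartition has buffered data".
    private volatile int availableBuffers;

    /** Called with the credit carried by PartitionRequest and AddCredit messages. */
    void addCredit(int delta) {
        int previousCredit = currentCredit.getAndAdd(delta);
        // Enqueue for transfer only when the credit rises from zero, there is
        // data to send, and the view is not already registered.
        if (previousCredit == 0 && availableBuffers > 0
                && registeredAvailable.compareAndSet(false, true)) {
            enqueueForTransfer();
        }
    }

    private void enqueueForTransfer() {
        // Hand the view over to the Netty writer queue (omitted here).
    }
}
```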


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326441#comment-16326441
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161567305
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+    * though it has no available credits.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+
+   // block the channel so that we see an intermediate state in the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
--- End diff --

let's end with `assertNull(channel.readOutbound());`


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
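
The tests quoted above rely on a `blockChannel(channel)` helper whose body is not shown in the diffs. A plausible sketch under that assumption (plain Netty imports; Flink shades these in practice):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;

final class ChannelBlocker {

    // Assumed shape of blockChannel(): queue, without flushing, a buffer big
    // enough to exceed the write-buffer high watermark, so the channel reports
    // itself as unwritable until the buffer is flushed out again.
    static ByteBuf blockChannel(EmbeddedChannel channel) {
        final int highWaterMark = channel.config().getWriteBufferHighWaterMark();
        // One byte more than the watermark guarantees isWritable() == false.
        ByteBuf buffer = Unpooled.buffer(highWaterMark + 1).writerIndex(highWaterMark + 1);
        channel.write(buffer); // queued but not flushed
        return buffer;
    }
}
```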


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161573976
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -238,6 +238,7 @@ public void testConsumeSpilledPartition() throws Exception {
 
verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
+   assertFalse(reader.nextBufferIsEvent());
--- End diff --

also test `read.buffer().isBuffer()`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161569135
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 ---
@@ -134,13 +135,22 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
 
+   assertFalse(view.nextBufferIsEvent());
+   read = view.getNextBuffer();
+   assertNotNull(read);
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
+   assertNull(view.getNextBuffer());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+
// Add event to the queue...
Buffer event = createBuffer();
event.tagAsEvent();
subpartition.add(event);
 
+   assertTrue(view.nextBufferIsEvent());
assertEquals(3, subpartition.getTotalNumberOfBuffers());
-   assertEquals(1, subpartition.getBuffersInBacklog());
+   assertEquals(0, subpartition.getBuffersInBacklog());
assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
verify(listener, times(3)).notifyBuffersAvailable(eq(1L));
}
--- End diff --

maybe verify that `nextBufferIsEvent()` returns the right thing after 
adding a real buffer now (with the event being next)


---
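
A sketch of the suggested extra check, reusing `subpartition`, `view`, and `createBuffer()` from the quoted test:

```java
// Add a real buffer behind the event that is already queued...
subpartition.add(createBuffer());

// ...and verify the view still reports the event as the next buffer to send.
assertTrue(view.nextBufferIsEvent());
```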


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161567331
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+    * though it has no available credits.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+
+   // block the channel so that we see an intermediate state in the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+   queue.notifyReaderCreated(reader);
+
+   // block the channel so that we see an intermediate state in the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify available buffers to trigger enqueue the reader
+   final int notifyNumBuffers = 5;
+   for (int i = 0; i < notifyNumBuffers; i++) {
+   reader.notifyBuffersAvailable(1);
+   }
+
+   

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559642
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+    * though it has no available credits.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
--- End diff --

let's remove that mock ...


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161547041
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
--- End diff --

nit: `is an event`


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559690
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+    * though it has no available credits.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --

let's remove that mock ...


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161559913
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+    * though it has no available credits.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = spy(new PartitionRequestQueue());
+   final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+
+   // Notify an available event buffer to trigger enqueue the reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
+   // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits
+   verify(queue, times(1)).enqueueAvailableReader(reader);
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
--- End diff --

let's remove that mock ...


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161565896
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
+    * though it has no available credits.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(true);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+
+   // block the channel so that we see an intermediate state in the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify an available event buffer to trigger enqueue the reader
+   reader.notifyBuffersAvailable(1);
+
+   channel.runPendingTasks();
+
+   // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
+   assertEquals(1, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+
+   // Flush the buffer to make the channel writable again and see the final results
+   channel.flush();
+   assertSame(channelBlockingBuffer, channel.readOutbound());
+
+   assertEquals(0, queue.getAvailableReaders().size());
+   assertEquals(0, reader.getNumCreditsAvailable());
+   }
+
+   /**
+    * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+    * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
+    */
+   @Test
+   public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
+   // setup
+   final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
+   when(view.nextBufferIsEvent()).thenReturn(false);
+   when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+   when(partitionProvider.createSubpartitionView(
+   eq(partitionId),
+   eq(0),
+   any(BufferAvailabilityListener.class))).thenReturn(view);
+
+   final InputChannelID receiverId = new InputChannelID();
+   final PartitionRequestQueue queue = new PartitionRequestQueue();
+   final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
+   final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+   reader.requestSubpartitionView(partitionProvider, partitionId, 0);
+   queue.notifyReaderCreated(reader);
+
+   // block the channel so that we see an intermediate state in the test
+   ByteBuf channelBlockingBuffer = blockChannel(channel);
+   assertNull(channel.readOutbound());
+
+   // Notify available buffers to trigger enqueue the reader
+   final int notifyNumBuffers = 5;
+   for (int i = 0; i < notifyNumBuffers; i++) {
+   reader.notifyBuffersAvailable(1);
+   }
+
+   

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161546199
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
 
assertEquals(2, inputChannel.getUnannouncedCredit());
 
-   // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
-   // have to notify credit available in CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel);
-
// Release the input channel
inputGate.releaseAllResources();
 
channel.runPendingTasks();
 
-   // It will not notify credits for released input channel
+   // It should send partition request first, and send close request after releasing input channel,
+   // but will not notify credits for released input channel.
+   Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
+   assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
+   readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, instanceOf(CloseRequest.class));
--- End diff --

put these after `inputGate.releaseAllResources()`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161485403
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -434,6 +443,29 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) t
 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
}
 
+   /**
+    * Creates and returns a remote input channel for the specific input gate with specific partition request client.
+    *
+    * @param inputGate The input gate owns the created input channel.
+    * @param client The client is used to send partition request.
+    * @return The new created remote input channel.
+    */
+   private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, PartitionRequestClient client) throws Exception {
--- End diff --

could you modify `PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` to rely on this method, i.e. `return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));`?


---


[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

2018-01-15 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4552#discussion_r161545040
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws Exception {
handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse1);
handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse2);
 
-   // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
-   // have to notify credit available in CreditBasedClientHandler explicitly
-   handler.notifyCreditAvailable(inputChannel1);
-   handler.notifyCreditAvailable(inputChannel2);
-
assertEquals(2, inputChannel1.getUnannouncedCredit());
assertEquals(2, inputChannel2.getUnannouncedCredit());
 
channel.runPendingTasks();
 
-   // The two input channels should notify credits via writable channel
+   // The two input channels should send partition requests and then notify credits via writable channel
assertTrue(channel.isWritable());
Object readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
+   assertEquals(inputChannel1.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
+   assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
+
+   readFromOutbound = channel.readOutbound();
+   assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
+   assertEquals(inputChannel2.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
+   assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
--- End diff --

Let's verify those two `PartitionRequest` messages above since `inputChannel1.getUnannouncedCredit()` kind of relies on those being sent (if we change the `initialCredit` to be included in the `unannouncedCredit`).


---


[jira] [Assigned] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden

2018-01-15 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui reassigned FLINK-8407:
--

Assignee: Xingcan Cui

> Setting the parallelism after a partitioning operation should be forbidden
> --
>
> Key: FLINK-8407
> URL: https://issues.apache.org/jira/browse/FLINK-8407
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Major
>
> Partitioning operations ({{shuffle}}, {{rescale}}, etc.) for a {{DataStream}} 
> create new {{DataStreams}}, which allow the users to set parallelisms for 
> them. However, the {{PartitionTransformations}} in these returned 
> {{DataStreams}} will only add virtual nodes, whose parallelisms could not be 
> specified, in the execution graph. We should forbid users from setting the 
> parallelism after a partitioning operation since such settings won't actually 
> take effect. The corresponding documents should also be updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
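
For illustration, a hedged sketch of the pattern this issue targets; the exact API surface varies across Flink versions, so treat it as a sketch only:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.fromElements("a", "b", "c");

        // rescale() only inserts a virtual partitioning node into the graph;
        // the node has no parallelism of its own, which is why setting one
        // right after it should be forbidden per this issue.
        DataStream<String> mapped = source
            .rescale()
            .map(String::toUpperCase)
            .setParallelism(4); // the parallelism belongs to the map operator

        mapped.print();
        env.execute("partition-parallelism-sketch");
    }
}
```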


[jira] [Commented] (FLINK-8316) The CsvTableSink and the CsvInputFormat are not in sync

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326371#comment-16326371
 ] 

ASF GitHub Bot commented on FLINK-8316:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5210
  
Hi @sunjincheng121, thanks for your reply. I think an example would be 
that, for a non-standard CSV file like `a, b , c,`, if the boolean flag 
`trailingDelimiter=false`, the line will be parsed into 4 fields, while if 
`trailingDelimiter=true`, it will be parsed into 3 fields, with the 
trailing delimiter `,` omitted. Further, the trailing delimiter could be set 
to another character, e.g., `a, b, c;`.


> The CsvTableSink and the CsvInputFormat are not in sync
> ---
>
> Key: FLINK-8316
> URL: https://issues.apache.org/jira/browse/FLINK-8316
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Major
>
> As illustrated in [this 
> thread|https://lists.apache.org/thread.html/cfe3b1718a479300dc91d1523be023ef5bc702bd5ad53af4fea5a596@%3Cuser.flink.apache.org%3E],
>  the format for data generated in {{CsvTableSink}} is not compatible with 
> that accepted by {{CsvInputFormat}}. We should unify their trailing 
> delimiters.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5210: [FLINK-8316] [table] The CsvTableSink and the CsvInputFor...

2018-01-15 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5210
  
Hi @sunjincheng121, thanks for your reply. I think an example would be 
that, for a non-standard CSV file like `a, b , c,`, if the boolean flag 
`trailingDelimiter=false`, the line will be parsed into 4 fields, while if 
`trailingDelimiter=true`, it will be parsed into 3 fields, with the 
trailing delimiter `,` omitted. Further, the trailing delimiter could be set 
to another character, e.g., `a, b, c;`.


---
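
A minimal sketch of the `trailingDelimiter` semantics discussed above; the method name and flag here are assumptions for illustration, not the actual Flink API:

```java
import java.util.Arrays;

public class TrailingDelimiterSketch {

    static String[] parse(String line, char delimiter, boolean trailingDelimiter) {
        // With trailingDelimiter=true, a delimiter at the end of the line is
        // treated as a terminator and dropped before splitting.
        if (trailingDelimiter && !line.isEmpty() && line.charAt(line.length() - 1) == delimiter) {
            line = line.substring(0, line.length() - 1);
        }
        // The limit -1 keeps trailing empty fields, so the two modes differ.
        return line.split(String.valueOf(delimiter), -1);
    }

    public static void main(String[] args) {
        // 4 fields, the last one empty: [a,  b ,  c, ]
        System.out.println(Arrays.toString(parse("a, b , c,", ',', false)));
        // 3 fields, the trailing delimiter omitted: [a,  b ,  c]
        System.out.println(Arrays.toString(parse("a, b , c,", ',', true)));
    }
}
```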


[GitHub] flink pull request #:

2018-01-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/5623ac66bd145d52f3488ac2fff9dbc762d0bda1#commitcomment-26867793
  
@zentol @rmetzger I think this is not correct. The RocksDB state backend is 
in `lib` by default. This is only relevant for "running in the IDE". The text 
suggests you need to add this to your user jar.


---


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326332#comment-16326332
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
Here's a preliminary changelog:

General:
- rebase branch to current master
- incremented version to 1.5-SNAPSHOT
- fixed kafka-connector dependency declaration
- set to provided
- scala version set to scala.binary.version 
- flink version set to project.version
- applied checkstyle
- disabled method/parameter name rules for API classes
- assigned flink-python-streaming to 'libraries' travis profile

API:
- PDS#map()/flat_map() now return PythonSingleOutputStreamOperator
- renamed PDS#print() to PDS#output()
- print is a keyword in python and thus not usable in native python APIs
- added PythonSingleOutputStreamOperator#name()
- removed env#execute methods that accepted local execution argument as 
they are redundant due to environment factory methods

Moved/Renamed:
- made SerializerMap a top-level class and renamed it to AdapterMap
- moved UtilityFunctions#adapt to AdapterMap class
- renamed UtilityFunctions to InterpreterUtils
- moved PythonObjectInputStream2 to SerializationUtils
- renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream

Functions:
- Introduced AbstractPythonUDF class for sharing RichFunction#open()/close() implementations
- PythonOutputSelector now throws FlinkRuntimeException when failing during 
initialization
- added generic return type to Serializationutils#deserializeObject
- added new serializers for PyBoolean/-Float/-Integer/-Long/-String
- PyObjectSerializer now properly fails when an exception occurs
- improved error printing

- PythonCollector now typed to Object and properly converts non-PyObjects
- jython functions that use a collector now have Object as output type
- otherwise you would get ClassCastException if jython returns something that isn't a PyObject

PythonStreamBinder
- adjusted to follow PythonPlanBinder structure
- client-like main() exception handling
- replaced Random usage with UUID.randomUUID()
- now loads GlobalConfiguration
- local/distributed tmp dir now configurable
- introduced PythonOptions
- no longer generate plan.py but instead import it directly via the 
PythonInterpreter

Environment:
- Reworked static environment factory methods from 
PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
- program main() method now accepts a PythonEnvironmentFactory
- directories are now passed properly to the environment instead of using 
static fields
- removed PythonEnvironmentConfig

Tests:
- removed 'if __name__ == '__main__':' blocks from tests since the 
condition is never fulfilled
- removed python TestBase class
- removed print statements from tests
- standardized test job names
- cleaned up PythonStreamBinderTest / made it more consistent with 
PythonPlanBinderTest
- run_all_tests improvements
- stop after first failure
- print stacktrace on failure
- no longer relies on dirname() to get cwd but uses the module file 
location instead


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>Priority: Major
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2018-01-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
Here's a preliminary changelog:

General:
- rebase branch to current master
- incremented version to 1.5-SNAPSHOT
- fixed kafka-connector dependency declaration
- set to provided
- scala version set to scala.binary.version 
- flink version set to project.version
- applied checkstyle
- disabled method/parameter name rules for API classes
- assigned flink-python-streaming to 'libraries' travis profile

API:
- PDS#map()/flat_map() now return PythonSingleOutputStreamOperator
- renamed PDS#print() to PDS#output()
- print is a keyword in python and thus not usable in native python APIs
- added PythonSingleOutputStreamOperator#name()
- removed env#execute methods that accepted local execution argument as 
they are redundant due to environment factory methods

Moved/Renamed:
- made SerializerMap a top-level class and renamed it to AdapterMap
- moved UtilityFunctions#adapt to AdapterMap class
- renamed UtilityFunctions to InterpreterUtils
- moved PythonObjectInputStream2 to SerializationUtils
- renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream

Functions:
- Introduced AbstractPythonUDF class for sharing RichFunction#open()/close() implementations
- PythonOutputSelector now throws FlinkRuntimeException when failing during 
initialization
- added generic return type to Serializationutils#deserializeObject
- added new serializers for PyBoolean/-Float/-Integer/-Long/-String
- PyObjectSerializer now properly fails when an exception occurs
- improved error printing

- PythonCollector now typed to Object and properly converts non-PyObjects
- jython functions that use a collector now have Object as output type
- otherwise you would get ClassCastException if jython returns something that isn't a PyObject

PythonStreamBinder
- adjusted to follow PythonPlanBinder structure
- client-like main() exception handling
- replaced Random usage with UUID.randomUUID()
- now loads GlobalConfiguration
- local/distributed tmp dir now configurable
- introduced PythonOptions
- no longer generate plan.py but instead import it directly via the 
PythonInterpreter

Environment:
- Reworked static environment factory methods from 
PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
- program main() method now accepts a PythonEnvironmentFactory
- directories are now passed properly to the environment instead of using 
static fields
- removed PythonEnvironmentConfig

Tests:
- removed 'if __name__ == '__main__':' blocks from tests since the 
condition is never fulfilled
- removed python TestBase class
- removed print statements from tests
- standardized test job names
- cleaned up PythonStreamBinderTest / made it more consistent with 
PythonPlanBinderTest
- run_all_tests improvements
- stop after first failure
- print stacktrace on failure
- no longer relies on dirname() to get cwd but uses the module file 
location instead


---


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326327#comment-16326327
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
I've been digging into this for the past week. I found a number of things 
to improve and did so in a local branch. Once I've finalized/tested things 
(probably tomorrow) I'll link the branch here or open another PR.


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>Priority: Major
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in turn will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2018-01-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
I've been digging into this for the past week. I found a number of things 
to improve and did so in a local branch. Once I've finalized/tested things 
(probably tomorrow) I'll link the branch here or open another PR.


---


[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full

2018-01-15 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326323#comment-16326323
 ] 

Till Rohrmann commented on FLINK-7949:
--

Fixed in 1.4.1 via

7d040fd40c2816e829c81cb38177b6e1579c761c

9014167987cfcd108e7316281d562b7d85c12fba

> AsyncWaitOperator is not restarting when queue is full
> --
>
> Key: FLINK-7949
> URL: https://issues.apache.org/jira/browse/FLINK-7949
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Bartłomiej Tartanus
>Assignee: Bartłomiej Tartanus
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>   Original Estimate: 0.25h
>  Remaining Estimate: 0.25h
>
> Issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue - AsyncWaitOperator can't restart properly after failure (thread is 
> waiting forever)
> Scenario to reproduce this issue:
> 1. The queue is full (let's assume that its capacity is N elements) 
> 2. There is some pending element waiting, so the 
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and 
> while-loop in addAsyncBufferEntry method is trying to add this element to 
> the queue (but element is not added because queue is full) 
> 3. Now the snapshot is taken - the whole queue of N elements is being 
> written into the ListState in snapshotState method and also (what is more 
> important) this pendingStreamElementQueueEntry is written to this list too. 
> 4. The process is being restarted, so it tries to recover all the elements 
> and put them again into the queue, but the list of recovered elements holds 
> N+1 elements and our queue capacity is only N. Process is not started yet, so 
> it cannot process any element and this one element is waiting endlessly. 
> But it's never added and the process will never process anything. Deadlock. 
> 5. Trigger is fired and indeed discarded because the process is not running 
> yet. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full

2018-01-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7949:
-
Fix Version/s: 1.4.1

> AsyncWaitOperator is not restarting when queue is full
> --
>
> Key: FLINK-7949
> URL: https://issues.apache.org/jira/browse/FLINK-7949
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Bartłomiej Tartanus
>Assignee: Bartłomiej Tartanus
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>   Original Estimate: 0.25h
>  Remaining Estimate: 0.25h
>
> Issue was described here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html
> Issue - AsyncWaitOperator can't restart properly after failure (thread is 
> waiting forever)
> Scenario to reproduce this issue:
> 1. The queue is full (let's assume that its capacity is N elements) 
> 2. There is some pending element waiting, so the 
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and 
> while-loop in addAsyncBufferEntry method is trying to add this element to 
> the queue (but element is not added because queue is full) 
> 3. Now the snapshot is taken - the whole queue of N elements is being 
> written into the ListState in snapshotState method and also (what is more 
> important) this pendingStreamElementQueueEntry is written to this list too. 
> 4. The process is being restarted, so it tries to recover all the elements 
> and put them again into the queue, but the list of recovered elements holds 
> N+1 elements and our queue capacity is only N. Process is not started yet, so 
> it cannot process any element and this one element is waiting endlessly. 
> But it's never added and the process will never process anything. Deadlock. 
> 5. Trigger is fired and indeed discarded because the process is not running 
> yet. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
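
A minimal, runnable sketch of the N+1 restore problem described above, using a plain `ArrayBlockingQueue` as a stand-in for the operator's internal queue (the real operator blocks on insertion, which is where the deadlock occurs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueRestoreDeadlockSketch {
    public static void main(String[] args) throws InterruptedException {
        final int capacity = 2; // "N" in the scenario above

        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        queue.put("e1");
        queue.put("e2");
        // The pendingStreamElementQueueEntry: waiting, but not in the queue.
        String pending = "e3";

        // The snapshot writes the queued elements AND the pending one -> N+1.
        List<String> snapshot = new ArrayList<>(queue);
        snapshot.add(pending);

        // Restore: processing has not started, so nothing drains the queue.
        ArrayBlockingQueue<String> restored = new ArrayBlockingQueue<>(capacity);
        for (String element : snapshot) {
            if (!restored.offer(element)) {
                // The real operator uses a blocking put() here and therefore
                // waits forever on the (N+1)-th element: the deadlock.
                System.out.println("queue full, cannot add " + element);
            }
        }
    }
}
```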


[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326322#comment-16326322
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4767
  
Hi @EronWright, yes I think we still need support for web sockets. The 
first REST based client won't use this but later on we should definitely add 
this functionality. At the moment we are trying hard to make Flip-6 feature-equivalent 
to the old distributed architecture and therefore we couldn't make progress 
here. But once this has been done, we should revisit this PR.


> Create WebSocket handler (server)
> -
>
> Key: FLINK-7738
> URL: https://issues.apache.org/jira/browse/FLINK-7738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> An abstract handler is needed to support websocket communication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...

2018-01-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4767
  
Hi @EronWright, yes I think we still need support for web sockets. The 
first REST based client won't use this but later on we should definitely add 
this functionality. At the moment we are trying hard to make Flip-6 feature-equivalent 
to the old distributed architecture and therefore we couldn't make progress 
here. But once this has been done, we should revisit this PR.


---


[jira] [Commented] (FLINK-8082) Bump version compatibility check to 1.4

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326318#comment-16326318
 ] 

ASF GitHub Bot commented on FLINK-8082:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5262


> Bump version compatibility check to 1.4
> ---
>
> Key: FLINK-8082
> URL: https://issues.apache.org/jira/browse/FLINK-8082
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> Similar to FLINK-7977, we must bump the version of the compatibility check to 
> compare 1.5 against 1.4, once it is released.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5289: [hotfix] [docs] Fix typos

2018-01-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5289


---


[GitHub] flink pull request #5262: [FLINK-8082][build] Bump flink version for japicmp...

2018-01-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5262


---


[GitHub] flink pull request #5293: [hotfix][docs] Mention maven dependency for RocksD...

2018-01-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5293


---


[GitHub] flink pull request #5133: [hotfix] Fix typo in AkkaUtils method

2018-01-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5133


---


[jira] [Closed] (FLINK-8082) Bump version compatibility check to 1.4

2018-01-15 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8082.
---
Resolution: Fixed

master: 4e0ca93d3f496b410452ab31485ed920e9ab2702

> Bump version compatibility check to 1.4
> ---
>
> Key: FLINK-8082
> URL: https://issues.apache.org/jira/browse/FLINK-8082
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> Similar to FLINK-7977, we must bump the version of the compatibility check to 
> compare 1.5 against 1.4, once it is released.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8399) Use independent configurations for the different timeouts in slot manager

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326315#comment-16326315
 ] 

ASF GitHub Bot commented on FLINK-8399:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5271


> Use independent configurations for the different timeouts in slot manager
> -
>
> Key: FLINK-8399
> URL: https://issues.apache.org/jira/browse/FLINK-8399
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> There are three parameters in the slot manager that indicate the timeout for a 
> slot request to a task manager, for a slot request to be discarded, and for a 
> task manager to be released. But currently they all come from the value of 
> AkkaOptions.ASK_TIMEOUT; they need independent configurations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5271: [FLINK-8399] [runtime] use independent configurati...

2018-01-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5271


---


[jira] [Resolved] (FLINK-8399) Use independent configurations for the different timeouts in slot manager

2018-01-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8399.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via b4599156415f2ad1eee58ffce9a5e9fa54bbdd4e

> Use independent configurations for the different timeouts in slot manager
> -
>
> Key: FLINK-8399
> URL: https://issues.apache.org/jira/browse/FLINK-8399
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> There are three parameters in the slot manager that indicate the timeout for a 
> slot request to a task manager, for a slot request to be discarded, and for a 
> task manager to be released. But currently they all come from the value of 
> AkkaOptions.ASK_TIMEOUT; they need independent configurations.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
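
A sketch of what independent options for the three timeouts could look like; the option keys and default values below are assumptions for illustration, not the ones actually merged for FLINK-8399:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SlotManagerTimeoutOptionsSketch {

    /** How long a slot request sent to a task manager may stay unanswered. */
    public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
        ConfigOptions.key("slotmanager.request-timeout.ms").defaultValue(600_000L);

    /** How long an unfulfilled slot request is kept before being discarded. */
    public static final ConfigOption<Long> SLOT_REQUEST_DISCARD_TIMEOUT =
        ConfigOptions.key("slotmanager.request-discard-timeout.ms").defaultValue(600_000L);

    /** How long an idle task manager is kept before being released. */
    public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT =
        ConfigOptions.key("slotmanager.taskmanager-timeout.ms").defaultValue(30_000L);
}
```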


[jira] [Updated] (FLINK-8435) Adding BloomFilter/HyperLogLog state as Managed once

2018-01-15 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8435:

Environment: (was: Many times doing something approximately is enough 
for users (Such as counting). I think we should implement bloom filter and 
HyperLogLog as state for keyed state. By this users are able to filter or 
process many things approximately.)

> Adding BloomFilter/HyperLogLog state as Managed once
> 
>
> Key: FLINK-8435
> URL: https://issues.apache.org/jira/browse/FLINK-8435
> Project: Flink
>  Issue Type: New Feature
>Reporter: Moein Hosseini
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8435) Adding BloomFilter/HyperLogLog state as Managed once

2018-01-15 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8435:

Description: Many times doing something approximately is enough for users 
(such as counting). I think we should implement Bloom filter and HyperLogLog as 
keyed state. With this, users would be able to filter or process many things 
approximately.

> Adding BloomFilter/HyperLogLog state as Managed once
> 
>
> Key: FLINK-8435
> URL: https://issues.apache.org/jira/browse/FLINK-8435
> Project: Flink
>  Issue Type: New Feature
>Reporter: Moein Hosseini
>Priority: Major
>
> Many times doing something approximately is enough for users (such as 
> counting). I think we should implement Bloom filter and HyperLogLog as keyed 
> state. With this, users would be able to filter or process many things 
> approximately.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8435) Adding BloomFilter/HyperLogLog state as Managed once

2018-01-15 Thread Moein Hosseini (JIRA)
Moein Hosseini created FLINK-8435:
-

 Summary: Adding BloomFilter/HyperLogLog state as Managed once
 Key: FLINK-8435
 URL: https://issues.apache.org/jira/browse/FLINK-8435
 Project: Flink
  Issue Type: New Feature
 Environment: Many times doing something approximately is enough for 
users (such as counting). I think we should implement Bloom filter and 
HyperLogLog as keyed state. With this, users would be able to filter or 
process many things approximately.
Reporter: Moein Hosseini






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
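
A purely hypothetical API shape for such a managed approximate state; none of these types exist in Flink, they only illustrate the idea:

```java
public interface BloomFilterState {

    /** Adds an element to the filter for the current key. */
    void add(byte[] element);

    /**
     * Returns false if the element was definitely never added;
     * true means "probably added" (subject to the false-positive rate).
     */
    boolean mightContain(byte[] element);
}
```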


[jira] [Commented] (FLINK-8414) Gelly performance seriously decreases when using the suggested parallelism configuration

2018-01-15 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326285#comment-16326285
 ] 

Greg Hogan commented on FLINK-8414:
---

You certainly can measure scalability but as you have discovered the 
performance will not be monotonically increasing. Redistributing operators 
require a channel between each pair of tasks, so with a parallelism of 2^7 you 
will have 2^14 channels among the tasks for each iteration.

There are many reasons to use Flink and Gelly, but for some use cases for 
certain algorithms you may even get better performance with a single-threaded 
implementation. See "Scalability! But at what COST?". ConnectedComponents and 
PageRank require, respectively, no and very little intermediate data, whereas 
the similarity measures JaccardIndex and AdamicAdar as well as triangle metrics 
such as ClusteringCoefficient process super-linear intermediate data and 
benefit much more from Flink's scalability. When comparing against 
non-distributed implementations it is important to note that all Gelly 
algorithms process generic data, whereas many "optimized" algorithms assume 
compact integer representations.

> Gelly performance seriously decreases when using the suggested parallelism 
> configuration
> 
>
> Key: FLINK-8414
> URL: https://issues.apache.org/jira/browse/FLINK-8414
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, Documentation, Gelly
>Reporter: flora karniav
>Priority: Minor
>
> I am running Gelly examples with different datasets in a cluster of 5 
> machines (1 Jobmanager and 4 Taskmanagers) of 32 cores each.
> The number of Slots parameter is set to 32 (as suggested) and the parallelism 
> to 128 (32 cores*4 taskmanagers).
> I observe a vast performance degradation using these suggested settings compared 
> to setting parallelism.default to 16, for example, where the same job completes at 
> ~60 seconds vs ~140 in the 128 parallelism case.
> Is there something wrong in my configuration? Should I decrease parallelism 
> and -if so- will this inevitably decrease CPU utilization?
> Another matter that may be related to this is the number of partitions of the 
> data. Is this somehow related to parallelism? How many partitions are created 
> in the case of parallelism.default=128? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
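
A back-of-the-envelope check of the channel arithmetic above: an all-to-all redistribution between p sending and p receiving tasks needs p * p channels, so p = 2^7 = 128 yields 2^14 = 16384 channels per redistribution:

```java
public class ChannelCountSketch {
    public static void main(String[] args) {
        for (int parallelism : new int[] {16, 32, 128}) {
            // Each sending task opens one channel to each receiving task.
            long channels = (long) parallelism * parallelism;
            System.out.printf("parallelism %3d -> %,6d channels per redistribution%n",
                parallelism, channels);
        }
    }
}
```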


[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326275#comment-16326275
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK , I have submitted all the modifications based on the patches you 
provided.
The tests for `nextBufferIsEvent` will be added in a new commit tomorrow.


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} from receiver. Once receiving the messages of 
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by 
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered 
> available for transfer to make sure it is enqueued in handler only once. If 
> the {{currentCredit}} increases from zero and there are available buffers in 
> the subpartition, the corresponding view will be enqueued for transferring 
> data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

2018-01-15 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK , I have submitted all the modifications based on the patches you 
provided.
The tests for `nextBufferIsEvent` will be added in a new commit tomorrow.


---


[jira] [Resolved] (FLINK-6730) Activate strict checkstyle for flink-optimizer

2018-01-15 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan resolved FLINK-6730.
---
Resolution: Won't Fix

Closing per comment from #5294: "I would ignore the optimizer module TBH. There 
are no recent contributions to the module, so we don't benefit from cleaner PRs. 
I also don't know what will happen to the module when we start unifying the 
batch APIs, which probably will either be a full rewrite or 
removal."

> Activate strict checkstyle for flink-optimizer
> --
>
> Key: FLINK-6730
> URL: https://issues.apache.org/jira/browse/FLINK-6730
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> Long term issue for incrementally introducing the strict checkstyle to 
> flink-optimizer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8427) Checkstyle for org.apache.flink.optimizer.costs

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326243#comment-16326243
 ] 

ASF GitHub Bot commented on FLINK-8427:
---

Github user greghogan closed the pull request at:

https://github.com/apache/flink/pull/5294


> Checkstyle for org.apache.flink.optimizer.costs
> ---
>
> Key: FLINK-8427
> URL: https://issues.apache.org/jira/browse/FLINK-8427
> Project: Flink
>  Issue Type: Improvement
>  Components: Optimizer
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

