[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326834#comment-16326834 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4552 @NicoK, thanks for your reviews! I have submitted all the patches you provided offline to address the above issues. 1. Removed `FLINK-8425` from this PR. 2. Do you think I should add more tests for `nextBufferIsEvent`? I already verified that in the previous related tests. 3. Regarding the issue of adding the switch, I had some difficulty leaving messages for you offline; we can confirm that further. > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
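The credit accounting described in the issue above can be made concrete with a small sketch. The following Java class is purely illustrative (names such as SketchSubpartitionView and enqueueForTransfer are invented for this example and are not Flink's actual classes): it shows how a sender-side view might combine an atomic credit counter with a registered-available flag so that it is enqueued at most once when credit arrives and buffers are available.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class SketchSubpartitionView {

    private final AtomicInteger currentCredit = new AtomicInteger(0);

    // Marks the view as "registered available" so it is enqueued in the handler only once.
    private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);

    /** Called when a PartitionRequest or AddCredit message carries a credit delta. */
    void addCredit(int delta, boolean hasBuffersAvailable) {
        int previousCredit = currentCredit.getAndAdd(delta);

        // Only when the credit rises from zero and data is queued does the view need
        // to be enqueued, and only if it is not already enqueued.
        if (previousCredit == 0 && delta > 0 && hasBuffersAvailable
                && registeredAvailable.compareAndSet(false, true)) {
            enqueueForTransfer();
        }
    }

    /** Called once the sender's writer has taken over (or drained) this view. */
    void markNotRegistered() {
        registeredAvailable.set(false);
    }

    private void enqueueForTransfer() {
        // hand the view to the sender's writer queue (omitted in this sketch)
    }
}
{code}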
[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4552 @NicoK, thanks for your reviews! I have submitted all the patches you provided offline to address the above issues. 1. Removed `FLINK-8425` from this PR. 2. Do you think I should add more tests for `nextBufferIsEvent`? I already verified that in the previous related tests. 3. Regarding the issue of adding the switch, I had some difficulty leaving messages for you offline; we can confirm that further. ---
[jira] [Commented] (FLINK-8401) Allow subclass to override write-failure behavior in CassandraOutputFormat
[ https://issues.apache.org/jira/browse/FLINK-8401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326818#comment-16326818 ] ASF GitHub Bot commented on FLINK-8401: --- Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/5274 Thanks a lot, @tillrohrmann. Added javadoc. Please take another look. > Allow subclass to override write-failure behavior in CassandraOutputFormat > --- > > Key: FLINK-8401 > URL: https://issues.apache.org/jira/browse/FLINK-8401 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > Currently it will throw an exception and fail the entire job; we would like > to keep the current default behavior but refactor the code to allow subclasses > to override and customize the failure handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8401) Allow subclass to override write-failure behavior in CassandraOutputFormat
[ https://issues.apache.org/jira/browse/FLINK-8401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326815#comment-16326815 ] ASF GitHub Bot commented on FLINK-8401: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5274#discussion_r161672687 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java --- @@ -66,6 +66,13 @@ public void configure(Configuration parameters) { this.cluster = builder.getCluster(); } + protected void onWriteSuccess(ResultSet ignored) { --- End diff -- JavaDoc added. > Allow subclass to override write-failure behavior in CassandraOutputFormat > --- > > Key: FLINK-8401 > URL: https://issues.apache.org/jira/browse/FLINK-8401 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > Currently it will throw an exception and fail the entire job, we would like > to keep the current default behavior, but refactor the code to allow subclass > to override and customize the failure handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5274: [FLINK-8401][Cassandra Connector]Refactor CassandraOutput...
Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/5274 Thanks a lot, @tillrohrmann . Added javadoc. Please take another look. ---
[jira] [Commented] (FLINK-8401) Allow subclass to override write-failure behavior in CassandraOutputFormat
[ https://issues.apache.org/jira/browse/FLINK-8401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326817#comment-16326817 ] ASF GitHub Bot commented on FLINK-8401: --- Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5274#discussion_r161672700 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java --- @@ -66,6 +66,13 @@ public void configure(Configuration parameters) { this.cluster = builder.getCluster(); } + protected void onWriteSuccess(ResultSet ignored) { + } + + protected void onWriteFailure(Throwable t) { --- End diff -- JavaDoc added. > Allow subclass to override write-failure behavior in CassandraOutputFormat > --- > > Key: FLINK-8401 > URL: https://issues.apache.org/jira/browse/FLINK-8401 > Project: Flink > Issue Type: Improvement > Components: Cassandra Connector >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > Currently it will throw an exception and fail the entire job, we would like > to keep the current default behavior, but refactor the code to allow subclass > to override and customize the failure handling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5274: [FLINK-8401][Cassandra Connector]Refactor Cassandr...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5274#discussion_r161672700 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java --- @@ -66,6 +66,13 @@ public void configure(Configuration parameters) { this.cluster = builder.getCluster(); } + protected void onWriteSuccess(ResultSet ignored) { + } + + protected void onWriteFailure(Throwable t) { --- End diff -- JavaDoc added. ---
[GitHub] flink pull request #5274: [FLINK-8401][Cassandra Connector]Refactor Cassandr...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5274#discussion_r161672687 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java --- @@ -66,6 +66,13 @@ public void configure(Configuration parameters) { this.cluster = builder.getCluster(); } + protected void onWriteSuccess(ResultSet ignored) { --- End diff -- JavaDoc added. ---
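The hooks in the diffs above exist so that a subclass can customize failure handling. The following sketch assumes the `onWriteFailure(Throwable)` hook shown in the diff and a `CassandraOutputFormat(String, ClusterBuilder)` constructor as in recent Flink releases; the class name, logging policy, and record type are illustrative only and are not part of PR #5274.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Keeps the job running on a failed write instead of using the default fail-the-job behavior. */
public class LenientCassandraOutputFormat extends CassandraOutputFormat<Tuple2<String, Integer>> {

    private static final Logger LOG = LoggerFactory.getLogger(LenientCassandraOutputFormat.class);

    public LenientCassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
        super(insertQuery, builder);
    }

    @Override
    protected void onWriteFailure(Throwable t) {
        // Log and drop the record; the default implementation would fail the entire job.
        LOG.warn("Dropping record after Cassandra write failure", t);
    }
}
{code}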
[jira] [Commented] (FLINK-7777) Bump japicmp to 0.11.0
[ https://issues.apache.org/jira/browse/FLINK-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326804#comment-16326804 ] ASF GitHub Bot commented on FLINK-7777: --- GitHub user yew1eb opened a pull request: https://github.com/apache/flink/pull/5302 [FLINK-7777][build] Bump japicmp to 0.11.0 ## What is the purpose of the change Currently, Flink uses japicmp-maven-plugin version 0.7.0; I observed some warning messages, details: https://issues.apache.org/jira/browse/FLINK-7777 japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting dependency in order to prevent warnings from SAXParserImpl._ The current stable version is 0.11.0, and it can be built under JDK 9, so we should consider upgrading to this version. ## Brief change log - *Bump japicmp-maven-plugin version from 0.7.0 to 0.11.0.* - *Remove the unnecessary profile `skip-japicmp` in the root POM.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yew1eb/flink FLINK-7777 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5302.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5302 commit 57e513f0d1c002ee7c1cf3060a756a157bdb3bb6 Author: yew1eb Date: 2017-11-19T03:11:50Z [FLINK-7777][build] Bump japicmp to 0.11.0 > Bump japicmp to 0.11.0 > -- > > Key: FLINK-7777 > URL: https://issues.apache.org/jira/browse/FLINK-7777 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.5.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Minor > Fix For: 1.5.0 > > > Currently, Flink uses japicmp-maven-plugin version 0.7.0; I'm getting > these warnings from the Maven plugin during a *mvn clean verify*: > {code:java} > [INFO] Written file '.../target/japicmp/japicmp.diff'. > [INFO] Written file '.../target/japicmp/japicmp.xml'. > [INFO] Written file '.../target/japicmp/japicmp.html'. > Warning: org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser: Property > 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not > recognized. > Compiler warnings: > WARNING: 'org.apache.xerces.jaxp.SAXParserImpl: Property > 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.' > Warning: org.apache.xerces.parsers.SAXParser: Feature > 'http://javax.xml.XMLConstants/feature/secure-processing' is not recognized. > Warning: org.apache.xerces.parsers.SAXParser: Property > 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.
> Warning: org.apache.xerces.parsers.SAXParser: Property > 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not > recognized. > {code} > japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting > dependency in order to prevent warnings from SAXParserImpl._ > The current stable version is 0.11.0; we can consider upgrading to this > version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5302: [FLINK-7777][build] Bump japicmp to 0.11.0
GitHub user yew1eb opened a pull request: https://github.com/apache/flink/pull/5302 [FLINK-7777][build] Bump japicmp to 0.11.0 ## What is the purpose of the change Currently, Flink uses japicmp-maven-plugin version 0.7.0; I observed some warning messages, details: https://issues.apache.org/jira/browse/FLINK-7777 japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting dependency in order to prevent warnings from SAXParserImpl._ The current stable version is 0.11.0, and it can be built under JDK 9, so we should consider upgrading to this version. ## Brief change log - *Bump japicmp-maven-plugin version from 0.7.0 to 0.11.0.* - *Remove the unnecessary profile `skip-japicmp` in the root POM.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yew1eb/flink FLINK-7777 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5302.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5302 commit 57e513f0d1c002ee7c1cf3060a756a157bdb3bb6 Author: yew1eb Date: 2017-11-19T03:11:50Z [FLINK-7777][build] Bump japicmp to 0.11.0 ---
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326793#comment-16326793 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161667346 --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java --- @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024); --- End diff -- yes, the same reason as above > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326792#comment-16326792 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161667234 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java --- @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16); --- End diff -- yes, i will set 9 for it. > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161667346 --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java --- @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024); --- End diff -- yes, the same reason as above ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161667234 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java --- @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16); --- End diff -- yes, i will set 9 for it. ---
[jira] [Closed] (FLINK-4504) Support user to decide whether the result of an operator is persistent
[ https://issues.apache.org/jira/browse/FLINK-4504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-4504. --- Resolution: Won't Fix > Support user to decide whether the result of an operator is persistent > -- > > Key: FLINK-4504 > URL: https://issues.apache.org/jira/browse/FLINK-4504 > Project: Flink > Issue Type: Sub-task > Components: DataSet API >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > > Support an API for users to decide whether they need the result of an > operator to be pipelined, spilled to a local file, or persisted to a distributed > file system. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-4444) Add a DFSInputChannel and DFSSubPartition
[ https://issues.apache.org/jira/browse/FLINK-4444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-4444. --- Resolution: Won't Fix > Add a DFSInputChannel and DFSSubPartition > - > > Key: FLINK-4444 > URL: https://issues.apache.org/jira/browse/FLINK-4444 > Project: Flink > Issue Type: Sub-task > Components: Batch Connectors and Input/Output Formats >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > > Add a new ResultPartitionType and ResultPartitionLocation type for DFS > Add DFSSubpartition and DFSInputChannel for writing and reading DFS -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8289) The RestServerEndpoint should return the address with the real IP when getRestAddress is called
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu closed FLINK-8289. --- Resolution: Fixed Fix Version/s: 1.5.0 > The RestServerEndpoint should return the address with the real IP when > getRestAddress is called > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, RestServerEndpoint.getRestAddress returns an address equal to the > value of the config rest.address, which defaults to 127.0.0.1:9067, but this > address can not be accessed from another machine. The IPs for the Dispatcher > and JobMaster are usually assigned dynamically, so users will configure it > to 0.0.0.0, and then getRestAddress will return 0.0.0.0:9067; this address > will be registered to YARN or Mesos, but it can not be accessed from another > machine either. So it needs to return the real ip:port for users to > access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...
Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/5190 ---
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with the real IP when getRestAddress is called
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326756#comment-16326756 ] ASF GitHub Bot commented on FLINK-8289: --- Github user shuai-xu closed the pull request at: https://github.com/apache/flink/pull/5190 > The RestServerEndpoint should return the address with the real IP when > getRestAddress is called > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address equal to the > value of the config rest.address, which defaults to 127.0.0.1:9067, but this > address can not be accessed from another machine. The IPs for the Dispatcher > and JobMaster are usually assigned dynamically, so users will configure it > to 0.0.0.0, and then getRestAddress will return 0.0.0.0:9067; this address > will be registered to YARN or Mesos, but it can not be accessed from another > machine either. So it needs to return the real ip:port for users to > access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
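The issue above boils down to advertising a reachable address when the configured bind address is the wildcard 0.0.0.0. The snippet below is an illustrative sketch of that idea only; it is not the code from PR #5190, and the RestAddressExample class and the hard-coded port are invented for this example.

{code:java}
import java.net.InetAddress;

public final class RestAddressExample {

    public static void main(String[] args) throws Exception {
        String bindAddress = "0.0.0.0"; // what rest.address may be configured to
        int port = 9067;

        // 0.0.0.0 is only a bind address; advertise the machine's resolvable address instead,
        // so that the endpoint registered with YARN/Mesos is reachable from other machines.
        String advertised = "0.0.0.0".equals(bindAddress)
            ? InetAddress.getLocalHost().getHostAddress()
            : bindAddress;

        System.out.println("rest endpoint reachable at http://" + advertised + ":" + port);
    }
}
{code}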
[jira] [Closed] (FLINK-8322) support getting number of existing timers in TimerService
[ https://issues.apache.org/jira/browse/FLINK-8322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-8322. --- Resolution: Won't Fix > support getting number of existing timers in TimerService > - > > Key: FLINK-8322 > URL: https://issues.apache.org/jira/browse/FLINK-8322 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > There are pretty common use cases where users want to use timers as scheduled > threads - e.g. add a timer to wake up x hours later and do something (reap > old data usually) only if there's no existing timers, basically we only want > at most 1 timer exists for the key all the time -- This message was sent by Atlassian JIRA (v7.6.3#76005)
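Since the issue was closed as Won't Fix, the "at most one timer per key" use case it describes can be covered with the existing keyed-state API. The following is a minimal sketch, not taken from the issue: it tracks the pending timer in a ValueState flag; the class name, the one-hour delay, and the String types are illustrative.

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class SingleTimerPerKey extends ProcessFunction<String, String> {

    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("pending-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (pendingTimer.value() == null) {
            // No timer registered for this key yet: schedule one x hours from now.
            long wakeUp = ctx.timerService().currentProcessingTime() + 60 * 60 * 1000L;
            ctx.timerService().registerProcessingTimeTimer(wakeUp);
            pendingTimer.update(wakeUp); // remember that a timer already exists for this key
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        pendingTimer.clear(); // allow the next element to schedule a new timer
        // reap old data here
    }
}
{code}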
[jira] [Commented] (FLINK-8267) update Kinesis Producer example for setting Region key
[ https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326639#comment-16326639 ] ASF GitHub Bot commented on FLINK-8267: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5301 [FLINK-8267] [Kinesis Connector] update Kinesis Producer example for setting Region key ## What is the purpose of the change Update doc to guide users to use KPL's native keys to set regions. This originates from a bug that we forgot to set region keys explicitly for kinesis connector, which has been fixed. According to the previous discussion between @tzulitai and me, we want to remove AWSConfigConstants in 2.0 because it basically copies/translates config keys of KPL/KCL, which doesn't add much value. Guiding users to use KPL's native keys to set regions can be the first step that facilitates the migration. ## Brief change log - updated doc ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: ## Documentation - Does this pull request introduce a new feature? (no) cc @tzulitai You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8267 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5301.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5301 commit b4034f67e727fef68740221e3b31cd131c905df1 Author: Bowen Li Date: 2018-01-02T19:21:28Z update local branch commit 11e9c255cd51304c5281d55226fbb6fe8360d8a2 Author: Bowen Li Date: 2018-01-04T01:35:11Z remove sh commit e322a5416b0f4f89c366b98bb3571fbf6b7d460a Author: Bowen Li Date: 2017-12-17T06:18:55Z update doc commit 1b447633df4a8bfe7c4c19e7ae91aab6157756d7 Author: Bowen Li Date: 2018-01-15T23:48:38Z format code snippet commit 4ede4b5a89d4bfcda8dcc845ab1da42177d22358 Author: Bowen Li Date: 2018-01-15T23:49:37Z remove ';' from scala code > update Kinesis Producer example for setting Region key > -- > > Key: FLINK-8267 > URL: https://issues.apache.org/jira/browse/FLINK-8267 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Dyana Rose >Assignee: Bowen Li >Priority: Minor > > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer > In the example code for the kinesis producer the region key is set like: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > {code} > However, the AWS Kinesis Producer Library requires that the region key be > Region > (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269) > so the setting at this point should be: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > producerConfig.put("Region", "us-east-1"); > {code} > When you run the Kinesis Producer you can see the effect of not setting the > Region key by a log line > {noformat} > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Started > Kinesis producer instance for region '' > {noformat} > The KPL also then assumes it's running on EC2 and attempts to determine its > own region, which fails. > {noformat} > (EC2MetadataClient)Http request to Ec2MetadataService failed.
> [error] [main.cc:266] Could not configure the region. It was not given in the > config and we were unable to retrieve it from EC2 metadata > {noformat} > At the least I'd say the documentation should mention the difference between > these two keys and when you are required to also set the Region key. > On the other hand, is this even the intended behaviour of this connector? Was > it intended that the AWSConfigConstants.AWS_REGION key also set the region of > the Kinesis stream? The documentation for the example states > {noformat} > The example demonstrates producing a single Kinesis stream in the AWS region > “us-east-1”. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5301: [FLINK-8267] [Kinesis Connector] update Kinesis Pr...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5301 [FLINK-8267] [Kinesis Connector] update Kinesis Producer example for setting Region key ## What is the purpose of the change Update doc to guide users to use KPL's native keys to set regions. This originates from a bug that we forgot to set region keys explicitly for kinesis connector, which has been fixed. According to the previous discussion between @tzulitai and I, we want to remove AWSConfigConstants in 2.0 because it basically copies/translates config keys of KPL/KCL, which doesn't add much value. Guide users to use KPL's native keys to set regions can be the first step that facilitates the migration. ## Brief change log - updated doc ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: ## Documentation - Does this pull request introduce a new feature? (no) cc @tzulitai You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8267 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5301.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5301 commit b4034f67e727fef68740221e3b31cd131c905df1 Author: Bowen LiDate: 2018-01-02T19:21:28Z update local branch commit 11e9c255cd51304c5281d55226fbb6fe8360d8a2 Author: Bowen Li Date: 2018-01-04T01:35:11Z remove sh commit e322a5416b0f4f89c366b98bb3571fbf6b7d460a Author: Bowen Li Date: 2017-12-17T06:18:55Z update doc commit 1b447633df4a8bfe7c4c19e7ae91aab6157756d7 Author: Bowen Li Date: 2018-01-15T23:48:38Z format code snippet commit 4ede4b5a89d4bfcda8dcc845ab1da42177d22358 Author: Bowen Li Date: 2018-01-15T23:49:37Z remove ';' from scala code ---
[jira] [Updated] (FLINK-8267) update Kinesis Producer example for setting Region key
[ https://issues.apache.org/jira/browse/FLINK-8267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-8267: Summary: update Kinesis Producer example for setting Region key (was: Kinesis Producer example setting Region key) > update Kinesis Producer example for setting Region key > -- > > Key: FLINK-8267 > URL: https://issues.apache.org/jira/browse/FLINK-8267 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Dyana Rose >Assignee: Bowen Li >Priority: Minor > > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kinesis.html#kinesis-producer > In the example code for the kinesis producer the region key is set like: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > {code} > However, the AWS Kinesis Producer Library requires that the region key be > Region > (https://github.com/awslabs/amazon-kinesis-producer/blob/94789ff4bb2f5dfa05aafe2d8437d3889293f264/java/amazon-kinesis-producer-sample/default_config.properties#L269) > so the setting at this point should be: > {code:java} > producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); > producerConfig.put("Region", "us-east-1"); > {code} > When you run the Kinesis Producer you can see the effect of not setting the > Region key by a log line > {noformat} > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - Started > Kinesis producer instance for region '' > {noformat} > The KPL also then assumes it's running on EC2 and attempts to determine its > own region, which fails. > {noformat} > (EC2MetadataClient)Http request to Ec2MetadataService failed. > [error] [main.cc:266] Could not configure the region. It was not given in the > config and we were unable to retrieve it from EC2 metadata > {noformat} > At the least I'd say the documentation should mention the difference between > these two keys and when you are required to also set the Region key. > On the other hand, is this even the intended behaviour of this connector? Was > it intended that the AWSConfigConstants.AWS_REGION key also set the region of > the Kinesis stream? The documentation for the example states > {noformat} > The example demonstrates producing a single Kinesis stream in the AWS region > “us-east-1”. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
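Putting the issue's two `producerConfig.put(...)` lines into context, a producer configured as described would look roughly like the sketch below. This is a minimal sketch only: the stream name, credentials keys, and `SimpleStringSchema` serialization are illustrative and not taken from the actual doc change in PR #5301.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisRegionExample {

    public static FlinkKinesisProducer<String> createProducer() {
        Properties producerConfig = new Properties();
        // Connector-level region key used in the Flink docs example.
        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
        // KPL-native key that the issue says is also required today.
        producerConfig.put("Region", "us-east-1");
        producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
        producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

        FlinkKinesisProducer<String> kinesis =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        kinesis.setDefaultStream("kinesis_stream_name");
        kinesis.setDefaultPartition("0");
        return kinesis;
    }
}
{code}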
[jira] [Updated] (FLINK-8411) HeapListState#add(null) will wipe out entire list state
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-8411: Summary: HeapListState#add(null) will wipe out entire list state (was: inconsistent behavior between HeapListState#add() and RocksDBListState#add()) > HeapListState#add(null) will wipe out entire list state > --- > > Key: FLINK-8411 > URL: https://issues.apache.org/jira/browse/FLINK-8411 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > You can see that {{HeapListState#add(null)}} will result in the whole state > being cleared or wiped out. There's never a unit test for {{List#add(null)}} > in {{StateBackendTestBase}} > {code:java} > // HeapListState > @Override > public void add(V value) { > final N namespace = currentNamespace; > if (value == null) { > clear(); > return; > } > final StateTable<K, N, ArrayList<V>> map = stateTable; > ArrayList<V> list = map.get(namespace); > if (list == null) { > list = new ArrayList<>(); > map.put(namespace, list); > } > list.add(value); > } > {code} > {code:java} > // RocksDBListState > @Override > public void add(V value) throws IOException { > try { > writeCurrentKeyWithGroupAndNamespace(); > byte[] key = keySerializationStream.toByteArray(); > keySerializationStream.reset(); > DataOutputViewStreamWrapper out = new > DataOutputViewStreamWrapper(keySerializationStream); > valueSerializer.serialize(value, out); > backend.db.merge(columnFamily, writeOptions, key, > keySerializationStream.toByteArray()); > } catch (Exception e) { > throw new RuntimeException("Error while adding data to > RocksDB", e); > } > } > {code} > The fix should correct the behavior to be consistent between the two state > backends, as well as adding a unit test for {{ListState#add(null)}}. For the > correct behavior, I believe adding null with {{add(null)}} should simply be > ignored without any consequences. > cc [~srichter] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
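Written against the HeapListState#add method quoted above, the semantics the reporter proposes (ignore null instead of clearing) would look roughly like this. This is a sketch of the proposed behavior, not the actual patch from PR #5300.

{code:java}
@Override
public void add(V value) {
    if (value == null) {
        return; // silently ignore null instead of wiping the whole list
    }

    final N namespace = currentNamespace;
    final StateTable<K, N, ArrayList<V>> map = stateTable;
    ArrayList<V> list = map.get(namespace);

    if (list == null) {
        list = new ArrayList<>();
        map.put(namespace, list);
    }
    list.add(value);
}
{code}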
[jira] [Commented] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326633#comment-16326633 ] ASF GitHub Bot commented on FLINK-8411: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5300 [FLINK-8411] [State Backends] inconsistent behavior between HeapListState#add() and RocksDBListState#add() ## What is the purpose of the change `HeapListState#add(null)` will result in the whole state being cleared or wiped out. There's never a unit test for `List#add(null)` in `StateBackendTestBase` ## Brief change log - changed ListState impls such that `add(null)` will be explicitly ignored - added unit tests to test `add(null)` - updated javaDoc ## Verifying this change This change is already covered by existing tests, such as `StateBackendTestBase`. ## Does this pull request potentially affect one of the following parts: - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (JavaDocs) Note! **This work depends on FLINK-7983 and its PR at https://github.com/apache/flink/pull/5281** cc @StefanRRichter You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8411 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5300.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5300 commit b4034f67e727fef68740221e3b31cd131c905df1 Author: Bowen LiDate: 2018-01-02T19:21:28Z update local branch commit 11e9c255cd51304c5281d55226fbb6fe8360d8a2 Author: Bowen Li Date: 2018-01-04T01:35:11Z remove sh commit 72edff2a82df625203289e4b5be23db36b03abe3 Author: Bowen Li Date: 2018-01-12T08:14:55Z [FLINK-7938] introduce addAll() to ListState commit 138d6f63dff7a840b716bf8900f97940f7d61dd8 Author: Bowen Li Date: 2018-01-12T18:56:42Z add unit tests commit 481a5a98b7b655658855e18023c7c28328b0b47d Author: Bowen Li Date: 2018-01-12T19:03:07Z add documentation for addAll() commit cced3bac65660b27e17d258b3fd3880e9571bcf6 Author: Bowen Li Date: 2018-01-13T23:06:11Z add more cases in unit test commit 18fb3ff2965653dc0bd7c7a2d7a419ce8c7c6e8a Author: Bowen Li Date: 2018-01-15T23:24:19Z [FLINK-8411] inconsistent behavior between HeapListState#add() and RocksDBListState#add() > inconsistent behavior between HeapListState#add() and RocksDBListState#add() > > > Key: FLINK-8411 > URL: https://issues.apache.org/jira/browse/FLINK-8411 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > You can see that {{HeapListState#add(null)}} will result in the whole state > being cleared or wiped out. 
There's never a unit test for {{List#add(null)}} > in {{StateBackendTestBase}} > {code:java} > // HeapListState > @Override > public void add(V value) { > final N namespace = currentNamespace; > if (value == null) { > clear(); > return; > } > final StateTable map = stateTable; > ArrayList list = map.get(namespace); > if (list == null) { > list = new ArrayList<>(); > map.put(namespace, list); > } > list.add(value); > } > {code} > {code:java} > // RocksDBListState > @Override > public void add(V value) throws IOException { > try { > writeCurrentKeyWithGroupAndNamespace(); > byte[] key = keySerializationStream.toByteArray(); > keySerializationStream.reset(); > DataOutputViewStreamWrapper out = new > DataOutputViewStreamWrapper(keySerializationStream); > valueSerializer.serialize(value, out); > backend.db.merge(columnFamily, writeOptions, key, > keySerializationStream.toByteArray()); > } catch (Exception e) { > throw new RuntimeException("Error while adding data to > RocksDB", e); > } > } > {code}
[GitHub] flink pull request #5300: [FLINK-8411] [State Backends] inconsistent behavio...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/5300 [FLINK-8411] [State Backends] inconsistent behavior between HeapListState#add() and RocksDBListState#add() ## What is the purpose of the change `HeapListState#add(null)` will result in the whole state being cleared or wiped out. There's never a unit test for `List#add(null)` in `StateBackendTestBase` ## Brief change log - changed ListState impls such that `add(null)` will be explicitly ignored - added unit tests to test `add(null)` - updated javaDoc ## Verifying this change This change is already covered by existing tests, such as `StateBackendTestBase`. ## Does this pull request potentially affect one of the following parts: - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (JavaDocs) Note! **This work depends on FLINK-7983 and its PR at https://github.com/apache/flink/pull/5281** cc @StefanRRichter You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-8411 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5300.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5300 commit b4034f67e727fef68740221e3b31cd131c905df1 Author: Bowen LiDate: 2018-01-02T19:21:28Z update local branch commit 11e9c255cd51304c5281d55226fbb6fe8360d8a2 Author: Bowen Li Date: 2018-01-04T01:35:11Z remove sh commit 72edff2a82df625203289e4b5be23db36b03abe3 Author: Bowen Li Date: 2018-01-12T08:14:55Z [FLINK-7938] introduce addAll() to ListState commit 138d6f63dff7a840b716bf8900f97940f7d61dd8 Author: Bowen Li Date: 2018-01-12T18:56:42Z add unit tests commit 481a5a98b7b655658855e18023c7c28328b0b47d Author: Bowen Li Date: 2018-01-12T19:03:07Z add documentation for addAll() commit cced3bac65660b27e17d258b3fd3880e9571bcf6 Author: Bowen Li Date: 2018-01-13T23:06:11Z add more cases in unit test commit 18fb3ff2965653dc0bd7c7a2d7a419ce8c7c6e8a Author: Bowen Li Date: 2018-01-15T23:24:19Z [FLINK-8411] inconsistent behavior between HeapListState#add() and RocksDBListState#add() ---
[jira] [Updated] (FLINK-8411) inconsistent behavior between HeapListState#add() and RocksDBListState#add()
[ https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-8411: Component/s: State Backends, Checkpointing > inconsistent behavior between HeapListState#add() and RocksDBListState#add() > > > Key: FLINK-8411 > URL: https://issues.apache.org/jira/browse/FLINK-8411 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > You can see that {{HeapListState#add(null)}} will result in the whole state > being cleared or wiped out. There's never a unit test for {{List#add(null)}} > in {{StateBackendTestBase}} > {code:java} > // HeapListState > @Override > public void add(V value) { > final N namespace = currentNamespace; > if (value == null) { > clear(); > return; > } > final StateTable<K, N, ArrayList<V>> map = stateTable; > ArrayList<V> list = map.get(namespace); > if (list == null) { > list = new ArrayList<>(); > map.put(namespace, list); > } > list.add(value); > } > {code} > {code:java} > // RocksDBListState > @Override > public void add(V value) throws IOException { > try { > writeCurrentKeyWithGroupAndNamespace(); > byte[] key = keySerializationStream.toByteArray(); > keySerializationStream.reset(); > DataOutputViewStreamWrapper out = new > DataOutputViewStreamWrapper(keySerializationStream); > valueSerializer.serialize(value, out); > backend.db.merge(columnFamily, writeOptions, key, > keySerializationStream.toByteArray()); > } catch (Exception e) { > throw new RuntimeException("Error while adding data to > RocksDB", e); > } > } > {code} > The fix should correct the behavior to be consistent between the two state > backends, as well as adding a unit test for {{ListState#add(null)}}. For the > correct behavior, I believe adding null with {{add(null)}} should simply be > ignored without any consequences. > cc [~srichter] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8188) Clean up flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-8188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-8188: --- Assignee: Bowen Li > Clean up flink-contrib > -- > > Key: FLINK-8188 > URL: https://issues.apache.org/jira/browse/FLINK-8188 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > > This is the umbrella ticket for cleaning up flink-contrib. > We argue that flink-contrib should be removed and all its submodules should > be migrated to other top-level modules for the following reasons: > 1) Apache Flink the whole project itself is a result of contributions from > many developers, there's no reason to highlight some contributions in a > dedicated module named 'contrib' > 2) flink-contrib is already too crowded and noisy. It contains lots of sub > modules with different purposes which confuse developers and users, and they > lack a proper project hierarchy > 3) This will save us quite some build time > More details in discussions at FLINK-8175 and FLINK-8167 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5299: [Hotfix][Doc][DataStream API] Fix Scala code examp...
GitHub user elbaulp opened a pull request: https://github.com/apache/flink/pull/5299 [Hotfix][Doc][DataStream API] Fix Scala code example in Controlling Latency section You can merge this pull request into a Git repository by running: $ git pull https://github.com/elbaulp/flink hotfix-doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5299.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5299 commit 6bb3ced0828c9a3fdf5fc99d17dc8a573eb4163d Author: Alejandro AlcaldeDate: 2018-01-15T18:13:27Z Merge remote-tracking branch 'upstream/master' commit 7497d756f01aa9b596e11d64a6fa2a24702a53ce Author: Alejandro Alcalde Date: 2018-01-15T18:27:55Z [Hotfix][doc][DataStream API] Fix Scala code example in Controlling Latency section ---
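The "Controlling Latency" section that this hotfix touches documents the network buffer timeout. As a rough Java counterpart of that documented example (the values and pipeline below are illustrative, not the fixed Scala snippet itself):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ControllingLatency {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flush network buffers at least every 100 ms to bound latency.
        env.setBufferTimeout(100);

        env.fromElements(1, 2, 3).print();

        env.execute("buffer timeout example");
    }
}
{code}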
[jira] [Commented] (FLINK-7938) support addAll() in ListState
[ https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326461#comment-16326461 ] ASF GitHub Bot commented on FLINK-7938: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5281 Hi guys, can @StefanRRichter @aljoscha or any other committer take a look at this PR? I'll host the Seattle Apache Flink Meetup this Wednesday and give a talk. I want to talk about the new APIs `ListState#update()` and `ListState#addAll()`. It will be great to get this merged in before then. Thanks! > support addAll() in ListState > - > > Key: FLINK-7938 > URL: https://issues.apache.org/jira/browse/FLINK-7938 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > support {{addAll()}} in {{ListState}}, so Flink can be more efficient in > adding elements to {{ListState}} in batch. This should give us much better > performance, especially for {{ListState}} backed by RocksDB -- This message was sent by Atlassian JIRA (v7.6.3#76005)
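To make the batching benefit discussed above concrete, the sketch below contrasts element-by-element add() with the addAll() proposed in FLINK-7938. It is a minimal sketch; the class and method names are illustrative, and it assumes the addAll(List) signature the PR introduces.

{code:java}
import java.util.List;

import org.apache.flink.api.common.state.ListState;

class ListStateBatching {

    /** Element-by-element insertion: one state access (and, for RocksDB, one merge) per value. */
    static void addOneByOne(ListState<Long> state, List<Long> batch) throws Exception {
        for (Long value : batch) {
            state.add(value);
        }
    }

    /** Batched insertion via the proposed addAll(): a single call for the whole list. */
    static void addBatched(ListState<Long> state, List<Long> batch) throws Exception {
        state.addAll(batch);
    }
}
{code}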
[GitHub] flink issue #5281: [FLINK-7938] support addAll() in ListState
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5281 Hi guys, can @StefanRRichter @aljoscha or any other committer take a look at this PR? I'll host the Seattle Apache Flink Meetup this Wednesday and give a talk. I want to talk about the new APIs `ListState#update()` and `ListState#addAll()`. It will be great to get this merged in before then. Thanks! ---
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326451#comment-16326451 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4552 One thing we talked about offline: as a precaution, we should keep the old implementation around and allow users to turn the credit-based flow control algorithm on/off (the credit accounting would mostly stay in that case but would simply not be used by the old implementation, which has no flow control) > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4552 One thing we talked about offline: as a precaution, we should keep the old implementation around and allow users to turn the credit-based flow control algorithm on/off (the credit accounting would mostly stay in that case but would simply not be used by the old implementation, which has no flow control) ---
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326442#comment-16326442 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559926 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits + verify(queue, times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. 
Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326430#comment-16326430 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161485403 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -434,6 +443,29 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) t UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); } + /** +* Creates and returns a remote input channel for the specific input gate with specific partition request client. +* +* @param inputGate The input gate owns the created input channel. +* @param client The client is used to send partition request. +* @return The new created remote input channel. +*/ + private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, PartitionRequestClient client) throws Exception { --- End diff -- could you modify `PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` to rely on this method, i.e. `return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));`? > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326439#comment-16326439 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161565896 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. 
+*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + queue.notifyReaderCreated(reader); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify available
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326435#comment-16326435 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559690 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326448#comment-16326448 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161578518 --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java --- @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024); --- End diff -- Is that also the reason here? I see that otherwise we get into `Insufficient number of network buffers` but it does not look as if it was configured as tightly... (just want to rule out some memory leak with the new code) > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326443#comment-16326443 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161568012 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result + assertFalse(view.nextBufferIsEvent()); --- End diff -- we should test this everywhere we access `getNextBuffer()` or add buffers via `add()` - also if `getNextBuffer()` is `null` or before even requesting anything > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
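A compact sketch of the extra coverage asked for above, using the helper names from the quoted PipelinedSubpartitionTest; the exact fixture, and the assumption that nextBufferIsEvent() reports false on an empty queue, are inferred from the review request:

```java
// Hedged sketch: exercise nextBufferIsEvent() around every add()/getNextBuffer() call.
assertFalse(view.nextBufferIsEvent());   // nothing queued yet
assertNull(view.getNextBuffer());        // and nothing to read

subpartition.add(createBuffer());
assertFalse(view.nextBufferIsEvent());   // a plain buffer, not an event, is next

BufferAndBacklog read = view.getNextBuffer();
assertNotNull(read);
assertTrue(read.buffer().isBuffer());    // the returned element is a real buffer

assertNull(view.getNextBuffer());        // queue drained again
```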
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326433#comment-16326433 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559642 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326436#comment-16326436 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161547041 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even --- End diff -- nit: `is an event` > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326440#comment-16326440 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559913 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits + verify(queue, times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); --- End diff -- let's remove that mock ... > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. 
If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
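To make the sender-side bookkeeping in the issue description above concrete, here is an illustrative-only sketch; the class and member names are hypothetical and not Flink's actual SequenceNumberingViewReader implementation:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the described mechanism: a per-view credit counter updated by the
// deltas carried in PartitionRequest/AddCredit, plus a flag ensuring the view is enqueued
// for transfer at most once.
class CreditTrackingView {
	private final AtomicInteger currentCredit = new AtomicInteger(0);
	private final AtomicBoolean registeredAsAvailable = new AtomicBoolean(false);

	/** Adds the credit delta announced by the receiver and returns the new total. */
	int addCredit(int delta) {
		return currentCredit.addAndGet(delta);
	}

	/** Returns true exactly once when the view becomes transferable, so the handler
	 *  enqueues it a single time. */
	boolean markAvailableForTransfer() {
		return registeredAsAvailable.compareAndSet(false, true);
	}
}
```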
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326437#comment-16326437 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161565565 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); --- End diff -- actually, let's use `assertThat(queue.getAvailableReaders(), contains(reader));` here which gives much nicer output in case something is wrong > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
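The matcher-based form proposed above, spelled out with the static imports it needs (standard JUnit 4 / Hamcrest):

```java
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertThat;

// ...then, instead of comparing only the size of the collection:
assertThat(queue.getAvailableReaders(), contains(reader));
assertEquals(0, reader.getNumCreditsAvailable());
```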
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326444#comment-16326444 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161569135 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -134,13 +135,22 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(2)).notifyBuffersAvailable(eq(1L)); + assertFalse(view.nextBufferIsEvent()); + read = view.getNextBuffer(); + assertNotNull(read); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog()); + assertNull(view.getNextBuffer()); + assertEquals(0, subpartition.getBuffersInBacklog()); + // Add event to the queue... Buffer event = createBuffer(); event.tagAsEvent(); subpartition.add(event); + assertTrue(view.nextBufferIsEvent()); assertEquals(3, subpartition.getTotalNumberOfBuffers()); - assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getBuffersInBacklog()); assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); verify(listener, times(3)).notifyBuffersAvailable(eq(1L)); } --- End diff -- maybe verify that `nextBufferIsEvent()` returns the right thing after adding a real buffer now (with the event being next) > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
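A short sketch of the follow-up check suggested above, assuming the same test fixture: queue one more real buffer after the event and confirm the event is still reported as the next element to hand out.

```java
// Hedged sketch: the event added earlier should remain at the head of the queue.
subpartition.add(createBuffer());
assertTrue(view.nextBufferIsEvent());
```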
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326445#comment-16326445 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161573976 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -238,6 +238,7 @@ public void testConsumeSpilledPartition() throws Exception { verify(listener, times(1)).notifyBuffersAvailable(eq(4L)); + assertFalse(reader.nextBufferIsEvent()); --- End diff -- also test `read.buffer().isBuffer()`? > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326431#comment-16326431 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161545040 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws Exception { handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse1); handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse2); - // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we - // have to notify credit available in CreditBasedClientHandler explicitly - handler.notifyCreditAvailable(inputChannel1); - handler.notifyCreditAvailable(inputChannel2); - assertEquals(2, inputChannel1.getUnannouncedCredit()); assertEquals(2, inputChannel2.getUnannouncedCredit()); channel.runPendingTasks(); - // The two input channels should notify credits via writable channel + // The two input channels should send partition requests and then notify credits via writable channel assertTrue(channel.isWritable()); Object readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(inputChannel1.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); + + readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(inputChannel2.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); --- End diff -- Let's verify those two `PartitionRequest` messages above since `inputChannel1.getUnannouncedCredit());` kind of relies on those being send (if we change the `initialCredit` to be included in the `unannouncedCredit`). > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559926 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits + verify(queue, times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); --- End diff -- let's remove that mock ... ---
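One hedged way to drop the Mockito-based ResultPartitionProvider flagged above, assuming the interface exposes only createSubpartitionView; this is an illustration, not necessarily the replacement the reviewer has in mind:

```java
// Hypothetical non-mock provider: always hand back the prepared view.
final ResultPartitionProvider partitionProvider =
	(requestedPartitionId, index, availabilityListener) -> view;
```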
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326447#comment-16326447 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161576253 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java --- @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16); --- End diff -- just a note for the curious: this test can cope with higher number of network buffers and is waiting for all of them to be blocked - increasing this to `9` would have been enough here though (we require 2 exclusive buffers now per default, while 1 was the minimum per incoming channel) > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
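A rough, illustration-only way to reason about the new buffer floor mentioned above; the channel count and headroom below are hypothetical values, not the exact Flink formula:

```java
// Hedged sketch: with credit-based flow control each remote input channel reserves
// 2 exclusive buffers by default (previously 1), so the minimum NETWORK_NUM_BUFFERS
// a test needs roughly scales with the number of remote channels.
int remoteInputChannels = 4;                    // hypothetical example value
int exclusiveBuffersPerChannel = 2;             // new default noted in the review comment
int minExclusiveBuffers = remoteInputChannels * exclusiveBuffersPerChannel;

Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS,
		Math.max(16, minExclusiveBuffers));     // keep some headroom above the bare minimum
```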
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326446#comment-16326446 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161570121 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java --- @@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { verify(listener, times(1)).notifyBuffersAvailable(eq(1L)); // ...and one available result + assertFalse(view.nextBufferIsEvent()); --- End diff -- also test `read.buffer().isBuffer()` then? > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326432#comment-16326432 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161546199 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { assertEquals(2, inputChannel.getUnannouncedCredit()); - // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we - // have to notify credit available in CreditBasedClientHandler explicitly - handler.notifyCreditAvailable(inputChannel); - // Release the input channel inputGate.releaseAllResources(); channel.runPendingTasks(); - // It will not notify credits for released input channel + // It should send partition request first, and send close request after releasing input channel, + // but will not notify credits for released input channel. + Object readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); + readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(CloseRequest.class)); --- End diff -- put these after `inputGate.releaseAllResources()`? > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
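Sketch of the reordering proposed above, with the fixture assumed from the quoted test: consume the PartitionRequest first, then expect the CloseRequest only once the input channel has been released.

```java
// Hedged sketch of the intended assertion order.
Object readFromOutbound = channel.readOutbound();
assertThat(readFromOutbound, instanceOf(PartitionRequest.class));

inputGate.releaseAllResources();
channel.runPendingTasks();

readFromOutbound = channel.readOutbound();
assertThat(readFromOutbound, instanceOf(CloseRequest.class));
assertNull(channel.readOutbound());   // nothing else, in particular no credit announcement
```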
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326438#comment-16326438 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161567331 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. 
+*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + queue.notifyReaderCreated(reader); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify available
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326434#comment-16326434 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161546145 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { assertEquals(2, inputChannel.getUnannouncedCredit()); - // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we - // have to notify credit available in CreditBasedClientHandler explicitly - handler.notifyCreditAvailable(inputChannel); - // Release the input channel inputGate.releaseAllResources(); channel.runPendingTasks(); - // It will not notify credits for released input channel + // It should send partition request first, and send close request after releasing input channel, + // but will not notify credits for released input channel. + Object readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); --- End diff -- similar here: verify `PartitionRequest` after `inputChannel.requestSubpartition` > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
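Sketch of the check suggested here, with the subpartition index and surrounding fixture assumed: read and verify the PartitionRequest immediately after the subpartition request is issued, so the later credit assertions no longer rely on it implicitly.

```java
// Hedged sketch; index 0 is assumed for the example.
inputChannel.requestSubpartition(0);
channel.runPendingTasks();

Object readFromOutbound = channel.readOutbound();
assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
assertEquals(inputChannel.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
```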
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326441#comment-16326441 ] ASF GitHub Bot commented on FLINK-7456: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161567305 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); --- End diff -- let's end with `assertNull(channel.readOutbound());` > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
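The suggested final check spelled out, assuming the fixture of the quoted test: after flushing the blocking buffer, nothing further should appear on the outbound queue.

```java
// Hedged sketch of the test's closing assertions.
channel.flush();
assertSame(channelBlockingBuffer, channel.readOutbound());
assertNull(channel.readOutbound());
```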
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161567331 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. 
+*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + queue.notifyReaderCreated(reader); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify available buffers to trigger enqueue the reader + final int notifyNumBuffers = 5; + for (int i = 0; i < notifyNumBuffers; i++) { + reader.notifyBuffersAvailable(1); + } + +
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559642 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); --- End diff -- let's remove that mock ... ---
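As background for these tests: they are built on Netty's `EmbeddedChannel`, which runs the handler pipeline in-memory so scheduled work can be drained with `runPendingTasks()` and outbound messages inspected with `readOutbound()`. A tiny standalone illustration of that pattern (unrelated to Flink's own classes):

```java
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;

public class EmbeddedChannelPrimer {
    public static void main(String[] args) {
        // Handlers execute synchronously inside the embedded event loop.
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter());

        channel.writeOutbound("hello");      // push a message through the outbound pipeline
        channel.runPendingTasks();           // drain any work scheduled on the event loop

        Object written = channel.readOutbound();  // what the pipeline emitted ("hello")
        System.out.println(written);
    }
}
```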
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161547041 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even --- End diff -- nit: `is an event` ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559690 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); --- End diff -- let's remove that mock ... ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161559913 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = spy(new PartitionRequestQueue()); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + verify(queue, times(1)).triggerEnqueueAvailableReader(reader); + // The reader is enqueued in the pipeline because the next buffer is event, even though no available credits + verify(queue, times(1)).enqueueAvailableReader(reader); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. +*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2)); --- End diff -- let's remove that mock ... ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161565896 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java --- @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception { NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg; assertTrue(err.cause instanceof CancelTaskException); } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even +* though it has no available credits. +*/ + @Test + public void testEnqueueReaderByNotifyingEventBuffer() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(true); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify an available event buffer to trigger enqueue the reader + reader.notifyBuffersAvailable(1); + + channel.runPendingTasks(); + + // The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available + assertEquals(1, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + + // Flush the buffer to make the channel writable again and see the final results + channel.flush(); + assertSame(channelBlockingBuffer, channel.readOutbound()); + + assertEquals(0, queue.getAvailableReaders().size()); + assertEquals(0, reader.getNumCreditsAvailable()); + } + + /** +* Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)}, +* verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers. 
+*/ + @Test + public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception { + // setup + final ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.nextBufferIsEvent()).thenReturn(false); + when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false)); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class); + when(partitionProvider.createSubpartitionView( + eq(partitionId), + eq(0), + any(BufferAvailabilityListener.class))).thenReturn(view); + + final InputChannelID receiverId = new InputChannelID(); + final PartitionRequestQueue queue = new PartitionRequestQueue(); + final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue); + final EmbeddedChannel channel = new EmbeddedChannel(queue); + + reader.requestSubpartitionView(partitionProvider, partitionId, 0); + queue.notifyReaderCreated(reader); + + // block the channel so that we see an intermediate state in the test + ByteBuf channelBlockingBuffer = blockChannel(channel); + assertNull(channel.readOutbound()); + + // Notify available buffers to trigger enqueue the reader + final int notifyNumBuffers = 5; + for (int i = 0; i < notifyNumBuffers; i++) { + reader.notifyBuffersAvailable(1); + } + +
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161546199 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { assertEquals(2, inputChannel.getUnannouncedCredit()); - // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we - // have to notify credit available in CreditBasedClientHandler explicitly - handler.notifyCreditAvailable(inputChannel); - // Release the input channel inputGate.releaseAllResources(); channel.runPendingTasks(); - // It will not notify credits for released input channel + // It should send partition request first, and send close request after releasing input channel, + // but will not notify credits for released input channel. + Object readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); + readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(CloseRequest.class)); --- End diff -- put these after `inputGate.releaseAllResources()`? ---
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161485403 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -434,6 +443,29 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) t UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); } + /** +* Creates and returns a remote input channel for the specific input gate with specific partition request client. +* +* @param inputGate The input gate owns the created input channel. +* @param client The client is used to send partition request. +* @return The new created remote input channel. +*/ + private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, PartitionRequestClient client) throws Exception { --- End diff -- could you modify `PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` to rely on this method, i.e. `return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));`? ---
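Spelled out, the refactoring suggested here would presumably look like the following, with the existing single-argument helper delegating to the new overload (sketch only):

```java
private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) throws Exception {
    // Delegate to the more general overload so both variants share one construction path.
    return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));
}
```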
[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4552#discussion_r161545040 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws Exception { handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse1); handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse2); - // The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we - // have to notify credit available in CreditBasedClientHandler explicitly - handler.notifyCreditAvailable(inputChannel1); - handler.notifyCreditAvailable(inputChannel2); - assertEquals(2, inputChannel1.getUnannouncedCredit()); assertEquals(2, inputChannel2.getUnannouncedCredit()); channel.runPendingTasks(); - // The two input channels should notify credits via writable channel + // The two input channels should send partition requests and then notify credits via writable channel assertTrue(channel.isWritable()); Object readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(inputChannel1.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); + + readFromOutbound = channel.readOutbound(); + assertThat(readFromOutbound, instanceOf(PartitionRequest.class)); + assertEquals(inputChannel2.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId); + assertEquals(2, ((PartitionRequest) readFromOutbound).credit); --- End diff -- Let's verify those two `PartitionRequest` messages above since `inputChannel1.getUnannouncedCredit());` kind of relies on those being send (if we change the `initialCredit` to be included in the `unannouncedCredit`). ---
[jira] [Assigned] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden
[ https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui reassigned FLINK-8407: -- Assignee: Xingcan Cui > Setting the parallelism after a partitioning operation should be forbidden > -- > > Key: FLINK-8407 > URL: https://issues.apache.org/jira/browse/FLINK-8407 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Major > > Partitioning operations ({{shuffle}}, {{rescale}}, etc.) for a {{DataStream}} > create new {{DataStreams}}, which allow the users to set parallelisms for > them. However, the {{PartitionTransformations}} in these returned > {{DataStreams}} will only add virtual nodes, whose parallelisms could not be > specified, in the execution graph. We should forbid users to set the > parallelism after a partitioning operation since they won't actually work. > Also the corresponding documents should be updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8316) The CsvTableSink and the CsvInputFormat are not in sync
[ https://issues.apache.org/jira/browse/FLINK-8316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326371#comment-16326371 ] ASF GitHub Bot commented on FLINK-8316: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5210 Hi @sunjincheng121, thanks for your reply. I think an example would be that, for some non-standard CSV files like `a, b , c,`, if the boolean flag `trailingDelimiter=false`, the file will be parsed with 4 fields; while if `trailingDelimiter=true`, the file will be parsed with 3 fields, in which the trailing delimiter `,` is omitted. Further, the trailing delimiter could be set as another character, e.g., `a, b, c;`. > The CsvTableSink and the CsvInputFormat are not in sync > --- > > Key: FLINK-8316 > URL: https://issues.apache.org/jira/browse/FLINK-8316 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Major > > As illustrated in [this > thread|https://lists.apache.org/thread.html/cfe3b1718a479300dc91d1523be023ef5bc702bd5ad53af4fea5a596@%3Cuser.flink.apache.org%3E], > the format for data generated in {{CsvTableSink}} is not compatible with > that accepted by {{CsvInputFormat}}. We should unify their trailing > delimiters. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5210: [FLINK-8316] [table] The CsvTableSink and the CsvInputFor...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/5210 Hi @sunjincheng121, thanks for your reply. I think an example would be that, for some non-standard CSV files like `a, b , c,`, if the boolean flag `trailingDelimiter=false`, the file will be parsed with 4 fields; while if `trailingDelimiter=true`, the file will be parsed with 3 fields, in which the trailing delimiter `,` is omitted. Further, the trailing delimiter could be set as another character, e.g., `a, b, c;`. ---
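To make the proposed semantics concrete, here is a small plain-Java sketch; the `trailingDelimiter` flag is only the behaviour being proposed in this discussion, not an existing Flink option:

```java
// Splits one CSV line; with trailingDelimiter=true a single delimiter at the end of the
// line is stripped before splitting, so "a, b , c," yields 3 fields instead of 4.
static String[] splitCsvLine(String line, char delimiter, boolean trailingDelimiter) {
    String effective = line;
    if (trailingDelimiter && !effective.isEmpty()
            && effective.charAt(effective.length() - 1) == delimiter) {
        effective = effective.substring(0, effective.length() - 1);
    }
    // limit -1 keeps trailing empty fields, which is what otherwise produces the 4th field
    return effective.split(java.util.regex.Pattern.quote(String.valueOf(delimiter)), -1);
}
```

For the example line above, `splitCsvLine("a, b , c,", ',', false)` returns 4 fields (the last one empty), while the same call with `trailingDelimiter=true` returns 3.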
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/5623ac66bd145d52f3488ac2fff9dbc762d0bda1#commitcomment-26867793 @zentol @rmetzger I think this is not correct. The RocksDB state backend is in `lib` by default. This is only relevant for "running in the IDE". The text suggests you need to add this to your user jar. ---
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326332#comment-16326332 ] ASF GitHub Bot commented on FLINK-5886: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 Here's a preliminary changelog: General: - rebase branch to current master - incremented version to 1.5-SNAPSHOT - fixed kafka-connector dependency declaration - set to provided - scala version set to scala.binary.version - flink version set to project.version - applied checkstyle - disabled method/parameter name rules for API classes - assigned flink-python-streaming to 'libraries' travis profile API: - PDS#map()/flat_map() now return PythonSingleOutputStreamOperator - renamed PDS#print() to PDS#output() - print is a keyword in python and thus not usable in native python APIs - added PythonSingleOutputStreamOperator#name() - removed env#execute methods that accepted local execution argument as they are redundant due to environment factory methods Moved/Renamed: - made SerializerMap top-level class and renamed it to AdapterMap - Moved UtilityFunctions#adapt to AdapterMap class - renamed UtilityFunctions to InterpreterUtils - moved PythonObjectInputStream2 to SerializationUtils - renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream Functions: - Introduced AbstractPythonUDF class for sharing RichFunction#open()/close() implementations - PythonOutputSelector now throws FlinkRuntimeException when failing during initialization - added generic return type to SerializationUtils#deserializeObject - added new serializers for PyBoolean/-Float/-Integer/-Long/-String - PyObjectSerializer now properly fails when an exception occurs - improved error printing - PythonCollector now typed to Object and properly converts non-PyObjects - jython functions that use a collector now have Object as output type - otherwise you would get ClassCastException if jython returns something that isn't a PyObject PythonStreamBinder - adjusted to follow PythonPlanBinder structure - client-like main() exception handling - replaced Random usage with UUID.randomUUID() - now loads GlobalConfiguration - local/distributed tmp dir now configurable - introduced PythonOptions - no longer generate plan.py but instead import it directly via the PythonInterpreter Environment: - Reworked static environment factory methods from PythonStreamExecutionEnvironment into a PythonEnvironmentFactory - program main() method now accepts a PythonEnvironmentFactory - directories are now passed properly to the environment instead of using static fields - removed PythonEnvironmentConfig Tests: - removed 'if __name__ == '__main__':' blocks from tests since the condition is never fulfilled - removed python TestBase class - removed print statements from tests - standardized test job names - cleaned up PythonStreamBinderTest / made it more consistent with PythonPlanBinderTest - run_all_tests improvements - stop after first failure - print stacktrace on failure - no longer relies on dirname() to get cwd but uses the module file location instead > Python API for streaming applications > - > > Key: FLINK-5886 > URL: https://issues.apache.org/jira/browse/FLINK-5886 > Project: Flink > Issue Type: New Feature > Components: Python API >Reporter: Zohar Mizrahi >Assignee: Zohar Mizrahi >Priority: Major > > A work in progress to provide python interface for Flink streaming APIs. The > core technology is based on jython and thus imposes two limitations: a.
user > defined functions cannot use python extensions. b. the python version is 2.x > The branch is based on Flink release 1.2.0, as can be found here: > https://github.com/zohar-pm/flink/tree/python-streaming > In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was > setup properly (see: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), > one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, > which in return will execute all the tests under > {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 Here's a preliminary changelog: General: - rebase branch to current master - incremented version to 1.5-SNAPSHOT - fixed kafka-connector dependency declaration - set to provided - scala version set to scala.binary.version - flink version set to project.version - applied checkstyle - disabled method/parameter name rules for API classes - assigned flink-python-streaming to 'libraries' travis profile API: - PDS#map()/flat_map() now return PythonSingleOutputStreamOperator - renamed PDS#print() to PDS#output() - print is a keyword in python and thus not usable in native python APIs - added PythonSingleOutputStreamOperator#name() - removed env#execute methods that accepted local execution argument as they are redundant due to environment factory methods Moved/Renamed: - made SerializerMap top-level class and renamed it to AdapterMap - Moved UtilityFunctions#adapt to AdapterMap class - renamed UtilityFunctions to InterpreterUtils - moved PythonObjectInputStream2 to SerializationUtils - renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream Functions: - Introduced AbstractPythonUDF class for sharing RichFunction#open()/close() implementations - PythonOutputSelector now throws FlinkRuntimeException when failing during initialization - added generic return type to SerializationUtils#deserializeObject - added new serializers for PyBoolean/-Float/-Integer/-Long/-String - PyObjectSerializer now properly fails when an exception occurs - improved error printing - PythonCollector now typed to Object and properly converts non-PyObjects - jython functions that use a collector now have Object as output type - otherwise you would get ClassCastException if jython returns something that isn't a PyObject PythonStreamBinder - adjusted to follow PythonPlanBinder structure - client-like main() exception handling - replaced Random usage with UUID.randomUUID() - now loads GlobalConfiguration - local/distributed tmp dir now configurable - introduced PythonOptions - no longer generate plan.py but instead import it directly via the PythonInterpreter Environment: - Reworked static environment factory methods from PythonStreamExecutionEnvironment into a PythonEnvironmentFactory - program main() method now accepts a PythonEnvironmentFactory - directories are now passed properly to the environment instead of using static fields - removed PythonEnvironmentConfig Tests: - removed 'if __name__ == '__main__':' blocks from tests since the condition is never fulfilled - removed python TestBase class - removed print statements from tests - standardized test job names - cleaned up PythonStreamBinderTest / made it more consistent with PythonPlanBinderTest - run_all_tests improvements - stop after first failure - print stacktrace on failure - no longer relies on dirname() to get cwd but uses the module file location instead ---
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326327#comment-16326327 ] ASF GitHub Bot commented on FLINK-5886: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 I've been digging into this for the past week. I found a number of things to improve and did so in a local branch. Once I've finalized/tests things (probably tomorrow) I'll link the branch here or open another PR. > Python API for streaming applications > - > > Key: FLINK-5886 > URL: https://issues.apache.org/jira/browse/FLINK-5886 > Project: Flink > Issue Type: New Feature > Components: Python API >Reporter: Zohar Mizrahi >Assignee: Zohar Mizrahi >Priority: Major > > A work in progress to provide python interface for Flink streaming APIs. The > core technology is based on jython and thus imposes two limitations: a. user > defined functions cannot use python extensions. b. the python version is 2.x > The branch is based on Flink release 1.2.0, as can be found here: > https://github.com/zohar-pm/flink/tree/python-streaming > In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was > setup properly (see: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), > one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, > which in return will execute all the tests under > {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 I've been digging into this for the past week. I found a number of things to improve and did so in a local branch. Once I've finalized/tests things (probably tomorrow) I'll link the branch here or open another PR. ---
[jira] [Commented] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326323#comment-16326323 ] Till Rohrmann commented on FLINK-7949: -- Fixed in 1.4.1 via 7d040fd40c2816e829c81cb38177b6e1579c761c 9014167987cfcd108e7316281d562b7d85c12fba > AsyncWaitOperator is not restarting when queue is full > -- > > Key: FLINK-7949 > URL: https://issues.apache.org/jira/browse/FLINK-7949 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.2 >Reporter: Bartłomiej Tartanus >Assignee: Bartłomiej Tartanus >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > Original Estimate: 0.25h > Remaining Estimate: 0.25h > > Issue was describe here: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html > Issue - AsyncWaitOperator can't restart properly after failure (thread is > waiting forever) > Scenario to reproduce this issue: > 1. The queue is full (let's assume that its capacity is N elements) > 2. There is some pending element waiting, so the > pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and > while-loop in addAsyncBufferEntry method is trying to add this element to > the queue (but element is not added because queue is full) > 3. Now the snapshot is taken - the whole queue of N elements is being > written into the ListState in snapshotState method and also (what is more > important) this pendingStreamElementQueueEntry is written to this list too. > 4. The process is being restarted, so it tries to recover all the elements > and put them again into the queue, but the list of recovered elements hold > N+1 element and our queue capacity is only N. Process is not started yet, so > it can not process any element and this one element is waiting endlessly. > But it's never added and the process will never process anything. Deadlock. > 5. Trigger is fired and indeed discarded because the process is not running > yet. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7949) AsyncWaitOperator is not restarting when queue is full
[ https://issues.apache.org/jira/browse/FLINK-7949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-7949: - Fix Version/s: 1.4.1 > AsyncWaitOperator is not restarting when queue is full > -- > > Key: FLINK-7949 > URL: https://issues.apache.org/jira/browse/FLINK-7949 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.3.2 >Reporter: Bartłomiej Tartanus >Assignee: Bartłomiej Tartanus >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > Original Estimate: 0.25h > Remaining Estimate: 0.25h > > Issue was describe here: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-was-declined-tasks-not-ready-td16066.html > Issue - AsyncWaitOperator can't restart properly after failure (thread is > waiting forever) > Scenario to reproduce this issue: > 1. The queue is full (let's assume that its capacity is N elements) > 2. There is some pending element waiting, so the > pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and > while-loop in addAsyncBufferEntry method is trying to add this element to > the queue (but element is not added because queue is full) > 3. Now the snapshot is taken - the whole queue of N elements is being > written into the ListState in snapshotState method and also (what is more > important) this pendingStreamElementQueueEntry is written to this list too. > 4. The process is being restarted, so it tries to recover all the elements > and put them again into the queue, but the list of recovered elements hold > N+1 element and our queue capacity is only N. Process is not started yet, so > it can not process any element and this one element is waiting endlessly. > But it's never added and the process will never process anything. Deadlock. > 5. Trigger is fired and indeed discarded because the process is not running > yet. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
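The restore deadlock described in the issue can be reproduced in miniature with a plain bounded queue; the sketch below only illustrates the failure mode and is not the actual AsyncWaitOperator code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RestoreDeadlockSketch {
    public static void main(String[] args) throws InterruptedException {
        final int capacity = 2;  // the operator's queue capacity "N"
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);

        // The snapshot can hold N + 1 entries: the N queued elements plus the pending one.
        String[] recovered = {"element-1", "element-2", "pending-element"};

        for (String element : recovered) {
            // Nothing consumes from the queue yet, because processing only starts after
            // recovery completes -- so the third put() blocks forever (the deadlock).
            queue.put(element);
        }
        System.out.println("unreachable while the bug exists");
    }
}
```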
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326322#comment-16326322 ] ASF GitHub Bot commented on FLINK-7738: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4767 Hi @EronWright, yes I think we still need support for web sockets. The first REST based client won't use this but later on we should definitely add this functionality. At the moment we try hard to make Flip-6 feature equivalent to the old distributed architecture and therefore we couldn't make progress here. But once this has been done, we should re-iterate over this PR again. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4767 Hi @EronWright, yes I think we still need support for web sockets. The first REST based client won't use this but later on we should definitely add this functionality. At the moment we try hard to make Flip-6 feature equivalent to the old distributed architecture and therefore we couldn't make progress here. But once this has been done, we should re-iterate over this PR again. ---
[jira] [Commented] (FLINK-8082) Bump version compatibility check to 1.4
[ https://issues.apache.org/jira/browse/FLINK-8082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326318#comment-16326318 ] ASF GitHub Bot commented on FLINK-8082: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5262 > Bump version compatibility check to 1.4 > --- > > Key: FLINK-8082 > URL: https://issues.apache.org/jira/browse/FLINK-8082 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > Similar to FLINK-7977, we must bump the version of the compatibility check to > compare 1.5 against 1.4, once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5289: [hotfix] [docs] Fix typos
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5289 ---
[GitHub] flink pull request #5262: [FLINK-8082][build] Bump flink version for japicmp...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5262 ---
[GitHub] flink pull request #5293: [hotfix][docs] Mention maven dependency for RocksD...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5293 ---
[GitHub] flink pull request #5133: [hotfix] Fix typo in AkkaUtils method
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5133 ---
[jira] [Closed] (FLINK-8082) Bump version compatibility check to 1.4
[ https://issues.apache.org/jira/browse/FLINK-8082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8082. --- Resolution: Fixed master: 4e0ca93d3f496b410452ab31485ed920e9ab2702 > Bump version compatibility check to 1.4 > --- > > Key: FLINK-8082 > URL: https://issues.apache.org/jira/browse/FLINK-8082 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > Similar to FLINK-7977, we must bump the version of the compatibility check to > compare 1.5 against 1.4, once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8399) Use independent configurations for the different timeouts in slot manager
[ https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326315#comment-16326315 ] ASF GitHub Bot commented on FLINK-8399: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5271 > Use independent configurations for the different timeouts in slot manager > - > > Key: FLINK-8399 > URL: https://issues.apache.org/jira/browse/FLINK-8399 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > There are three parameter in slot manager to indicate the timeout for slot > request to task manager, slot request to be discarded and task manager to be > released. But now they all come from the value of AkkaOptions.ASK_TIMEOUT, > need to use independent configurations for them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5271: [FLINK-8399] [runtime] use independent configurati...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5271 ---
[jira] [Resolved] (FLINK-8399) Use independent configurations for the different timeouts in slot manager
[ https://issues.apache.org/jira/browse/FLINK-8399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8399. -- Resolution: Fixed Fix Version/s: 1.5.0 Fixed via b4599156415f2ad1eee58ffce9a5e9fa54bbdd4e > Use independent configurations for the different timeouts in slot manager > - > > Key: FLINK-8399 > URL: https://issues.apache.org/jira/browse/FLINK-8399 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > There are three parameter in slot manager to indicate the timeout for slot > request to task manager, slot request to be discarded and task manager to be > released. But now they all come from the value of AkkaOptions.ASK_TIMEOUT, > need to use independent configurations for them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
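The direction described here amounts to introducing separate ConfigOptions, roughly along these lines; the keys and default values below are illustrative assumptions, not necessarily the option names that were merged:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SlotManagerTimeoutOptionsSketch {

    /** How long the slot manager waits for a TaskManager to answer a slot request. */
    public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
        ConfigOptions.key("slotmanager.request-timeout").defaultValue(600_000L);

    /** How long an unfulfilled slot request may stay pending before it is discarded. */
    public static final ConfigOption<Long> SLOT_REQUEST_MAX_INTERVAL =
        ConfigOptions.key("slotmanager.slot-request-max-interval").defaultValue(300_000L);

    /** How long an idle TaskManager is kept around before it is released. */
    public static final ConfigOption<Long> TASK_MANAGER_TIMEOUT =
        ConfigOptions.key("slotmanager.taskmanager-timeout").defaultValue(30_000L);
}
```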
[jira] [Updated] (FLINK-8435) Adding BloomFilter/HyperLogLog state as Managed once
[ https://issues.apache.org/jira/browse/FLINK-8435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8435: Environment: (was: Many times doing something approximately is enough for users (Such as counting). I think we should implement bloom filter and HyperLogLog as state for keyed state. By this users are able to filter or process many things approximately.) > Adding BloomFilter/HyperLogLog state as Managed once > > > Key: FLINK-8435 > URL: https://issues.apache.org/jira/browse/FLINK-8435 > Project: Flink > Issue Type: New Feature >Reporter: Moein Hosseini >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8435) Adding BloomFilter/HyperLogLog state as Managed once
[ https://issues.apache.org/jira/browse/FLINK-8435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-8435: Description: Many times doing something approximately is enough for users (Such as counting). I think we should implement bloom filter and HyperLogLog as state for keyed state. By this users are able to filter or process many things approximately. > Adding BloomFilter/HyperLogLog state as Managed once > > > Key: FLINK-8435 > URL: https://issues.apache.org/jira/browse/FLINK-8435 > Project: Flink > Issue Type: New Feature >Reporter: Moein Hosseini >Priority: Major > > Many times doing something approximately is enough for users (Such as > counting). I think we should implement bloom filter and HyperLogLog as state > for keyed state. By this users are able to filter or process many things > approximately. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8435) Adding BloomFilter/HyperLogLog state as Managed once
Moein Hosseini created FLINK-8435: - Summary: Adding BloomFilter/HyperLogLog state as Managed once Key: FLINK-8435 URL: https://issues.apache.org/jira/browse/FLINK-8435 Project: Flink Issue Type: New Feature Environment: Many times doing something approximately is enough for users (Such as counting). I think we should implement bloom filter and HyperLogLog as state for keyed state. By this users are able to filter or process many things approximately. Reporter: Moein Hosseini -- This message was sent by Atlassian JIRA (v7.6.3#76005)
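Until such a dedicated state type exists, the idea can be approximated with ordinary keyed state. The sketch below is only an illustration of that workaround (it uses a single hash function, so it is a per-key bit set rather than a full Bloom filter):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Emits a value only if it has (probably) not been seen for the current key before.
public class ApproximateDistinctFilter extends RichFlatMapFunction<String, String> {

    private transient ValueState<long[]> bits;

    @Override
    public void open(Configuration parameters) {
        bits = getRuntimeContext().getState(
            new ValueStateDescriptor<>("approximate-seen-bits", long[].class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        long[] filter = bits.value();
        if (filter == null) {
            filter = new long[1024];                       // 65,536 bits per key
        }
        int bit = (value.hashCode() & Integer.MAX_VALUE) % (filter.length * 64);
        boolean probablySeen = (filter[bit >>> 6] & (1L << (bit & 63))) != 0;
        if (!probablySeen) {
            filter[bit >>> 6] |= 1L << (bit & 63);
            bits.update(filter);
            // A value is never emitted twice for a key; a genuinely new value may
            // occasionally be suppressed due to a hash collision (the approximation).
            out.collect(value);
        }
    }
}
```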
[jira] [Commented] (FLINK-8414) Gelly performance seriously decreases when using the suggested parallelism configuration
[ https://issues.apache.org/jira/browse/FLINK-8414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326285#comment-16326285 ] Greg Hogan commented on FLINK-8414: --- You certainly can measure scalability but, as you have discovered, the performance will not be monotonically increasing. Redistributing operators require a channel between each pair of tasks, so with a parallelism of 2^7 you will have 2^14 channels for each such exchange in each iteration. There are many reasons to use Flink and Gelly, but for some use cases and certain algorithms you may even get better performance with a single-threaded implementation. See "Scalability! But at what COST?". ConnectedComponents and PageRank require, respectively, no and very little intermediate data, whereas the similarity measures JaccardIndex and AdamicAdar as well as triangle metrics such as ClusteringCoefficient process super-linear intermediate data and benefit much more from Flink's scalability. When comparing against non-distributed implementations it is important to note that all Gelly algorithms process generic data, whereas many "optimized" algorithms assume compact integer representations. > Gelly performance seriously decreases when using the suggested parallelism > configuration > > > Key: FLINK-8414 > URL: https://issues.apache.org/jira/browse/FLINK-8414 > Project: Flink > Issue Type: Bug > Components: Configuration, Documentation, Gelly >Reporter: flora karniav >Priority: Minor > > I am running Gelly examples with different datasets in a cluster of 5 > machines (1 Jobmanager and 4 Taskmanagers) of 32 cores each. > The number of Slots parameter is set to 32 (as suggested) and the parallelism > to 128 (32 cores*4 taskmanagers). > I observe a vast performance degradation using these suggested settings compared to > setting parallelism.default to 16, for example, where the same job completes at > ~60 seconds vs ~140 in the 128 parallelism case. > Is there something wrong in my configuration? Should I decrease parallelism > and -if so- will this inevitably decrease CPU utilization? > Another matter that may be related to this is the number of partitions of the > data. Is this somehow related to parallelism? How many partitions are created > in the case of parallelism.default=128? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
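For concreteness, the channel count mentioned above works out as follows (assuming one all-to-all data exchange between two operators that both run at the suggested parallelism):

    p = 2^7 = 128 parallel subtasks per operator
    channels per exchange = p * p = 2^7 * 2^7 = 2^14 = 16,384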
[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326275#comment-16326275 ] ASF GitHub Bot commented on FLINK-7456: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4552 @NicoK , I have submitted all the modifications based on the patches you provided. The tests for `nextBufferIsEvent` will be added in a new commit tomorrow. > Implement Netty sender incoming pipeline for credit-based > - > > Key: FLINK-7456 > URL: https://issues.apache.org/jira/browse/FLINK-7456 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > On sender side, each subpartition view maintains an atomic integer > {{currentCredit}} from receiver. Once receiving the messages of > {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by > deltas. > Each view also maintains an atomic boolean field to mark it as registered > available for transfer to make sure it is enqueued in handler only once. If > the {{currentCredit}} increases from zero and there are available buffers in > the subpartition, the corresponding view will be enqueued for transferring > data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4552 @NicoK , I have submitted all the modifications based on the patches you provided. The tests for `nextBufferIsEvent` will be added in a new commit tomorrow. ---
[jira] [Resolved] (FLINK-6730) Activate strict checkstyle for flink-optimizer
[ https://issues.apache.org/jira/browse/FLINK-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan resolved FLINK-6730. --- Resolution: Won't Fix Closing per comment from #5294: "I would ignore the optimizer module TBH. There are no recent contributions to the module, so we don't benefit from cleaner PRs. I also don't know what will happen to the module when we start unifying the batch APIs, which will probably either be a full rewrite or removal." > Activate strict checkstyle for flink-optimizer > -- > > Key: FLINK-6730 > URL: https://issues.apache.org/jira/browse/FLINK-6730 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > > Long term issue for incrementally introducing the strict checkstyle to > flink-optimizer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8427) Checkstyle for org.apache.flink.optimizer.costs
[ https://issues.apache.org/jira/browse/FLINK-8427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326243#comment-16326243 ] ASF GitHub Bot commented on FLINK-8427: --- Github user greghogan closed the pull request at: https://github.com/apache/flink/pull/5294 > Checkstyle for org.apache.flink.optimizer.costs > --- > > Key: FLINK-8427 > URL: https://issues.apache.org/jira/browse/FLINK-8427 > Project: Flink > Issue Type: Improvement > Components: Optimizer >Affects Versions: 1.5.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > -- This message was sent by Atlassian JIRA (v7.6.3#76005)