[FLINK-7661][network] Add credit field in PartitionRequest message

This closes #4698.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891f359d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891f359d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891f359d

Branch: refs/heads/master
Commit: 891f359d710146acf3d05cd2af3bb430a8fbc99b
Parents: eef0db0
Author: Zhijiang <[email protected]>
Authored: Thu Sep 21 16:28:16 2017 +0800
Committer: zentol <[email protected]>
Committed: Wed Oct 11 22:06:58 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/netty/NettyMessage.java     | 11 ++++++++---
 .../runtime/io/network/netty/PartitionRequestClient.java |  2 +-
 .../network/partition/consumer/RemoteInputChannel.java   |  4 ++++
 .../io/network/netty/CancelPartitionRequestTest.java     |  4 ++--
 .../io/network/netty/NettyMessageSerializationTest.java  |  3 ++-
 .../network/netty/ServerTransportErrorHandlingTest.java  |  2 +-
 6 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index d7ddfa6..c035010 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -358,10 +358,13 @@ abstract class NettyMessage {
 
                final InputChannelID receiverId;
 
-               PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
+               final int credit;
+
+               PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId, int credit) {
                        this.partitionId = checkNotNull(partitionId);
                        this.queueIndex = queueIndex;
                        this.receiverId = checkNotNull(receiverId);
+                       this.credit = credit;
                }
 
                @Override
@@ -369,12 +372,13 @@ abstract class NettyMessage {
                        ByteBuf result = null;
 
                        try {
-                               result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16);
+                               result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16 + 4);
 
                                partitionId.getPartitionId().writeTo(result);
                                partitionId.getProducerId().writeTo(result);
                                result.writeInt(queueIndex);
                                receiverId.writeTo(result);
+                               result.writeInt(credit);
 
                                return result;
                        }
@@ -394,8 +398,9 @@ abstract class NettyMessage {
                                        ExecutionAttemptID.fromByteBuf(buffer));
                        int queueIndex = buffer.readInt();
                        InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+                       int credit = buffer.readInt();
 
-                       return new PartitionRequest(partitionId, queueIndex, receiverId);
+                       return new PartitionRequest(partitionId, queueIndex, receiverId, credit);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 7850974..8dbc6b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -106,7 +106,7 @@ public class PartitionRequestClient {
                partitionRequestHandler.addInputChannel(inputChannel);
 
                final PartitionRequest request = new PartitionRequest(
-                               partitionId, subpartitionIndex, inputChannel.getInputChannelId());
+                               partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());
 
                final ChannelFutureListener listener = new ChannelFutureListener() {
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 4e1eaef..4c156df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -382,6 +382,10 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
                return id;
        }
 
+       public int getInitialCredit() {
+               return initialCredit;
+       }
+
        public BufferProvider getBufferProvider() throws IOException {
                if (isReleased.get()) {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 12f5064..912fae2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -97,7 +97,7 @@ public class CancelPartitionRequestTest {
                        Channel ch = connect(serverAndClient);
 
                        // Request for non-existing input channel => results in cancel request
-                       ch.writeAndFlush(new PartitionRequest(pid, 0, new InputChannelID())).await();
+                       ch.writeAndFlush(new PartitionRequest(pid, 0, new InputChannelID(), 2)).await();
 
                        // Wait for the notification
                        if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
@@ -150,7 +150,7 @@ public class CancelPartitionRequestTest {
                        // Request for non-existing input channel => results in cancel request
                        InputChannelID inputChannelId = new InputChannelID();
 
-                       ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId)).await();
+                       ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId, 2)).await();
 
                        // Wait for the notification
                        if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index 8200caa..0651f97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -126,12 +126,13 @@ public class NettyMessageSerializationTest {
                }
 
                {
-                       NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID());
+                       NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID(), random.nextInt());
                        NettyMessage.PartitionRequest actual = encodeAndDecode(expected);
 
                        assertEquals(expected.partitionId, actual.partitionId);
                        assertEquals(expected.queueIndex, actual.queueIndex);
                        assertEquals(expected.receiverId, actual.receiverId);
+                       assertEquals(expected.credit, actual.credit);
                }
 
                {

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 01a0b5f..d365fba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -106,7 +106,7 @@ public class ServerTransportErrorHandlingTest {
                        Channel ch = connect(serverAndClient);
 
                        // Write something to trigger close by server
-                       ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID()));
+                       ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID(), 2));
 
                        // Wait for the notification
                        if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {

Reply via email to