[FLINK-7661][network] Add credit field in PartitionRequest message. This closes #4698.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891f359d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891f359d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891f359d Branch: refs/heads/master Commit: 891f359d710146acf3d05cd2af3bb430a8fbc99b Parents: eef0db0 Author: Zhijiang <[email protected]> Authored: Thu Sep 21 16:28:16 2017 +0800 Committer: zentol <[email protected]> Committed: Wed Oct 11 22:06:58 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/io/network/netty/NettyMessage.java | 11 ++++++++--- .../runtime/io/network/netty/PartitionRequestClient.java | 2 +- .../network/partition/consumer/RemoteInputChannel.java | 4 ++++ .../io/network/netty/CancelPartitionRequestTest.java | 4 ++-- .../io/network/netty/NettyMessageSerializationTest.java | 3 ++- .../network/netty/ServerTransportErrorHandlingTest.java | 2 +- 6 files changed, 18 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java index d7ddfa6..c035010 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java @@ -358,10 +358,13 @@ abstract class NettyMessage { final InputChannelID receiverId; - PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) { + final int credit; + + PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID 
receiverId, int credit) { this.partitionId = checkNotNull(partitionId); this.queueIndex = queueIndex; this.receiverId = checkNotNull(receiverId); + this.credit = credit; } @Override @@ -369,12 +372,13 @@ abstract class NettyMessage { ByteBuf result = null; try { - result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16); + result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16 + 4); partitionId.getPartitionId().writeTo(result); partitionId.getProducerId().writeTo(result); result.writeInt(queueIndex); receiverId.writeTo(result); + result.writeInt(credit); return result; } @@ -394,8 +398,9 @@ abstract class NettyMessage { ExecutionAttemptID.fromByteBuf(buffer)); int queueIndex = buffer.readInt(); InputChannelID receiverId = InputChannelID.fromByteBuf(buffer); + int credit = buffer.readInt(); - return new PartitionRequest(partitionId, queueIndex, receiverId); + return new PartitionRequest(partitionId, queueIndex, receiverId, credit); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java index 7850974..8dbc6b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java @@ -106,7 +106,7 @@ public class PartitionRequestClient { partitionRequestHandler.addInputChannel(inputChannel); final PartitionRequest request = new PartitionRequest( - partitionId, subpartitionIndex, inputChannel.getInputChannelId()); + partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit()); final 
ChannelFutureListener listener = new ChannelFutureListener() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 4e1eaef..4c156df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -382,6 +382,10 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, return id; } + public int getInitialCredit() { + return initialCredit; + } + public BufferProvider getBufferProvider() throws IOException { if (isReleased.get()) { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 12f5064..912fae2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -97,7 +97,7 @@ public class CancelPartitionRequestTest { Channel ch = connect(serverAndClient); // Request for non-existing input channel => results in cancel request - ch.writeAndFlush(new PartitionRequest(pid, 0, new 
InputChannelID())).await(); + ch.writeAndFlush(new PartitionRequest(pid, 0, new InputChannelID(), 2)).await(); // Wait for the notification if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) { @@ -150,7 +150,7 @@ public class CancelPartitionRequestTest { // Request for non-existing input channel => results in cancel request InputChannelID inputChannelId = new InputChannelID(); - ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId)).await(); + ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId, 2)).await(); // Wait for the notification if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) { http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java index 8200caa..0651f97 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java @@ -126,12 +126,13 @@ public class NettyMessageSerializationTest { } { - NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID()); + NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID(), random.nextInt()); NettyMessage.PartitionRequest actual = encodeAndDecode(expected); assertEquals(expected.partitionId, 
actual.partitionId); assertEquals(expected.queueIndex, actual.queueIndex); assertEquals(expected.receiverId, actual.receiverId); + assertEquals(expected.credit, actual.credit); } { http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java index 01a0b5f..d365fba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java @@ -106,7 +106,7 @@ public class ServerTransportErrorHandlingTest { Channel ch = connect(serverAndClient); // Write something to trigger close by server - ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID())); + ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID(), 2)); // Wait for the notification if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
