[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390719053 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -443,10 +587,13 @@ private static BufferResponse createBufferResponse( // Skip general header bytes serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH); - // Deserialize the bytes again. We have to go this way, because we only partly deserialize - // the header of the response and wait for a buffer from the buffer pool to copy the payload - // data into. - BufferResponse deserialized = BufferResponse.readFrom(serialized); + // Deserialize the bytes again. We have to go this way to ensure the data buffer part + // is consistent with the input channel sent to. Review comment: I think the previous comment was trying to explain why the bytes need to be deserialized again: the test needs to reach the state in which the message has been deserialized but not yet copied into a Flink Buffer. I thought that comment might no longer be consistent with the current implementation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
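The state described in the review comment above (header deserialized, payload not yet copied into a receiver-owned buffer) can be illustrated with a small sketch. It uses only `java.nio.ByteBuffer`; the class `TwoPhaseDecodeSketch`, its `Header` type, and the wire layout are assumptions made for illustration and are not Flink's `BufferResponse` or `NettyMessage` API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical two-phase decoder; it does not use or mirror Flink's actual classes. */
public class TwoPhaseDecodeSketch {

    /** Header fields parsed in phase one; the payload is still unread at that point. */
    static final class Header {
        final int channelId;
        final int payloadLength;

        Header(int channelId, int payloadLength) {
            this.channelId = channelId;
            this.payloadLength = payloadLength;
        }
    }

    /** Serializes a message as [channelId:int][payloadLength:int][payload bytes]. */
    static ByteBuffer serialize(int channelId, byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(8 + payload.length);
        out.putInt(channelId).putInt(payload.length).put(payload);
        out.flip();
        return out;
    }

    /** Phase one: consume only the header and leave the payload in the wire buffer. */
    static Header readHeader(ByteBuffer wire) {
        int channelId = wire.getInt();
        int payloadLength = wire.getInt();
        return new Header(channelId, payloadLength);
    }

    /** Phase two: copy the payload into a buffer chosen by the receiver. */
    static void readPayload(ByteBuffer wire, Header header, ByteBuffer target) {
        ByteBuffer slice = wire.slice();
        slice.limit(header.payloadLength);
        target.put(slice);
        // Advance the wire buffer past the payload that was just copied.
        wire.position(wire.position() + header.payloadLength);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = serialize(7, payload);

        Header header = readHeader(wire);
        // The header is known here, but no payload byte has been copied yet.
        System.out.println(header.channelId + " " + header.payloadLength + " " + wire.remaining());

        ByteBuffer target = ByteBuffer.allocate(header.payloadLength);
        readPayload(wire, header, target);
        target.flip();
        System.out.println(StandardCharsets.UTF_8.decode(target));
    }
}
```

Between `readHeader` and `readPayload` the receiver is free to pick the target buffer, which is the intermediate state a test would have to reproduce by serializing and then partially deserializing a message.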
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390717265 ## File path: flink-end-to-end-tests/test-scripts/test_taskmanager_direct_memory.sh ## @@ -0,0 +1,47 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST=flink-taskmanager-direct-memory-test +TEST_PROGRAM_NAME=TaskManagerDirectMemoryTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_config_key "akka.ask.timeout" "60 s" +set_config_key "web.timeout" "6" + +set_config_key "taskmanager.memory.process.size" "1536m" + +set_config_key "taskmanager.memory.managed.size" "8" # 8Mb +set_config_key "taskmanager.memory.network.min" "256mb" +set_config_key "taskmanager.memory.network.max" "256mb" +set_config_key "taskmanager.memory.jvm-metaspace.size" "64m" + +set_config_key "taskmanager.numberOfTaskSlots" "20" # 20 slots per TM +set_config_key "taskmanager.network.netty.num-arenas" "1" # Use only one arena for each TM Review comment: Fixed the comments This is an automated message from the Apache Git Service. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390707610 ## File path: flink-end-to-end-tests/flink-taskmanager-direct-memory-test/src/main/java/org/apache/flink/streaming/tests/TaskManagerDirectMemoryTestProgram.java ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Test program for taskmanager direct memory consumption. Review comment: Modified the comment. This is an automated message from the Apache Git Service. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390477408 ## File path: flink-end-to-end-tests/test-scripts/test_taskmanager_direct_memory.sh ## @@ -0,0 +1,47 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST=flink-taskmanager-direct-memory-test +TEST_PROGRAM_NAME=TaskManagerDirectMemoryTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_config_key "akka.ask.timeout" "60 s" +set_config_key "web.timeout" "6" + +set_config_key "taskmanager.memory.process.size" "1536m" Review comment: Changed the configuration to `taskmanager.memory.flink.size: 512m`. If we configured the process memory to 512M instead, there would be warnings about insufficient memory, since there is also native memory consumption.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390700889 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java ## @@ -162,6 +168,37 @@ static NettyConfig createConfig(int segmentSize, Configuration config) throws Ex config); } + // - + // Encoding & Decoding + // - + + @SuppressWarnings("unchecked") Review comment: Removed the warning This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390477073 ## File path: flink-end-to-end-tests/test-scripts/test_taskmanager_direct_memory.sh ## @@ -0,0 +1,47 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST=flink-taskmanager-direct-memory-test +TEST_PROGRAM_NAME=TaskManagerDirectMemoryTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_config_key "akka.ask.timeout" "60 s" +set_config_key "web.timeout" "6" + +set_config_key "taskmanager.memory.process.size" "1536m" + +set_config_key "taskmanager.memory.managed.size" "8" # 8Mb +set_config_key "taskmanager.memory.network.min" "256mb" +set_config_key "taskmanager.memory.network.max" "256mb" +set_config_key "taskmanager.memory.jvm-metaspace.size" "64m" Review comment: The default network memory (64M) is not enough, so I decreased the network memory to 128m and removed the other configuration.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390474815 ## File path: flink-end-to-end-tests/flink-taskmanager-direct-memory-test/src/main/java/org/apache/flink/streaming/tests/TaskManagerDirectMemoryTestProgram.java ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Test program for taskmanager direct memory consumption. 
+ */ +public class TaskManagerDirectMemoryTestProgram { + private static final ConfigOption RUNNING_TIME_IN_SECONDS = ConfigOptions + .key("test.running_time_in_seconds") + .defaultValue(120) + .withDescription("The time to run."); + + private static final ConfigOption RECORD_LENGTH = ConfigOptions + .key("test.record_length") + .defaultValue(2048) + .withDescription("The length of record."); + + private static final ConfigOption MAP_PARALLELISM = ConfigOptions + .key("test.map_parallelism") + .defaultValue(1) + .withDescription("The number of map tasks."); + + private static final ConfigOption REDUCE_PARALLELISM = ConfigOptions + .key("test.reduce_parallelism") + .defaultValue(1) + .withDescription("The number of reduce tasks."); + + public static void main(String[] args) throws Exception { + // parse the parameters + final ParameterTool params = ParameterTool.fromArgs(args); + + final int runningTimeInSeconds = params.getInt(RUNNING_TIME_IN_SECONDS.key(), RUNNING_TIME_IN_SECONDS.defaultValue()); + final int recordLength = params.getInt(RECORD_LENGTH.key(), RECORD_LENGTH.defaultValue()); + final int mapParallelism = params.getInt(MAP_PARALLELISM.key(), MAP_PARALLELISM.defaultValue()); + final int reduceParallelism = params.getInt(REDUCE_PARALLELISM.key(), REDUCE_PARALLELISM.defaultValue()); + + checkArgument(runningTimeInSeconds > 0, + "The running time in seconds should be positive, but it is {}", + recordLength); + checkArgument(recordLength > 0, + "The record length should be positive, but it is {}", + recordLength); + checkArgument(mapParallelism > 0, + "The number of map tasks should be positive, but it is {}", + mapParallelism); + checkArgument(reduceParallelism > 0, + "The number of reduce tasks should be positve, but it is {}", + reduceParallelism); + + byte[] bytes = new byte[recordLength]; + for (int i = 0; i < recordLength; ++i) { + bytes[i] = 'a'; + } + String str = new String(bytes); Review comment: Moved the stream generation into source function. 
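One possible reading of the change described in the comment above is that the fixed-length record is no longer built in `main()` but inside the source itself when it starts. The sketch below shows that shape in plain Java; `RecordSourceSketch` and its `open()`/`next()` methods are hypothetical stand-ins and do not extend or reproduce Flink's `RichParallelSourceFunction`.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a source function; it does not extend any Flink class. */
public class RecordSourceSketch {

    private final int recordLength;

    /** Built lazily in open(), so only the length travels with the object. */
    private String record;

    RecordSourceSketch(int recordLength) {
        if (recordLength <= 0) {
            throw new IllegalArgumentException(
                "The record length should be positive, but it is " + recordLength);
        }
        this.recordLength = recordLength;
    }

    /** Creates the record of recordLength 'a' characters once, before emitting. */
    void open() {
        char[] chars = new char[recordLength];
        Arrays.fill(chars, 'a');
        record = new String(chars);
    }

    /** Returns the same pre-built record on every call. */
    String next() {
        if (record == null) {
            throw new IllegalStateException("open() must be called before next()");
        }
        return record;
    }

    public static void main(String[] args) {
        RecordSourceSketch source = new RecordSourceSketch(2048);
        source.open();
        System.out.println(source.next().length()); // prints 2048
    }
}
```

With this layout the caller passes only the record length, and the payload itself is created where it is consumed.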
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390474969 ## File path: flink-end-to-end-tests/test-scripts/test_taskmanager_direct_memory.sh ## @@ -0,0 +1,47 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST=flink-taskmanager-direct-memory-test +TEST_PROGRAM_NAME=TaskManagerDirectMemoryTestProgram +TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar + +set_config_key "akka.ask.timeout" "60 s" +set_config_key "web.timeout" "6" Review comment: Removed the configuration. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390474683 ## File path: flink-end-to-end-tests/flink-taskmanager-direct-memory-test/src/main/java/org/apache/flink/streaming/tests/TaskManagerDirectMemoryTestProgram.java ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Test program for taskmanager direct memory consumption. 
+ */ +public class TaskManagerDirectMemoryTestProgram { + private static final ConfigOption RUNNING_TIME_IN_SECONDS = ConfigOptions + .key("test.running_time_in_seconds") + .defaultValue(120) + .withDescription("The time to run."); + + private static final ConfigOption RECORD_LENGTH = ConfigOptions + .key("test.record_length") + .defaultValue(2048) + .withDescription("The length of record."); + + private static final ConfigOption MAP_PARALLELISM = ConfigOptions + .key("test.map_parallelism") + .defaultValue(1) + .withDescription("The number of map tasks."); + + private static final ConfigOption REDUCE_PARALLELISM = ConfigOptions + .key("test.reduce_parallelism") + .defaultValue(1) + .withDescription("The number of reduce tasks."); + + public static void main(String[] args) throws Exception { + // parse the parameters + final ParameterTool params = ParameterTool.fromArgs(args); + + final int runningTimeInSeconds = params.getInt(RUNNING_TIME_IN_SECONDS.key(), RUNNING_TIME_IN_SECONDS.defaultValue()); + final int recordLength = params.getInt(RECORD_LENGTH.key(), RECORD_LENGTH.defaultValue()); + final int mapParallelism = params.getInt(MAP_PARALLELISM.key(), MAP_PARALLELISM.defaultValue()); + final int reduceParallelism = params.getInt(REDUCE_PARALLELISM.key(), REDUCE_PARALLELISM.defaultValue()); Review comment: Change the `recordLength` variable into constant This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390472318 ## File path: flink-end-to-end-tests/pom.xml ## @@ -88,6 +88,7 @@ under the License. flink-elasticsearch7-test flink-end-to-end-tests-common-kafka flink-tpcds-test + flink-taskmanager-direct-memory-test Review comment: Changed the module name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390471930 ## File path: flink-end-to-end-tests/flink-taskmanager-direct-memory-test/src/main/java/org/apache/flink/streaming/tests/TaskManagerDirectMemoryTestProgram.java ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Test program for taskmanager direct memory consumption. + */ +public class TaskManagerDirectMemoryTestProgram { Review comment: Renamed the program. This is an automated message from the Apache Git Service. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390471662 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.TestingPartitionRequestClient; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; + +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the serialization and 
deserialization of the various {@link NettyMessage} sub-classes + * sent from server side to client side. + */ +@RunWith(Parameterized.class) +public class NettyMessageClientSideSerializationTest { + + private static final int BUFFER_SIZE = 1024; + + private static final BufferCompressor COMPRESSOR = new BufferCompressor(BUFFER_SIZE, "LZ4"); + + private static final BufferDecompressor DECOMPRESSOR = new BufferDecompressor(BUFFER_SIZE, "LZ4"); + + private final Random random = new Random(); + + private EmbeddedChannel channel; + + private NetworkBufferPool networkBufferPool; + + private SingleInputGate inputGate; + + private InputChannelID inputChannelId; + + // + // parameters + // + + private final boolean testReadOnlyBuffer; + + private final boolean testCompressedBuffer; + + @Parameterized.Parameters(name = "testReadOnlyBuffer = {0}, testCompressedBuffer = {1}") + public static Collection testReadOnlyBuffer() { + return Arrays.asList(new Object[][] { + {false, false}, + {true, false}, + {false, true}, + {true, true} + }); + } + + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + this.testReadOnlyBuffer = testReadOnlyBuffer; + this.testCompressedBuffer = testCompressedBuffer; + } + + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new Network
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390471734 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.TestingPartitionRequestClient; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; + +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the serialization and 
deserialization of the various {@link NettyMessage} sub-classes + * sent from server side to client side. + */ +@RunWith(Parameterized.class) +public class NettyMessageClientSideSerializationTest { + + private static final int BUFFER_SIZE = 1024; + + private static final BufferCompressor COMPRESSOR = new BufferCompressor(BUFFER_SIZE, "LZ4"); + + private static final BufferDecompressor DECOMPRESSOR = new BufferDecompressor(BUFFER_SIZE, "LZ4"); + + private final Random random = new Random(); + + private EmbeddedChannel channel; + + private NetworkBufferPool networkBufferPool; + + private SingleInputGate inputGate; + + private InputChannelID inputChannelId; + + // + // parameters + // + + private final boolean testReadOnlyBuffer; + + private final boolean testCompressedBuffer; + + @Parameterized.Parameters(name = "testReadOnlyBuffer = {0}, testCompressedBuffer = {1}") + public static Collection testReadOnlyBuffer() { + return Arrays.asList(new Object[][] { + {false, false}, + {true, false}, + {false, true}, + {true, true} + }); + } + + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + this.testReadOnlyBuffer = testReadOnlyBuffer; + this.testCompressedBuffer = testCompressedBuffer; + } + + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new Network
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390471418 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.TestingPartitionRequestClient; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; + +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the serialization and 
deserialization of the various {@link NettyMessage} sub-classes + * sent from server side to client side. + */ +@RunWith(Parameterized.class) +public class NettyMessageClientSideSerializationTest { + + private static final int BUFFER_SIZE = 1024; + + private static final BufferCompressor COMPRESSOR = new BufferCompressor(BUFFER_SIZE, "LZ4"); + + private static final BufferDecompressor DECOMPRESSOR = new BufferDecompressor(BUFFER_SIZE, "LZ4"); + + private final Random random = new Random(); + + private EmbeddedChannel channel; + + private NetworkBufferPool networkBufferPool; + + private SingleInputGate inputGate; + + private InputChannelID inputChannelId; + + // + // parameters + // + + private final boolean testReadOnlyBuffer; + + private final boolean testCompressedBuffer; + + @Parameterized.Parameters(name = "testReadOnlyBuffer = {0}, testCompressedBuffer = {1}") + public static Collection testReadOnlyBuffer() { + return Arrays.asList(new Object[][] { + {false, false}, + {true, false}, + {false, true}, + {true, true} + }); + } + + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + this.testReadOnlyBuffer = testReadOnlyBuffer; + this.testCompressedBuffer = testCompressedBuffer; + } + + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new Network
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r390471013

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java ##
@@ -162,6 +168,37 @@ static NettyConfig createConfig(int segmentSize, Configuration config) throws Ex
 			config);
 	}

+	// -
+	// Encoding & Decoding
+	// -
+
+	@SuppressWarnings("unchecked")
+	static T encodeAndDecode(T msg, EmbeddedChannel channel) {
+		channel.writeOutbound(msg);
+		ByteBuf encoded = channel.readOutbound();
+
+		assertTrue(channel.writeInbound(encoded));
+
+		return (T) channel.readInbound();

Review comment:
Remove the transformation.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r390471013

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java ##
@@ -162,6 +168,37 @@ static NettyConfig createConfig(int segmentSize, Configuration config) throws Ex
 			config);
 	}

+	// -
+	// Encoding & Decoding
+	// -
+
+	@SuppressWarnings("unchecked")
+	static T encodeAndDecode(T msg, EmbeddedChannel channel) {
+		channel.writeOutbound(msg);
+		ByteBuf encoded = channel.readOutbound();
+
+		assertTrue(channel.writeInbound(encoded));
+
+		return (T) channel.readInbound();

Review comment:
Remove the parameter

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390470497 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes + * sent from client side to server side. 
+ */ +public class NettyMessageServerSideSerializationTest { + + private final Random random = new Random(); + + private EmbeddedChannel channel; + + @Before + public void setup() { + channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // For outbound messages + new NettyMessage.NettyMessageDecoder()); // For inbound messages + } + + @After + public void tearDown() { + channel.close(); + } + + @Test + public void testPartitionRequest() { + NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest( + new ResultPartitionID(), + random.nextInt(), + new InputChannelID(), + random.nextInt()); + + NettyMessage.PartitionRequest actual = encodeAndDecode(expected, channel); + + assertEquals(expected.partitionId, actual.partitionId); + assertEquals(expected.queueIndex, actual.queueIndex); + assertEquals(expected.receiverId, actual.receiverId); + assertEquals(expected.credit, actual.credit); + } + + @Test + public void testTaskEventRequest() { + NettyMessage.TaskEventRequest expected = new NettyMessage.TaskEventRequest(new IntegerTaskEvent(random.nextInt()), new ResultPartitionID(), new InputChannelID()); Review comment: Split the line. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390470630 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes + * sent from client side to server side. 
+ */ +public class NettyMessageServerSideSerializationTest { + + private final Random random = new Random(); + + private EmbeddedChannel channel; + + @Before + public void setup() { + channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // For outbound messages + new NettyMessage.NettyMessageDecoder()); // For inbound messages + } + + @After + public void tearDown() { + channel.close(); + } + + @Test + public void testPartitionRequest() { + NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest( + new ResultPartitionID(), + random.nextInt(), + new InputChannelID(), + random.nextInt()); + + NettyMessage.PartitionRequest actual = encodeAndDecode(expected, channel); + + assertEquals(expected.partitionId, actual.partitionId); + assertEquals(expected.queueIndex, actual.queueIndex); + assertEquals(expected.receiverId, actual.receiverId); + assertEquals(expected.credit, actual.credit); + } + + @Test + public void testTaskEventRequest() { + NettyMessage.TaskEventRequest expected = new NettyMessage.TaskEventRequest(new IntegerTaskEvent(random.nextInt()), new ResultPartitionID(), new InputChannelID()); + NettyMessage.TaskEventRequest actual = encodeAndDecode(expected, channel); + + assertEquals(expected.event, actual.event); + assertEquals(expected.partitionId, actual.partitionId); + assertEquals(expected.receiverId, actual.receiverId); + } + + @Test + public void testCancelPartitionRequest() { + NettyMessage.CancelPartitionRequest expected = new NettyMessage.CancelPartitionRequest(new InputChannelID()); Review comment: Split the line. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390470753 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes + * sent from client side to server side. 
+ */ +public class NettyMessageServerSideSerializationTest { + + private final Random random = new Random(); + + private EmbeddedChannel channel; + + @Before + public void setup() { + channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // For outbound messages + new NettyMessage.NettyMessageDecoder()); // For inbound messages + } + + @After + public void tearDown() { + channel.close(); + } + + @Test + public void testPartitionRequest() { + NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest( + new ResultPartitionID(), + random.nextInt(), + new InputChannelID(), + random.nextInt()); + + NettyMessage.PartitionRequest actual = encodeAndDecode(expected, channel); + + assertEquals(expected.partitionId, actual.partitionId); + assertEquals(expected.queueIndex, actual.queueIndex); + assertEquals(expected.receiverId, actual.receiverId); + assertEquals(expected.credit, actual.credit); + } + + @Test + public void testTaskEventRequest() { + NettyMessage.TaskEventRequest expected = new NettyMessage.TaskEventRequest(new IntegerTaskEvent(random.nextInt()), new ResultPartitionID(), new InputChannelID()); + NettyMessage.TaskEventRequest actual = encodeAndDecode(expected, channel); + + assertEquals(expected.event, actual.event); + assertEquals(expected.partitionId, actual.partitionId); + assertEquals(expected.receiverId, actual.receiverId); + } + + @Test + public void testCancelPartitionRequest() { + NettyMessage.CancelPartitionRequest expected = new NettyMessage.CancelPartitionRequest(new InputChannelID()); + NettyMessage.CancelPartitionRequest actual = encodeAndDecode(expected, channel); + + assertEquals(expected.receiverId, actual.receiverId); + } + + @Test + public void testCloseRequest() { + NettyMessage.CloseRequest expected = new NettyMessage.CloseRequest(); + NettyMessage.CloseRequest actual = encodeAndDecode(expected, channel); + + assertEquals(expected.getClass(), actual.getClass()); + } + + @Test + public void testAddCredit() { 
+ NettyMessage.AddCredit expected = new NettyMessage.AddCredit(random.nextInt(Integer.MAX_VALUE) + 1, new InputChannelID()); Review comment: Split the line. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With reg
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390470319 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -419,6 +458,110 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { } } + @Test + public void testReadBufferResponseBeforeReleasingChannel() throws Exception { + testReadBufferResponseBeforeReleasingOrRemovingChannel(false); + } + + @Test + public void testReadBufferResponseBeforeRemovingChannel() throws Exception { + testReadBufferResponseBeforeReleasingOrRemovingChannel(true); + } + + @Test + public void testReadBufferResponseAfterReleasingChannel() throws Exception { + testReadBufferResponseAfterReleasingAndRemovingChannel(false); + } + + @Test + public void testReadBufferResponseAfterRemovingChannel() throws Exception { + testReadBufferResponseAfterReleasingAndRemovingChannel(true); + } + + private void testReadBufferResponseBeforeReleasingOrRemovingChannel(boolean isRemoved) throws Exception { + int bufferSize = 1024; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2); + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel inputChannel = new InputChannelBuilder() + .setMemorySegmentProvider(networkBufferPool) + .buildRemoteAndSetToGate(inputGate); + inputGate.assignExclusiveSegments(); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + EmbeddedChannel embeddedChannel = new EmbeddedChannel(handler); + handler.addInputChannel(inputChannel); + + try { + BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(bufferSize), + 0, + inputChannel.getInputChannelId(), + 1, + new NetworkBufferAllocator(handler)); + + // Release the channel. 
+ inputGate.close(); + if (isRemoved) { + handler.removeInputChannel(inputChannel); + } + + handler.channelRead(null, bufferResponse); + + assertEquals(0, inputChannel.getNumberOfQueuedBuffers()); + assertNotNull(bufferResponse.getBuffer()); + assertTrue(bufferResponse.getBuffer().isRecycled()); + + embeddedChannel.runScheduledPendingTasks(); + NettyMessage.CancelPartitionRequest cancelPartitionRequest = embeddedChannel.readOutbound(); + assertNotNull(cancelPartitionRequest); + assertEquals(inputChannel.getInputChannelId(), cancelPartitionRequest.receiverId); + } finally { + releaseResource(inputGate, networkBufferPool); + } + } + + private void testReadBufferResponseAfterReleasingAndRemovingChannel(boolean isRemoved) throws Exception { Review comment: Renamed `ReleasingAndRemoving` to `ReleasingOrRemoving` and extracted the common method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390469208 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import javax.annotation.Nullable; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The decoder for {@link BufferResponse}. + */ +class BufferResponseDecoder extends NettyMessageDecoder { + + /** The Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The accumulation buffer of message header. 
*/ + private ByteBuf messageHeaderBuffer; + + /** +* The BufferResponse message that has its message header decoded, but still +* not received all the bytes of the buffer part. +*/ + @Nullable + private BufferResponse bufferResponse; + + /** How many bytes have been received or discarded for the data buffer part. */ + private int decodedDataBufferSize; + + BufferResponseDecoder(NetworkBufferAllocator allocator) { + this.allocator = checkNotNull(allocator); + } + + @Override + public void onChannelActive(ChannelHandlerContext ctx) { + messageHeaderBuffer = ctx.alloc().directBuffer(MESSAGE_HEADER_LENGTH); + } + + @Override + public DecodingResult onChannelRead(ByteBuf data) throws Exception { + if (bufferResponse == null) { + extractMessageHeader(data); + } + + if (bufferResponse != null) { + int remainingBufferSize = bufferResponse.bufferSize - decodedDataBufferSize; + int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize); + + // For the case of data buffer really exists in BufferResponse now. + if (actualBytesToDecode > 0) { + // For the case of released input channel, the respective data buffer part would be + // discarded from the received buffer. + if (bufferResponse.getBuffer() == null) { + data.readerIndex(data.readerIndex() + actualBytesToDecode); + } else { + bufferResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode); + } + + decodedDataBufferSize += actualBytesToDecode; + } + + if (decodedDataBufferSize == bufferResponse.bufferSize) { + BufferResponse result = bufferResponse; + clearState(); + return DecodingResult.fullMessage(result); + } + } + + return DecodingResult.NOT_FINISHED; + } + + private void extractMessageHeader(ByteBuf data) { Review comment: Modified the method name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
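Editorial note: the `onChannelRead` logic quoted in the message above decodes in two phases, first accumulating the message header and then copying at most the remaining payload bytes on each read. The following is a minimal, self-contained sketch of that pattern. It is not Flink's implementation: it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, and the class name `IncrementalFrameDecoder` and the 4-byte length-only header are assumptions made purely for illustration.

```java
import java.nio.ByteBuffer;

/** Hypothetical, simplified stand-in for a header-then-payload decoder; not Flink code. */
final class IncrementalFrameDecoder {

    // Assumption for this sketch only: the header is a single 4-byte payload length.
    static final int HEADER_LENGTH = 4;

    private final ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);

    // Stays null until the header has been fully received.
    private ByteBuffer payload;

    /** Consumes bytes from {@code data}; returns the payload once complete, otherwise null. */
    byte[] onChannelRead(ByteBuffer data) {
        if (payload == null) {
            // Phase 1: the header may arrive split across several reads.
            while (header.hasRemaining() && data.hasRemaining()) {
                header.put(data.get());
            }
            if (header.hasRemaining()) {
                return null;
            }
            header.flip();
            payload = ByteBuffer.allocate(header.getInt());
            header.clear();
        }

        // Phase 2: copy no more than what is readable and still missing.
        int bytesToCopy = Math.min(data.remaining(), payload.remaining());
        for (int i = 0; i < bytesToCopy; i++) {
            payload.put(data.get());
        }
        if (payload.hasRemaining()) {
            return null;
        }

        // Full message received: hand it out and reset the state for the next frame.
        byte[] result = payload.array();
        payload = null;
        return result;
    }
}
```

Feeding a 7-byte frame (length 3, then three payload bytes) in chunks of 2, 3, and 2 bytes yields null twice and the payload on the third call, which mirrors the `NOT_FINISHED` / `fullMessage` split in the quoted code.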
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390468626 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NonBufferResponseDecoder.java ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import java.net.ProtocolException; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; + +/** + * The decoder for messages other than {@link BufferResponse}. + */ +class NonBufferResponseDecoder extends NettyMessageDecoder { + + /** The initial size of the message header accumulation buffer. */ + private static final int INITIAL_MESSAGE_HEADER_BUFFER_LENGTH = 128; + + /** The accumulation buffer of the message header. 
*/ + private ByteBuf messageBuffer; + + @Override + public void onChannelActive(ChannelHandlerContext ctx) { + messageBuffer = ctx.alloc().directBuffer(INITIAL_MESSAGE_HEADER_BUFFER_LENGTH); + } + + @Override + void onNewMessageReceived(int msgId, int messageLength) { + super.onNewMessageReceived(msgId, messageLength); + ensureBufferCapacity(); + } + + @Override + public DecodingResult onChannelRead(ByteBuf data) throws Exception { + ByteBuf toDecode = ByteBufUtils.accumulate( + messageBuffer, + data, + messageLength, + messageBuffer.readableBytes()); + if (toDecode == null) { + return DecodingResult.NOT_FINISHED; + } + + NettyMessage nettyMessage; + switch (msgId) { + case ErrorResponse.ID: + nettyMessage = ErrorResponse.readFrom(toDecode); + break; + default: + throw new ProtocolException("Received unknown message from producer: " + msgId); + } + + messageBuffer.clear(); + return DecodingResult.fullMessage(nettyMessage); + } + + /** +* Ensures the message header accumulation buffer has enough capacity for +* the current message. +*/ + private void ensureBufferCapacity() { + if (messageBuffer.writerIndex() == 0 && messageBuffer.capacity() < messageLength) { Review comment: Refactor the `clear` into `onNewMessageReceived` and remove the adjustment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
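Editorial note: the decoder quoted above relies on `ByteBufUtils.accumulate(...)` returning null until `messageLength` bytes are available, and on the accumulation buffer being large enough for the current message. The sketch below illustrates one plausible reading of those semantics, inferred only from the call site in the quoted diff; it is an assumption, not the actual Flink helper. It uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf`, and the class name `AccumulateSketch` is hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of an "accumulate until complete" helper; not Flink code. */
final class AccumulateSketch {

    /**
     * Collects {@code size} bytes for one message.
     *
     * @param target accumulation buffer in write mode; its position is the number of bytes
     *               accumulated so far, and its capacity must be at least {@code size}
     * @param source newly received bytes
     * @return a buffer holding at least {@code size} readable bytes, or null if more data is needed
     */
    static ByteBuffer accumulate(ByteBuffer target, ByteBuffer source, int size) {
        int accumulated = target.position();

        // Fast path: nothing buffered yet and the source already holds the whole message,
        // so the caller can decode straight from the source without copying.
        if (accumulated == 0 && source.remaining() >= size) {
            return source;
        }

        // Slow path: copy only the bytes that are readable and still missing.
        int copyLength = Math.min(source.remaining(), size - accumulated);
        for (int i = 0; i < copyLength; i++) {
            target.put(source.get());
        }

        if (target.position() == size) {
            target.flip(); // switch to read mode for the decoder
            return target;
        }
        return null;
    }
}
```

In this sketch the caller is responsible for clearing `target` before the next message, which corresponds to the `messageBuffer.clear()` call in the quoted code and to the capacity check discussed in the review comment.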
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390468755 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import javax.annotation.Nullable; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The decoder for {@link BufferResponse}. + */ +class BufferResponseDecoder extends NettyMessageDecoder { + + /** The Buffer allocator. */ + private final NetworkBufferAllocator allocator; + + /** The accumulation buffer of message header. 
*/ + private ByteBuf messageHeaderBuffer; + + /** +* The BufferResponse message that has its message header decoded, but still +* not received all the bytes of the buffer part. +*/ + @Nullable + private BufferResponse bufferResponse; + + /** How many bytes have been received or discarded for the data buffer part. */ + private int decodedDataBufferSize; + + BufferResponseDecoder(NetworkBufferAllocator allocator) { + this.allocator = checkNotNull(allocator); + } + + @Override + public void onChannelActive(ChannelHandlerContext ctx) { + messageHeaderBuffer = ctx.alloc().directBuffer(MESSAGE_HEADER_LENGTH); + } + + @Override + public DecodingResult onChannelRead(ByteBuf data) throws Exception { + if (bufferResponse == null) { + extractMessageHeader(data); + } + + if (bufferResponse != null) { + int remainingBufferSize = bufferResponse.bufferSize - decodedDataBufferSize; + int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize); + + // For the case of data buffer really exists in BufferResponse now. + if (actualBytesToDecode > 0) { + // For the case of released input channel, the respective data buffer part would be + // discarded from the received buffer. + if (bufferResponse.getBuffer() == null) { + data.readerIndex(data.readerIndex() + actualBytesToDecode); + } else { + bufferResponse.getBuffer().asByteBuf().writeBytes(data, actualBytesToDecode); + } + + decodedDataBufferSize += actualBytesToDecode; + } + + if (decodedDataBufferSize == bufferResponse.bufferSize) { + BufferResponse result = bufferResponse; + clearState(); + return DecodingResult.fullMessage(result); + } + } + + return DecodingResult.NOT_FINISHED; + } + + private void extractMessageHeader(ByteBuf data) { + ByteBuf toDecode = ByteBufUtils.accumulate( Review comment: Fixed the variable name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
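The data-buffer handling in `onChannelRead` above (consume at most the remaining bytes, copy them into the channel's buffer, or skip them when the input channel has been released) can be sketched as follows. This is a simplified model under stated assumptions, not the PR's implementation: it uses `java.nio.ByteBuffer` rather than Netty's `ByteBuf`, and the class `DataPartSketch` and its members are hypothetical names.

```java
import java.nio.ByteBuffer;

/** Hypothetical model of decoding the data part of one message. */
final class DataPartSketch {

    /** Total number of data bytes announced by the message header. */
    private final int bufferSize;

    /** Destination buffer; null models a released input channel. */
    private final ByteBuffer destination;

    /** How many data bytes have been copied or discarded so far. */
    private int decoded;

    DataPartSketch(int bufferSize, ByteBuffer destination) {
        this.bufferSize = bufferSize;
        this.destination = destination;
    }

    /**
     * Consumes as many bytes from data as still belong to this message.
     * Returns true once all bufferSize bytes have been seen.
     */
    boolean onData(ByteBuffer data) {
        int remaining = bufferSize - decoded;
        int toDecode = Math.min(data.remaining(), remaining);

        if (toDecode > 0) {
            if (destination != null) {
                // Copy exactly toDecode bytes into the destination buffer.
                ByteBuffer slice = data.duplicate();
                slice.limit(slice.position() + toDecode);
                destination.put(slice);
            }
            // In both cases the bytes are consumed from the received data;
            // for a released channel they are simply skipped.
            data.position(data.position() + toDecode);
            decoded += toDecode;
        }

        return decoded == bufferSize;
    }
}
```

Note that a message with `bufferSize == 0` completes on the first call without touching the destination, which mirrors the empty-buffer case in the hunk.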
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390467865 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NonBufferResponseDecoder.java ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import java.net.ProtocolException; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; + +/** + * The decoder for messages other than {@link BufferResponse}. + */ +class NonBufferResponseDecoder extends NettyMessageDecoder { + + /** The initial size of the message header accumulation buffer. */ + private static final int INITIAL_MESSAGE_HEADER_BUFFER_LENGTH = 128; + + /** The accumulation buffer of the message header. */ Review comment: Modified the comment since it should hold the whole buffer. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390467299 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NonBufferResponseDecoder.java ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +import java.net.ProtocolException; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; + +/** + * The decoder for messages other than {@link BufferResponse}. + */ +class NonBufferResponseDecoder extends NettyMessageDecoder { + + /** The initial size of the message header accumulation buffer. */ + private static final int INITIAL_MESSAGE_HEADER_BUFFER_LENGTH = 128; + + /** The accumulation buffer of the message header. 
*/ + private ByteBuf messageBuffer; + + @Override + public void onChannelActive(ChannelHandlerContext ctx) { + messageBuffer = ctx.alloc().directBuffer(INITIAL_MESSAGE_HEADER_BUFFER_LENGTH); + } + + @Override + void onNewMessageReceived(int msgId, int messageLength) { + super.onNewMessageReceived(msgId, messageLength); + ensureBufferCapacity(); + } + + @Override + public DecodingResult onChannelRead(ByteBuf data) throws Exception { + ByteBuf toDecode = ByteBufUtils.accumulate( Review comment: Fixed the variable name.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r390467225 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegate.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.runtime.io.network.NetworkClientHandler; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.util.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.runtime.io.network.netty.NettyMessage.FRAME_HEADER_LENGTH; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.MAGIC_NUMBER; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Decodes messages from the received netty buffers. 
This decoder assumes the + * messages have the following format: + * +---++ + * | FRAME_HEADER || MESSAGE_HEADER | DATA BUFFER (Optional) | + * +---++ + * + * This decoder decodes the frame header and delegates the following work to the + * corresponding message decoders according to the message type. During this process + * The frame header and message header are only accumulated if they span received + * multiple netty buffers, and the data buffer is copied directly to the buffer + * of corresponding input channel to avoid more copying. + * + * The format of the frame header is + * +--+--++ + * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) | + * +--+--++ + */ +public class NettyMessageClientDecoderDelegate extends ChannelInboundHandlerAdapter { + private final Logger LOG = LoggerFactory.getLogger(NettyMessageClientDecoderDelegate.class); + + /** The decoder for BufferResponse. */ +private final NettyMessageDecoder bufferResponseDecoder; + +/** The decoder for messages other than BufferResponse. */ + private final NettyMessageDecoder nonBufferResponseDecoder; + + /** The accumulation buffer for the frame header. */ + private ByteBuf frameHeaderBuffer; + + /** The decoder for the current message. It is null if we are decoding the frame header. */ + private NettyMessageDecoder currentDecoder; + +NettyMessageClientDecoderDelegate(NetworkClientHandler networkClientHandler) { + this.bufferResponseDecoder = new BufferResponseDecoder( + new NetworkBufferAllocator( + checkNotNull(networkClientHandler))); +this.nonBufferResponseDecoder = new NonBufferResponseDecoder(); +} + +@Override +public void channelActive(ChannelHandlerContext ctx) throws Exception { +bufferResponseDecoder.onChannelActive(ctx); +nonBufferResponseDecoder.onChannelActive(ctx); + + frameHeaderBuffer = ctx.alloc().directBuffer(FRAME_HEADER_LENGTH); + + super.channelActive(ctx); +} + + /** +* Releases resources when the channel is closed. 
When exceptions are thrown during +* processing received netty buffers, {@link CreditBasedPartitionRequestClientHandler} +* is expected to catch the exception and close the channel and trigger this notification. +* +* @param ctx The context of the channel close notification. +*/ + @Override +public void channelInactive(ChannelHandlerContext ctx) throws Exception { + IOUtils.cleanup(LOG, bufferResponseDecoder, nonBufferResponseDecoder); + frameHeaderBuffer.release(); + + super.channelInactive(ctx); +} + +@
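The frame header layout described in the Javadoc of this hunk (a 4-byte frame length, a 4-byte magic number and a 1-byte message ID) can be illustrated with a small parser. This is a sketch under assumptions rather than the PR's code: it reads from a `java.nio.ByteBuffer` instead of a Netty `ByteBuf`, it assumes the frame length counts the frame header itself, and the class name `FrameHeaderSketch` and the magic value used here are placeholders chosen for the example.

```java
import java.nio.ByteBuffer;

/** Hypothetical parser for the 9-byte frame header described above. */
final class FrameHeaderSketch {

    /** 4 bytes frame length + 4 bytes magic number + 1 byte message ID. */
    static final int FRAME_HEADER_LENGTH = 4 + 4 + 1;

    /** Placeholder magic value for this sketch. */
    static final int MAGIC_NUMBER = 0xBADC0FFE;

    /** Length of the message that follows the frame header. */
    final int messageLength;

    /** The message type ID used to pick the message decoder. */
    final int msgId;

    private FrameHeaderSketch(int messageLength, int msgId) {
        this.messageLength = messageLength;
        this.msgId = msgId;
    }

    /** Parses a complete frame header from the current position of header. */
    static FrameHeaderSketch parse(ByteBuffer header) {
        if (header.remaining() < FRAME_HEADER_LENGTH) {
            throw new IllegalArgumentException("Frame header is not complete yet.");
        }

        // ByteBuffer reads big-endian by default.
        int frameLength = header.getInt();
        int magicNumber = header.getInt();
        if (magicNumber != MAGIC_NUMBER) {
            throw new IllegalStateException("Unexpected magic number in frame header.");
        }
        int msgId = header.get();

        // Assumption: the frame length includes the header bytes.
        return new FrameHeaderSketch(frameLength - FRAME_HEADER_LENGTH, msgId);
    }
}
```

Once the header is parsed, a delegate in the style of `NettyMessageClientDecoderDelegate` would choose the decoder by `msgId` and pass it `messageLength`.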
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388749965 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + chan
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388745136 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + chan
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388742828 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + chan
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388742647 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); Review comment: Now that the code is extracted into a common method in which both normalInputChannel and releaseInputChannel are created, I think it might be better to create normalInputChannel there as well.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388742111 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -419,6 +459,99 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { } } + @Test + public void testChannelReleasedBeforeDecodingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeDecodingBufferResponse(false); + } + + @Test + public void testChannelRemovedBeforeDecodingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeDecodingBufferResponse(true); + } + + @Test + public void testChannelReleasedBeforeReceivingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeReceivingBufferResponse(false); + } + + @Test + public void testChannelRemovedBeforeReceivingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeReceivingBufferResponse(true); + } + + private void testChannelReleasedOrRemovedBeforeDecodingBufferResponse(boolean isRemoved) throws Exception { + int bufferSize = 1024; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2); + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel inputChannel = new InputChannelBuilder() + .setMemorySegmentProvider(networkBufferPool) + .buildRemoteAndSetToGate(inputGate); + inputGate.assignExclusiveSegments(); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(inputChannel); + + try { + BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(bufferSize), + 0, + inputChannel.getInputChannelId(), + 1, + new NetworkBufferAllocator(handler)); + + // Release the channel. 
+ inputGate.close(); + if (isRemoved) { + handler.removeInputChannel(inputChannel); + } + + handler.channelRead(null, bufferResponse); + + assertEquals(0, inputChannel.getNumberOfQueuedBuffers()); + assertNotNull(bufferResponse.getBuffer()); + assertTrue(bufferResponse.getBuffer().isRecycled()); + } finally { + releaseResource(inputGate, networkBufferPool); + } + } + + private void testChannelReleasedOrRemovedBeforeReceivingBufferResponse(boolean isRemoved) throws Exception { + int bufferSize = 1024; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2); + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel inputChannel = new InputChannelBuilder() + .setMemorySegmentProvider(networkBufferPool) + .buildRemoteAndSetToGate(inputGate); + inputGate.assignExclusiveSegments(); + + CreditBasedPartitionRequestClientHandler handler = spy(new CreditBasedPartitionRequestClientHandler()); + handler.addInputChannel(inputChannel); + + try { + // Release the channel. + inputGate.close(); + if (isRemoved) { + handler.removeInputChannel(inputChannel); + } + + BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(bufferSize), + 0, + inputChannel.getInputChannelId(), + 1, + new NetworkBufferAllocator(handler)); + handler.channelRead(null, bufferResponse); + + assertEquals(0, inputChannel.getNumberOfQueuedBuffers()); + assertNull(bufferResponse.getBuffer()); + verify(handler, times(1)).cancelRequestFor(eq(inputChannel.getInputChannelId())); Review comment: Remove the `spy` usage and use `EmbeddedChannel` instead.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388741911 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -419,6 +459,99 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { } } + @Test + public void testChannelReleasedBeforeDecodingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeDecodingBufferResponse(false); + } + + @Test + public void testChannelRemovedBeforeDecodingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeDecodingBufferResponse(true); + } + + @Test + public void testChannelReleasedBeforeReceivingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeReceivingBufferResponse(false); + } + + @Test + public void testChannelRemovedBeforeReceivingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeReceivingBufferResponse(true); + } + + private void testChannelReleasedOrRemovedBeforeDecodingBufferResponse(boolean isRemoved) throws Exception { Review comment: Renamed the methods.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388741843 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java ## @@ -249,7 +248,7 @@ private void decodeMsg(Object msg) throws Throwable { NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg; RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId); - if (inputChannel == null) { + if (inputChannel == null || inputChannel.isReleased()) { Review comment: After some offline discussion, we agreed that the current implementation should be able to cover the above case.
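The condition proposed in the diff above (`inputChannel == null || inputChannel.isReleased()`) can be illustrated with a simplified, self-contained model. This is only a sketch under stated assumptions: `Channel`, `Handler`, `onBufferResponse` and `cancelled` are hypothetical stand-ins invented for illustration and are not Flink's classes or API. It mirrors what the tests quoted in this thread check, namely that a response addressed to a removed or released channel queues nothing and leads to the request being cancelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of the guard; not Flink's actual classes. */
final class ReleasedChannelGuardSketch {

    /** Stand-in for an input channel: tracks release state and queued payloads. */
    static final class Channel {
        private final List<String> queued = new ArrayList<>();
        private boolean released;

        void release() { released = true; }
        boolean isReleased() { return released; }
        int queuedCount() { return queued.size(); }
    }

    /** Stand-in for the client handler: routes responses by receiver id. */
    static final class Handler {
        private final Map<Integer, Channel> channels = new HashMap<>();
        /** Receiver ids for which a cancel request was recorded (assumed behaviour). */
        final List<Integer> cancelled = new ArrayList<>();

        void addChannel(int id, Channel channel) { channels.put(id, channel); }
        void removeChannel(int id) { channels.remove(id); }

        /** Returns true if the payload was queued, false if it was dropped. */
        boolean onBufferResponse(int receiverId, String payload) {
            Channel channel = channels.get(receiverId);
            if (channel == null || channel.isReleased()) {
                // Nobody can consume the data: drop it and record a cancellation.
                cancelled.add(receiverId);
                return false;
            }
            channel.queued.add(payload);
            return true;
        }
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        Channel channel = new Channel();
        handler.addChannel(1, channel);

        System.out.println("live channel delivered: " + handler.onBufferResponse(1, "a"));
        channel.release();
        System.out.println("released channel delivered: " + handler.onBufferResponse(1, "b"));
        handler.removeChannel(1);
        System.out.println("removed channel delivered: " + handler.onBufferResponse(1, "c"));
    }
}
```

In this model the released case and the removed case take the same branch, which is the behaviour the `isRemoved` flag in the quoted tests is meant to exercise for both variants.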
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388741953 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -419,6 +459,99 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { } } + @Test + public void testChannelReleasedBeforeDecodingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeDecodingBufferResponse(false); + } + + @Test + public void testChannelRemovedBeforeDecodingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeDecodingBufferResponse(true); + } + + @Test + public void testChannelReleasedBeforeReceivingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeReceivingBufferResponse(false); + } + + @Test + public void testChannelRemovedBeforeReceivingBufferResponse() throws Exception { + testChannelReleasedOrRemovedBeforeReceivingBufferResponse(true); + } + + private void testChannelReleasedOrRemovedBeforeDecodingBufferResponse(boolean isRemoved) throws Exception { + int bufferSize = 1024; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2); + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel inputChannel = new InputChannelBuilder() + .setMemorySegmentProvider(networkBufferPool) + .buildRemoteAndSetToGate(inputGate); + inputGate.assignExclusiveSegments(); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(inputChannel); + + try { + BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(bufferSize), + 0, + inputChannel.getInputChannelId(), + 1, + new NetworkBufferAllocator(handler)); + + // Release the channel. 
+ inputGate.close(); + if (isRemoved) { + handler.removeInputChannel(inputChannel); + } + + handler.channelRead(null, bufferResponse); + + assertEquals(0, inputChannel.getNumberOfQueuedBuffers()); + assertNotNull(bufferResponse.getBuffer()); + assertTrue(bufferResponse.getBuffer().isRecycled()); + } finally { + releaseResource(inputGate, networkBufferPool); + } + } + + private void testChannelReleasedOrRemovedBeforeReceivingBufferResponse(boolean isRemoved) throws Exception { Review comment: Renamed the method.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388741397 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + chan
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388741599 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { Review comment: Extracted a common method for the four tests.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388741245 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + chan
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388740363 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( Review comment: Closed
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388740363 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( Review comment: Close
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388740026 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderRemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyBufferResponseHeader; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.verifyErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + + private static final int BUFFER_SIZE = 1024; + + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + channel, + false, + false, + false, + normalInputChannel.getInputChannelId(), + null); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + int totalBufferRequired = 3; + + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel normalInputChannel = new BufferProviderRemoteInputChannel(inputGate, totalBufferRequired, BUFFER_SIZE); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(normalInputChannel); + + EmbeddedChannel channel = new EmbeddedChannel(new NettyMessageClientDecoderDelegate(handler)); + + testRepartitionMessagesAndDecode( + chan
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388739692 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/BufferProviderRemoteInputChannel.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import javax.annotation.Nullable; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Special {@link RemoteInputChannel} implementation that correspond to buffer request. Review comment: The tests are refactored and `BufferProviderRemoteInputChannel` is eliminated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388739325 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. 
+ */ +public class ByteBufUtilsTest { + private static final byte ACCUMULATION_BYTE = 0x7d; + private static final byte NON_ACCUMULATION_BYTE = 0x23; + + @Test + public void testAccumulateWithoutCopy() { + int sourceLength = 128; + int sourceReaderIndex = 32; + int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceReaderIndex, expectedAccumulationSize); + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. + ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceReaderIndex, src.readerIndex()); + verifyBufferContent(src, sourceReaderIndex, expectedAccumulationSize); + } + + @Test + public void testAccumulateWithCopy() { + int sourceLength = 128; + int firstSourceReaderIndex = 32; + int secondSourceReaderIndex = 0; + int expectedAccumulationSize = 128; + + int firstCopyLength = sourceLength - firstSourceReaderIndex; Review comment: Renamed the variables. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388739505 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/BufferProviderRemoteInputChannel.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import javax.annotation.Nullable; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Special {@link RemoteInputChannel} implementation that correspond to buffer request. + */ +public class BufferProviderRemoteInputChannel extends RemoteInputChannel { + private final int maxNumberOfBuffers; + private final int bufferSize; + + private int allocatedBuffers; + + public BufferProviderRemoteInputChannel( + SingleInputGate inputGate, + int maxNumberOfBuffers, + int bufferSize) { + + super( + inputGate, + 0, + new ResultPartitionID(), + InputChannelBuilder.STUB_CONNECTION_ID, + new LocalConnectionManager(), + 0, + 0, + InputChannelTestUtils.newUnregisteredInputChannelMetrics(), + InputChannelTestUtils.StubMemorySegmentProvider.getInstance()); + + inputGate.setInputChannel(new IntermediateResultPartitionID(), this); + + this.maxNumberOfBuffers = maxNumberOfBuffers; + this.bufferSize = bufferSize; + } + + @Nullable + @Override + public Buffer requestBuffer() { + if (isReleased()) { + return null; + } + + checkState(allocatedBuffers < maxNumberOfBuffers, + String.format("The number of allocated buffers %d have reached the maximum allowed %d.", allocatedBuffers, maxNumberOfBuffers)); Review comment: The tests are refactored and `BufferProviderRemoteInputChannel` is eliminated. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388738882 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. 
+ */ +public class ByteBufUtilsTest { + private static final byte ACCUMULATION_BYTE = 0x7d; + private static final byte NON_ACCUMULATION_BYTE = 0x23; + + @Test + public void testAccumulateWithoutCopy() { + int sourceLength = 128; + int sourceReaderIndex = 32; + int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceReaderIndex, expectedAccumulationSize); + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. + ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceReaderIndex, src.readerIndex()); + verifyBufferContent(src, sourceReaderIndex, expectedAccumulationSize); + } + + @Test + public void testAccumulateWithCopy() { + int sourceLength = 128; + int firstSourceReaderIndex = 32; + int secondSourceReaderIndex = 0; + int expectedAccumulationSize = 128; + + int firstCopyLength = sourceLength - firstSourceReaderIndex; + int secondCopyLength = expectedAccumulationSize - firstCopyLength; + + ByteBuf firstSource = createSourceBuffer(sourceLength, firstSourceReaderIndex, firstCopyLength); + ByteBuf secondSource = createSourceBuffer(sourceLength, secondSourceReaderIndex, secondCopyLength); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src does not have enough data, src will be copied into target and null will be returned. + ByteBuf accumulated = ByteBufUtils.accumulate( + target, + firstSource, + expectedAccumulationSize, + target.readableBytes()); + assertNull(accumulated); + assertEquals(sourceLength, firstSource.readerIndex()); + assertEquals(firstCopyLength, target.readableBytes()); + + // The remaining data will be copied from the second buffer, and the target buffer will be returned + // after all data is accumulated. 
+ accumulated = ByteBufUtils.accumulate( + target, + secondSource, + expectedAccumulationSize, + target.readableBytes()); + assertSame(target, accumulated); + assertEquals(secondSourceReaderIndex + secondCopyLength, secondSource.readerIndex()); + assertEquals(expectedAccumulationSize, target.readableBytes()); + + verifyBufferContent(accumulated, 0, expectedAccumulationSize); + } + + /** +* Create a source buffer whose length is size. The content between readerIndex and Review comment: Added the tags. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastru
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r388071627 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/ByteBufUtils.java ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +/** + * Utility routines to process Netty ByteBuf. + */ +public class ByteBufUtils { + + /** +* Cumulates data from the source buffer to the target buffer. +* +* @param cumulationBuf The target buffer. +* @param src The source buffer. +* @param expectedSize The expected length to cumulate. +* +* @return The ByteBuf containing cumulated data or null if not enough data has been cumulated. +*/ + public static ByteBuf cumulate(ByteBuf cumulationBuf, ByteBuf src, int expectedSize) { + // If the cumulation buffer is empty and src has enought bytes, + // user could read from src directly without cumulation. 
+ if (cumulationBuf.readerIndex() == 0 + && cumulationBuf.writerIndex() == 0 + && src.readableBytes() >= expectedSize) { + + return src; + } + + int copyLength = Math.min(src.readableBytes(), expectedSize - cumulationBuf.readableBytes()); + + if (copyLength > 0) { + cumulationBuf.writeBytes(src, copyLength); + } + + if (cumulationBuf.readableBytes() == expectedSize) { + return cumulationBuf; + } + + return null; Review comment: Hi @TisonKun, thanks a lot for the review, and sorry for missing this comment; it was folded on the PR page. We also considered using `Optional`. However, since this method is not part of the public API and is performance-sensitive (it is called twice for each buffer), it might be better to keep returning `null`. What do you think?
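The accumulation logic quoted above can be illustrated with a minimal, self-contained sketch. It is not the PR's code: it swaps Netty's `ByteBuf` for `java.nio.ByteBuffer` so that it runs without dependencies. The class name `AccumulateSketch`, and the convention that `target.position()` counts the bytes accumulated so far, are assumptions made only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the quoted cumulate() method, using java.nio.ByteBuffer
// instead of Netty's ByteBuf. Assumes target.capacity() >= expectedSize.
final class AccumulateSketch {

    /**
     * Returns src itself when nothing has been accumulated yet and src already holds
     * expectedSize readable bytes. Otherwise copies into target and returns target
     * (flipped for reading) once it holds expectedSize bytes, or null while more
     * data is still needed.
     */
    static ByteBuffer accumulate(ByteBuffer target, ByteBuffer src, int expectedSize) {
        // Fast path: no partial data yet and src is large enough, so no copy is needed.
        if (target.position() == 0 && src.remaining() >= expectedSize) {
            return src;
        }

        // Copy only what is still missing, bounded by what src can provide.
        int copyLength = Math.min(src.remaining(), expectedSize - target.position());
        if (copyLength > 0) {
            ByteBuffer slice = src.duplicate();
            slice.limit(slice.position() + copyLength);
            target.put(slice);
            // Advance src past the bytes that were consumed.
            src.position(src.position() + copyLength);
        }

        if (target.position() == expectedSize) {
            target.flip(); // switch target from writing to reading
            return target;
        }

        // Not enough data yet; the caller retries with the next source buffer.
        return null;
    }

    public static void main(String[] args) {
        ByteBuffer target = ByteBuffer.allocate(4);

        ByteBuffer first = ByteBuffer.wrap(new byte[] {1, 2});
        System.out.println(accumulate(target, first, 4) == null);

        ByteBuffer second = ByteBuffer.wrap(new byte[] {3, 4, 5});
        System.out.println(accumulate(target, second, 4) == target);
    }
}
```

As in the quoted method, `null` signals "not enough data yet", so the hot path allocates no wrapper object. That follows the reasoning in the comment above, and returning `Optional` instead would be an equally valid choice for code that is not performance-sensitive.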
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386799098 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/BufferProviderInputChannelBuilder.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import javax.annotation.Nullable; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Builder for special {@link RemoteInputChannel} that correspond to buffer request, allow users to + * set InputChannelId and release state. + */ +public class BufferProviderInputChannelBuilder { + private SingleInputGate inputGate = new SingleInputGateBuilder().build(); + private InputChannelID id = new InputChannelID(); + private int maxNumberOfBuffers = Integer.MAX_VALUE; + private int bufferSize = 32 * 1024; + private boolean isReleased = false; + + public BufferProviderInputChannelBuilder setInputGate(SingleInputGate inputGate) { + this.inputGate = inputGate; + return this; + } + + public BufferProviderInputChannelBuilder setId(InputChannelID id) { + this.id = id; + return this; + } + + public BufferProviderInputChannelBuilder setMaxNumberOfBuffers(int maxNumberOfBuffers) { + this.maxNumberOfBuffers = maxNumberOfBuffers; + return this; + } + + public BufferProviderInputChannelBuilder setBufferSize(int bufferSize) { Review comment: Removed the builder, thus the method is also removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386798885 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderInputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + private static final InputChannelID NORMAL_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID RELEASED_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID REMOVED_CHANNEL_ID = new InputChannelID(); + + /** +* Verifies that the client side decoder works well for unreleased input channels. +*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 3 buffers required. 
+ testRepartitionMessagesAndDecode(3, false, false, false); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + // 4 buffers required. + testRepartitionMessagesAndDecode(4, true, false, false); + } + + /** +* Verifies that NettyMessageDecoder works well with buffers sent to a released and removed input channels. +* For such channels, no Buffer is available to receive the data buffer in the message, and the data buffer +* part should be discarded before reading the next message. +*/ + @Test + public void testDownstreamMessageDecodeWithReleasedAndRemovedInputChannel() throws Exception { + // 3 buffers required. + testRepartitionMessagesAndDecode(3, false, true, true); + } + + //-- + + private void testRepartitionMessagesAndDecode( + int numberOfBuffersInNormalChannel, + boolean hasEmptyBuffer, + boolean hasBufferForReleasedChannel, + boolean hasBufferForRemovedChannel) throws Exception { + + EmbeddedChannel channel = createPartitionRequestClientHandler(numberOfBuffersInNormalChannel); + + try { + List messages = createMessageList(hasEmptyBuffer, hasBufferForReleasedChannel, hasBufferForRemovedChannel); + repartitionMessagesAndVerifyDecoding(channel, messages); + } finally { +
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386798958 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderInputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + private static final InputChannelID NORMAL_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID RELEASED_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID REMOVED_CHANNEL_ID = new InputChannelID(); + + /** +* Verifies that the client side decoder works well for unreleased input channels. +*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 3 buffers required. 
+ testRepartitionMessagesAndDecode(3, false, false, false); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + // 4 buffers required. + testRepartitionMessagesAndDecode(4, true, false, false); + } + + /** +* Verifies that NettyMessageDecoder works well with buffers sent to a released and removed input channels. +* For such channels, no Buffer is available to receive the data buffer in the message, and the data buffer +* part should be discarded before reading the next message. +*/ + @Test + public void testDownstreamMessageDecodeWithReleasedAndRemovedInputChannel() throws Exception { + // 3 buffers required. + testRepartitionMessagesAndDecode(3, false, true, true); + } + + //-- + + private void testRepartitionMessagesAndDecode( + int numberOfBuffersInNormalChannel, + boolean hasEmptyBuffer, + boolean hasBufferForReleasedChannel, + boolean hasBufferForRemovedChannel) throws Exception { + + EmbeddedChannel channel = createPartitionRequestClientHandler(numberOfBuffersInNormalChannel); + + try { + List messages = createMessageList(hasEmptyBuffer, hasBufferForReleasedChannel, hasBufferForRemovedChannel); + repartitionMessagesAndVerifyDecoding(channel, messages); + } finally { +
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386798744 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderInputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + private static final InputChannelID NORMAL_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID RELEASED_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID REMOVED_CHANNEL_ID = new InputChannelID(); + + /** +* Verifies that the client side decoder works well for unreleased input channels. +*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 3 buffers required. Review comment: Changed the number to a local variable. This is an automated message from the Apache Git Service. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386798744 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderInputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + private static final InputChannelID NORMAL_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID RELEASED_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID REMOVED_CHANNEL_ID = new InputChannelID(); + + /** +* Verifies that the client side decoder works well for unreleased input channels. +*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 3 buffers required. Review comment: Changed the number to a local buffer. This is an automated message from the Apache Git Service. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386798696 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderInputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + private static final InputChannelID NORMAL_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID RELEASED_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID REMOVED_CHANNEL_ID = new InputChannelID(); + + /** +* Verifies that the client side decoder works well for unreleased input channels. +*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 3 buffers required. 
+ testRepartitionMessagesAndDecode(3, false, false, false); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + // 4 buffers required. + testRepartitionMessagesAndDecode(4, true, false, false); + } + + /** +* Verifies that NettyMessageDecoder works well with buffers sent to a released and removed input channels. +* For such channels, no Buffer is available to receive the data buffer in the message, and the data buffer +* part should be discarded before reading the next message. +*/ + @Test + public void testDownstreamMessageDecodeWithReleasedAndRemovedInputChannel() throws Exception { + // 3 buffers required. + testRepartitionMessagesAndDecode(3, false, true, true); Review comment: Split the tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386798439 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderInputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + private static final InputChannelID NORMAL_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID RELEASED_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID REMOVED_CHANNEL_ID = new InputChannelID(); + + /** +* Verifies that the client side decoder works well for unreleased input channels. +*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 3 buffers required. 
+ testRepartitionMessagesAndDecode(3, false, false, false); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + // 4 buffers required. + testRepartitionMessagesAndDecode(4, true, false, false); + } + + /** +* Verifies that NettyMessageDecoder works well with buffers sent to a released and removed input channels. +* For such channels, no Buffer is available to receive the data buffer in the message, and the data buffer +* part should be discarded before reading the next message. +*/ + @Test + public void testDownstreamMessageDecodeWithReleasedAndRemovedInputChannel() throws Exception { + // 3 buffers required. + testRepartitionMessagesAndDecode(3, false, true, true); + } + + //-- + + private void testRepartitionMessagesAndDecode( + int numberOfBuffersInNormalChannel, + boolean hasEmptyBuffer, + boolean hasBufferForReleasedChannel, + boolean hasBufferForRemovedChannel) throws Exception { + + EmbeddedChannel channel = createPartitionRequestClientHandler(numberOfBuffersInNormalChannel); + + try { Review comment: `EmbeddedChannel` does not seem to implement `AutoCloseable`.
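The review comment above explains why the test keeps an explicit try/finally: a type that does not implement `AutoCloseable` cannot be used in try-with-resources. The following is only a minimal sketch of that pattern. `PlainChannel` is a hypothetical stand-in with a `close()` method, not Netty's `EmbeddedChannel`, and `runWithFinally` is an illustrative helper that does not appear in the pull request.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseSketch {
    /** Hypothetical resource: it has close() but does not implement AutoCloseable. */
    static final class PlainChannel {
        private final List<String> log;

        PlainChannel(List<String> log) {
            this.log = log;
        }

        void close() {
            log.add("closed");
        }
    }

    /** Runs a body and always closes the channel, even if the body throws. */
    static List<String> runWithFinally(boolean fail) {
        List<String> log = new ArrayList<>();
        PlainChannel channel = new PlainChannel(log);
        try {
            log.add("body");
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
        } catch (IllegalStateException e) {
            // The catch block runs before the finally block.
            log.add("caught");
        } finally {
            // Explicit cleanup, because try-with-resources is not available here.
            channel.close();
        }
        return log;
    }
}
```

Calling `CloseSketch.runWithFinally(true)` records the body, the caught failure, and the close, in that order, so the cleanup runs on both the success and the failure path.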
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386798377 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferProviderInputChannelBuilder; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + private static final InputChannelID NORMAL_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID RELEASED_CHANNEL_ID = new InputChannelID(); + + private static final InputChannelID REMOVED_CHANNEL_ID = new InputChannelID(); + + /** +* Verifies that the client side decoder works well for unreleased input channels. +*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 3 buffers required. 
+ testRepartitionMessagesAndDecode(3, false, false, false); + } + + /** +* Verifies that the client side decoder works well for empty buffers. Empty buffers should not +* consume data buffers of the input channels. +*/ + @Test + public void testDownstreamMessageDecodeWithEmptyBuffers() throws Exception { + // 4 buffers required. + testRepartitionMessagesAndDecode(4, true, false, false); + } + + /** +* Verifies that NettyMessageDecoder works well with buffers sent to a released and removed input channels. +* For such channels, no Buffer is available to receive the data buffer in the message, and the data buffer +* part should be discarded before reading the next message. +*/ + @Test + public void testDownstreamMessageDecodeWithReleasedAndRemovedInputChannel() throws Exception { + // 3 buffers required. + testRepartitionMessagesAndDecode(3, false, true, true); + } + + //-- + + private void testRepartitionMessagesAndDecode( + int numberOfBuffersInNormalChannel, + boolean hasEmptyBuffer, + boolean hasBufferForReleasedChannel, + boolean hasBufferForRemovedChannel) throws Exception { + + EmbeddedChannel channel = createPartitionRequestClientHandler(numberOfBuffersInNormalChannel); + + try { + List messages = createMessageList(hasEmptyBuffer, hasBufferForReleasedChannel, hasBufferForRemovedChannel); + repartitionMessagesAndVerifyDecoding(channel, messages); + } finally { +
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386797874 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -446,7 +546,11 @@ private static BufferResponse createBufferResponse( // Deserialize the bytes again. We have to go this way, because we only partly deserialize // the header of the response and wait for a buffer from the buffer pool to copy the payload // data into. Review comment: Fixed the comments.
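The code comment quoted above describes decoding in two stages: read the header first, then obtain a buffer and copy the payload into it. The following is a rough sketch of that idea using only `java.nio.ByteBuffer`. The frame layout (`[int channelId][int payloadLength][payload]`), the class name `TwoStageDecodeSketch`, and the `bufferProvider` callback are all assumptions made for illustration; this is not Flink's wire format or its `BufferResponse` implementation. A provider that returns `null` models a released or removed channel, whose payload is skipped before the next message is read.

```java
import java.nio.ByteBuffer;
import java.util.function.IntFunction;

public class TwoStageDecodeSketch {
    /**
     * Decodes one frame of the assumed layout [int channelId][int payloadLength][payload].
     * Returns the filled target buffer, or null if no buffer was available for the
     * channel and the payload was skipped.
     */
    static ByteBuffer decode(ByteBuffer in, IntFunction<ByteBuffer> bufferProvider) {
        // Stage 1: read only the header.
        int channelId = in.getInt();
        int length = in.getInt();

        // Stage 2: ask for a target buffer before touching the payload.
        ByteBuffer target = bufferProvider.apply(channelId);
        if (target == null) {
            // No buffer (e.g. channel released): discard the payload bytes.
            in.position(in.position() + length);
            return null;
        }

        // Copy exactly `length` payload bytes into the target buffer.
        ByteBuffer payload = in.slice();
        payload.limit(length);
        target.put(payload);
        in.position(in.position() + length);

        // Make the copied bytes readable by the caller.
        target.flip();
        return target;
    }
}
```

Skipping by advancing the read position keeps the input aligned on the next frame header, which is the property the released/removed-channel tests in this thread are checking for the real decoder.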
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386797841 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -419,6 +456,68 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { } } + @Test + public void testReceivedBufferForRemovedChannel() throws Exception { + final int bufferSize = 1024; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2); + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, null, networkBufferPool); + inputGate.assignExclusiveSegments(); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(inputChannel); + + try { + Buffer buffer = TestBufferFactory.createBuffer(bufferSize); + BufferResponse bufferResponse = createBufferResponse( + buffer, + 0, + inputChannel.getInputChannelId(), + 1, + new NetworkBufferAllocator(handler)); + + handler.removeInputChannel(inputChannel); + handler.channelRead(null, bufferResponse); + + assertNotNull(bufferResponse.getBuffer()); + assertTrue(bufferResponse.getBuffer().isRecycled()); + } finally { + releaseResource(inputGate, networkBufferPool); + } + } + + @Test + public void testReceivedBufferForReleasedChannel() throws Exception { + final int bufferSize = 1024; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2); + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, null, networkBufferPool); + inputGate.assignExclusiveSegments(); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(inputChannel); + + 
try { + Buffer buffer = TestBufferFactory.createBuffer(bufferSize); + BufferResponse bufferResponse = createBufferResponse( + buffer, + 0, + inputChannel.getInputChannelId(), + 1, + new NetworkBufferAllocator(handler)); + + inputGate.close(); Review comment: There do not seem to be such tests at the moment, so new tests are added. The common code is also extracted into a utility method.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386797617 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -419,6 +456,68 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { } } + @Test + public void testReceivedBufferForRemovedChannel() throws Exception { + final int bufferSize = 1024; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2); + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, null, networkBufferPool); Review comment: Changed to using the builder.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386797635 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -419,6 +456,68 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { } } + @Test + public void testReceivedBufferForRemovedChannel() throws Exception { + final int bufferSize = 1024; Review comment: Removed the final.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r386797662

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ##
@@ -419,6 +456,68 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
         }
     }
+    @Test
+    public void testReceivedBufferForRemovedChannel() throws Exception {
+        final int bufferSize = 1024;
+
+        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2);
+        SingleInputGate inputGate = createSingleInputGate(1);
+        RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, null, networkBufferPool);
+        inputGate.assignExclusiveSegments();
+
+        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
+        handler.addInputChannel(inputChannel);
+
+        try {
+            Buffer buffer = TestBufferFactory.createBuffer(bufferSize);
+            BufferResponse bufferResponse = createBufferResponse(
+                buffer,
+                0,
+                inputChannel.getInputChannelId(),
+                1,
+                new NetworkBufferAllocator(handler));
+
+            handler.removeInputChannel(inputChannel);
+            handler.channelRead(null, bufferResponse);
+
+            assertNotNull(bufferResponse.getBuffer());
+            assertTrue(bufferResponse.getBuffer().isRecycled());
+        } finally {
+            releaseResource(inputGate, networkBufferPool);
+        }
+    }
+
+    @Test
+    public void testReceivedBufferForReleasedChannel() throws Exception {
+        final int bufferSize = 1024;

Review comment:
Removed the final.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r386796864

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ##
@@ -419,6 +456,68 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
         }
     }
+    @Test
+    public void testReceivedBufferForRemovedChannel() throws Exception {
+        final int bufferSize = 1024;
+
+        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2);
+        SingleInputGate inputGate = createSingleInputGate(1);
+        RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate, null, networkBufferPool);
+        inputGate.assignExclusiveSegments();
+
+        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
+        handler.addInputChannel(inputChannel);
+
+        try {
+            Buffer buffer = TestBufferFactory.createBuffer(bufferSize);
+            BufferResponse bufferResponse = createBufferResponse(
+                buffer,
+                0,
+                inputChannel.getInputChannelId(),
+                1,
+                new NetworkBufferAllocator(handler));
+
+            handler.removeInputChannel(inputChannel);
+            handler.channelRead(null, bufferResponse);
+
+            assertNotNull(bufferResponse.getBuffer());
+            assertTrue(bufferResponse.getBuffer().isRecycled());

Review comment:
Added the verification.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r386796818

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ##
@@ -312,9 +333,17 @@ public void testNotifyCreditAvailable() throws Exception {
             // The buffer response will take one available buffer from input channel, and it will trigger
             // requesting (backlog + numExclusiveBuffers - numAvailableBuffers) floating buffers
             final BufferResponse bufferResponse1 = createBufferResponse(
-                TestBufferFactory.createBuffer(32), 0, inputChannel1.getInputChannelId(), 1);
+                TestBufferFactory.createBuffer(32),
+                0,
+                inputChannel1.getInputChannelId(),
+                1,
+                new NetworkBufferAllocator(handler));
             final BufferResponse bufferResponse2 = createBufferResponse(
-                TestBufferFactory.createBuffer(32), 0, inputChannel2.getInputChannelId(), 1);
+                TestBufferFactory.createBuffer(32),
+                0,
+                inputChannel2.getInputChannelId(),
+                1,
+                new NetworkBufferAllocator(handler));

Review comment:
Changed to using the same allocator.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r386796843

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ##
@@ -339,7 +368,11 @@ public void testNotifyCreditAvailable() throws Exception {
             // Trigger notify credits availability via buffer response on the condition of an un-writable channel
             final BufferResponse bufferResponse3 = createBufferResponse(
-                TestBufferFactory.createBuffer(32), 1, inputChannel1.getInputChannelId(), 1);
+                TestBufferFactory.createBuffer(32),
+                1,
+                inputChannel1.getInputChannelId(),
+                1,
+                new NetworkBufferAllocator(handler));

Review comment:
Changed to using the same allocator.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386164372 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.PartitionRequestClient; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 6 buffers required for running 2 rounds and 3 buffers each round. + NettyChannelAndInputChannelIds context = createPartitionRequestClientHandler(6); + + Supplier messagesSupplier = () -> { + Buffer event = createDataBuffer(32); + event.tagAsEvent(); + + return new NettyMessage[]{ + new NettyMessage.BufferResponse(createDataBuffer(128), 0, context.getNormalChannelId(), 4), + new NettyMessage.BufferResponse(createDataBuffer(256), 1, context.getNormalChannelId(), 3), + new NettyMessage.BufferResponse(event, 2, context.getNormalChannelId(), 4), + new NettyMessage.ErrorResponse(new RuntimeException("test"), context.getNormalChannelId()), + new NettyMessage.BufferResponse(createDataBuffer(56), 3, context.getNormalChannelId(), 4) + }; + }; + + repartitionMessagesAndVerifyDecoding( + context, + messagesSupplier, + (int[] sizes) -> new int[]{ + sizes[0] / 3, + sizes[0] + sizes[1] + sizes[2] / 3, + sizes[0] + sizes[1] + sizes[2] + sizes[3] / 3 * 2, + sizes[0] + sizes[1] + sizes[2] + sizes[3] + sizes[4] / 3 * 2 + }); + + repartitionMessagesAndVerifyDecoding( + context, + messagesSupplier, +
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386164335 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,411 @@
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386164105 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,411 @@
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386164079 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.PartitionRequestClient; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+     */
+    @Test
+    public void testDownstreamMessageDecode() throws Exception {
+        // 6 buffers required for running 2 rounds and 3 buffers each round.
+        NettyChannelAndInputChannelIds context = createPartitionRequestClientHandler(6);
+
+        Supplier messagesSupplier = () -> {
+            Buffer event = createDataBuffer(32);
+            event.tagAsEvent();
+
+            return new NettyMessage[]{
+                new NettyMessage.BufferResponse(createDataBuffer(128), 0, context.getNormalChannelId(), 4),
+                new NettyMessage.BufferResponse(createDataBuffer(256), 1, context.getNormalChannelId(), 3),
+                new NettyMessage.BufferResponse(event, 2, context.getNormalChannelId(), 4),
+                new NettyMessage.ErrorResponse(new RuntimeException("test"), context.getNormalChannelId()),
+                new NettyMessage.BufferResponse(createDataBuffer(56), 3, context.getNormalChannelId(), 4)
+            };
+        };
+
+        repartitionMessagesAndVerifyDecoding(
+            context,
+            messagesSupplier,
+            (int[] sizes) -> new int[]{

Review comment:
Changed the logic to use a fixed partition method.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386163976 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.PartitionRequestClient; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests the client side message decoder. + */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + /** +* Verifies that the client side decoder works well for unreleased input channels. 
+     */
+    @Test
+    public void testDownstreamMessageDecode() throws Exception {
+        // 6 buffers required for running 2 rounds and 3 buffers each round.
+        NettyChannelAndInputChannelIds context = createPartitionRequestClientHandler(6);
+
+        Supplier messagesSupplier = () -> {

Review comment:
Added a new function that returns the list of messages.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386163779 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + + verifyBufferContent(src, sourceStartPosition, sourceLength - sourceStartPosition, sourceStartPosition); + } + + @Test + public void testAccumulateWithCopy() { + final int firstSourceLength = 128; + final int firstSourceStartPosition = 32; + final int secondSourceLength = 64; + final int secondSourceStartPosition = 0; + final int expectedAccumulationSize = 128; + + final int firstCopyLength = firstSourceLength - firstSourceStartPosition; + final int secondCopyLength = expectedAccumulationSize - firstCopyLength; + + ByteBuf firstSource = createSourceBuffer(firstSourceLength, firstSourceStartPosition); + ByteBuf secondSource = createSourceBuffer(secondSourceLength, secondSourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src does not have enough data, src will be copied into target and null will be returned. + ByteBuf accumulated = ByteBufUtils.accumulate( + target, + firstSource, + expectedAccumulationSize, + target.readableBytes()); + assertNull(accumulated); + assertEquals(firstSourceLength, firstSource.readerIndex()); + assertEquals(firstCopyLength, target.readableBytes()); + + // The remaining data will be copied from the second buffer, and the target buffer will be returned + // after all data is accumulated. 
+ accumulated = ByteBufUtils.accumulate( + target, + secondSource, + expectedAccumulationSize, + target.readableBytes()); + assertSame(target, accumulated); + assertEquals(secondSourceStartPosition + secondCopyLength, secondSource.readerIndex()); + assertEquals(expectedAccumulationSize, target.readableBytes()); + + verifyBufferContent(accumulated, 0, firstCopyLength, firstSourceStartPosition); + verifyBufferContent(accumulated, firstCopyLength, secondCopyLength, secondSourceStartPosition); + } + + private ByteBuf createSourceBuffer(int size, int readerIndex) { + ByteBuf buf = Unpooled.buffer(size); + for (int i = 0; i < size; ++i) { + buf.writeByte((byte) i); + } + + buf.readerIndex(readerIndex); + +
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386162539 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + + verifyBufferContent(src, sourceStartPosition, sourceLength - sourceStartPosition, sourceStartPosition); + } + + @Test + public void testAccumulateWithCopy() { + final int firstSourceLength = 128; + final int firstSourceStartPosition = 32; + final int secondSourceLength = 64; + final int secondSourceStartPosition = 0; + final int expectedAccumulationSize = 128; + + final int firstCopyLength = firstSourceLength - firstSourceStartPosition; + final int secondCopyLength = expectedAccumulationSize - firstCopyLength; + + ByteBuf firstSource = createSourceBuffer(firstSourceLength, firstSourceStartPosition); + ByteBuf secondSource = createSourceBuffer(secondSourceLength, secondSourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src does not have enough data, src will be copied into target and null will be returned. + ByteBuf accumulated = ByteBufUtils.accumulate( + target, + firstSource, + expectedAccumulationSize, + target.readableBytes()); + assertNull(accumulated); + assertEquals(firstSourceLength, firstSource.readerIndex()); + assertEquals(firstCopyLength, target.readableBytes()); + + // The remaining data will be copied from the second buffer, and the target buffer will be returned + // after all data is accumulated. + accumulated = ByteBufUtils.accumulate( + target, + secondSource, + expectedAccumulationSize, + target.readableBytes()); + assertSame(target, accumulated); + assertEquals(secondSourceStartPosition + secondCopyLength, secondSource.readerIndex()); + assertEquals(expectedAccumulationSize, target.readableBytes()); + + verifyBufferContent(accumulated, 0, firstCopyLength, firstSourceStartPosition); Review comment: The previous checking is split into two parts since the content is not the same byte value. 
Following the comments below, the content now uses the same byte value, so the two checks are merged as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386162128 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + + verifyBufferContent(src, sourceStartPosition, sourceLength - sourceStartPosition, sourceStartPosition); + } + + @Test + public void testAccumulateWithCopy() { + final int firstSourceLength = 128; + final int firstSourceStartPosition = 32; + final int secondSourceLength = 64; Review comment: Unified the source length. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
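The accumulation behavior that the test comments above describe (return the source untouched when it already holds enough data, otherwise copy into the target and return null until the target is full) can be sketched as follows. This is only a minimal illustration under stated assumptions: it uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf`, and the class `AccumulateSketch` and its helper `createSource` are hypothetical names, not the `ByteBufUtils` code from the pull request.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the behavior described in the test comments.
// Assumption: java.nio.ByteBuffer replaces Netty's ByteBuf, so "reader index"
// maps to position() of a buffer in read mode.
final class AccumulateSketch {

    /**
     * Returns source when it can serve the request on its own, target once
     * targetSize bytes have been copied into it, or null while more input is needed.
     * source is in read mode; target is in write mode.
     */
    static ByteBuffer accumulate(ByteBuffer target, ByteBuffer source, int targetSize, int accumulatedSize) {
        // Fast path: nothing copied yet and source alone has enough data, so hand it back untouched.
        if (accumulatedSize == 0 && source.remaining() >= targetSize) {
            return source;
        }

        // Otherwise copy what is available, but never more than is still missing.
        int copyLength = Math.min(source.remaining(), targetSize - accumulatedSize);
        if (copyLength > 0) {
            ByteBuffer chunk = source.duplicate();
            chunk.limit(chunk.position() + copyLength);
            target.put(chunk);
            // Advance the source past the bytes that were consumed.
            source.position(source.position() + copyLength);
        }

        return accumulatedSize + copyLength == targetSize ? target : null;
    }

    /** Creates a buffer filled with the bytes 0..size-1 whose read position starts at readerIndex. */
    static ByteBuffer createSource(int size, int readerIndex) {
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (int i = 0; i < size; i++) {
            buf.put((byte) i);
        }
        buf.flip();
        buf.position(readerIndex);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer target = ByteBuffer.allocate(128);
        ByteBuffer first = createSource(128, 32);
        ByteBuffer second = createSource(64, 0);

        // The first call cannot satisfy 128 bytes on its own; the second call completes the target.
        System.out.println(accumulate(target, first, 128, target.position()) == null);
        System.out.println(accumulate(target, second, 128, target.position()) == target);
    }
}
```

As in the test under review, the caller passes the number of bytes already accumulated on every call, which is what lets the first branch distinguish the no-copy case from a partially filled target.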
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386162093 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; Review comment: Removed `final` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386162011 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) + .setBufferPoolFactory(bufferPool) + .build(); + inputChannel = createRemoteInputChannel( + inputGate, + mock(PartitionRequestClient.class), + networkBufferPool); + inputGate.assignExclusiveSegments(); + inputChannel.requestSubpartition(0); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(inputChannel); + + channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // outbound messages Review comment: Fixed the comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386162037 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; Review comment: Fixed the variable name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386161733 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) Review comment: The input channel has been replaced with a different test input channel, so the problem no longer exists. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386161786 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) + .setBufferPoolFactory(bufferPool) + .build(); + inputChannel = createRemoteInputChannel( + inputGate, + mock(PartitionRequestClient.class), + networkBufferPool); + inputGate.assignExclusiveSegments(); + inputChannel.requestSubpartition(0); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); Review comment: Fixed according to the comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386161763 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) + .setBufferPoolFactory(bufferPool) + .build(); + inputChannel = createRemoteInputChannel( + inputGate, + mock(PartitionRequestClient.class), + networkBufferPool); + inputGate.assignExclusiveSegments(); + inputChannel.requestSubpartition(0); Review comment: The input channel has been replaced with a different test input channel, so the problem no longer exists. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386161744 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) + .setBufferPoolFactory(bufferPool) + .build(); + inputChannel = createRemoteInputChannel( + inputGate, + mock(PartitionRequestClient.class), Review comment: The input channel has been replaced with a different test input channel, so the problem no longer exists. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386161564 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -183,15 +192,18 @@ private void testEncodeDecodeBuffer(boolean testReadOnlyBuffer, boolean testComp } NettyMessage.BufferResponse expected = new NettyMessage.BufferResponse( - testBuffer, random.nextInt(), new InputChannelID(), random.nextInt()); - NettyMessage.BufferResponse actual = encodeAndDecode(expected); + testBuffer, random.nextInt(), inputChannel.getInputChannelId(), random.nextInt()); + NettyMessage.BufferResponse actual = encodeAndDecode(expected, channel); // Netty 4.1 is not copying the messages, but retaining slices of them. BufferResponse actual is in this case Review comment: Removed the comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386161544 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes. + */ +public class NettyMessageServerSideSerializationTest { + + private final EmbeddedChannel channel = new EmbeddedChannel( Review comment: Added `close` to the channel. Moved the channel creation into the `setup` method and added the `close` call in `tearDown`. This should not change the tests, since each test runs with a new test object.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386120641 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes. 
+ */ +public class NettyMessageServerSideSerializationTest { + + private final EmbeddedChannel channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // outbound messages + new NettyMessage.NettyMessageDecoder()); // inbound messages + + private final Random random = new Random(); + + @Test + public void testEncodeDecode() { + { + NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(), random.nextInt(), new InputChannelID(), random.nextInt()); Review comment: Split the test into distinct tests and fixed the formatting problem. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386120621 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java ## @@ -120,10 +120,14 @@ * @return channel handlers */ public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[] { - messageEncoder, - new NettyMessage.NettyMessageDecoder(), - new CreditBasedPartitionRequestClientHandler()}; + CreditBasedPartitionRequestClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler(); + NettyMessageClientDecoderDelegate nettyMessageClientDecoderDelegate + = new NettyMessageClientDecoderDelegate(networkClientHandler); + + return new ChannelHandler[] { + messageEncoder, + nettyMessageClientDecoderDelegate, Review comment: Fixed according to the comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386120630 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes. + */ +public class NettyMessageServerSideSerializationTest { + + private final EmbeddedChannel channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // outbound messages Review comment: Modified the comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386120605 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate inputGate, NetworkBufferPool /** * Returns a deserialized buffer message as it would be received during runtime. */ - private static BufferResponse createBufferResponse( + private BufferResponse createBufferResponse( Buffer buffer, int sequenceNumber, - InputChannelID receivingChannelId, - int backlog) throws IOException { + RemoteInputChannel receivingChannel, + int backlog, + CreditBasedPartitionRequestClientHandler clientHandler) throws IOException { + // Mock buffer to serialize - BufferResponse resp = new BufferResponse(buffer, sequenceNumber, receivingChannelId, backlog); + BufferResponse resp = new BufferResponse( + buffer, + sequenceNumber, + receivingChannel.getInputChannelId(), + backlog); ByteBuf serialized = resp.write(UnpooledByteBufAllocator.DEFAULT); // Skip general header bytes serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH); + // Deserialize the bytes again. We have to go this way, because we only partly deserialize // the header of the response and wait for a buffer from the buffer pool to copy the payload // data into. 
- BufferResponse deserialized = BufferResponse.readFrom(serialized); + NetworkBufferAllocator allocator = new NetworkBufferAllocator(clientHandler); + BufferResponse deserialized = BufferResponse.readFrom(serialized, allocator); + + if (deserialized.getBuffer() != null) { Review comment: The content is currently not used. However, I think we should keep the copying to stay consistent with the previous behavior; keeping the deserialized buffer identical to the given buffer also seems more intuitive when reading this method.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386115744 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + Review comment: Removed the empty line. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386115708 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + Review comment: Removed the empty line. This is an automated message from the Apache Git Service. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386115653 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + + verifyBufferContent(src, sourceStartPosition, sourceLength - sourceStartPosition, sourceStartPosition); + } + + @Test + public void testAccumulateWithCopy() { + final int firstSourceLength = 128; + final int firstSourceStartPosition = 32; + final int secondSourceLength = 64; + final int secondSourceStartPosition = 0; + final int expectedAccumulationSize = 128; + + final int firstCopyLength = firstSourceLength - firstSourceStartPosition; + final int secondCopyLength = expectedAccumulationSize - firstCopyLength; + + ByteBuf firstSource = createSourceBuffer(firstSourceLength, firstSourceStartPosition); + ByteBuf secondSource = createSourceBuffer(secondSourceLength, secondSourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src does not have enough data, src will be copied into target and null will be returned. + ByteBuf accumulated = ByteBufUtils.accumulate( + target, + firstSource, + expectedAccumulationSize, + target.readableBytes()); + assertNull(accumulated); + assertEquals(firstSourceLength, firstSource.readerIndex()); + assertEquals(firstCopyLength, target.readableBytes()); + + // The remaining data will be copied from the second buffer, and the target buffer will be returned + // after all data is accumulated. 
+ accumulated = ByteBufUtils.accumulate( + target, + secondSource, + expectedAccumulationSize, + target.readableBytes()); + assertSame(target, accumulated); + assertEquals(secondSourceStartPosition + secondCopyLength, secondSource.readerIndex()); + assertEquals(expectedAccumulationSize, target.readableBytes()); + + verifyBufferContent(accumulated, 0, firstCopyLength, firstSourceStartPosition); + verifyBufferContent(accumulated, firstCopyLength, secondCopyLength, secondSourceStartPosition); + } + + private ByteBuf createSourceBuffer(int size, int readerIndex) { + ByteBuf buf = Unpooled.buffer(size); + for (int i = 0; i < size; ++i) { + buf.writeByte((byte) i); Review comment: Removed the type cast. ---
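The two test cases quoted above suggest the contract of the accumulate helper: return the source untouched when it alone holds enough data and nothing has been copied yet, otherwise copy into the target and return it only once the requested size is reached. The following is a minimal sketch of that contract, not the code from the pull request. It assumes `java.nio.ByteBuffer` instead of Netty's `ByteBuf` so that it runs without extra dependencies, and the names `AccumulateSketch`, `accumulate`, and `source` are hypothetical.

```java
import java.nio.ByteBuffer;

final class AccumulateSketch {

    /**
     * Hypothetical analogue of the helper exercised by the test.
     * The target is kept in write mode, so target.position() is the
     * number of bytes accumulated so far.
     */
    public static ByteBuffer accumulate(ByteBuffer target, ByteBuffer src, int targetSize) {
        int accumulated = target.position();

        // Fast path: nothing copied yet and src alone is large enough.
        // Return src as-is, without copying and without moving its position.
        if (accumulated == 0 && src.remaining() >= targetSize) {
            return src;
        }

        // Slow path: copy as much as is still needed (or as much as src has).
        int toCopy = Math.min(src.remaining(), targetSize - accumulated);
        ByteBuffer view = src.duplicate();
        view.limit(view.position() + toCopy);
        target.put(view);
        src.position(src.position() + toCopy);

        if (target.position() == targetSize) {
            // Everything is accumulated: hand out a readable view of the target.
            ByteBuffer result = target.duplicate();
            result.flip();
            return result;
        }
        return null; // more data is needed
    }

    /** Builds a buffer holding the bytes 0..size-1 with its read position at readerIndex. */
    public static ByteBuffer source(int size, int readerIndex) {
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (int i = 0; i < size; i++) {
            buf.put((byte) i);
        }
        buf.flip();
        buf.position(readerIndex);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer target = ByteBuffer.allocate(128);
        ByteBuffer first = source(128, 32);
        ByteBuffer second = source(64, 0);

        // The first source holds only 96 readable bytes, so accumulation is incomplete.
        System.out.println(accumulate(target, first, 128) == null);

        // The second source supplies the remaining 32 bytes.
        ByteBuffer done = accumulate(target, second, 128);
        System.out.println(done.remaining());
    }
}
```

The fast path is what makes the helper cheap when a message arrives in one piece, which is the situation the reviewer describes for tests that write a complete message into an `EmbeddedChannel`.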
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386105729 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java ## @@ -120,10 +120,14 @@ * @return channel handlers */ public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[] { - messageEncoder, - new NettyMessage.NettyMessageDecoder(), - new CreditBasedPartitionRequestClientHandler()}; + CreditBasedPartitionRequestClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler(); + NettyMessageClientDecoderDelegate nettyMessageClientDecoderDelegate + = new NettyMessageClientDecoderDelegate(networkClientHandler); + + return new ChannelHandler[] { + messageEncoder, Review comment: Fixed the indentation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386105709 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate inputGate, NetworkBufferPool /** * Returns a deserialized buffer message as it would be received during runtime. */ - private static BufferResponse createBufferResponse( + private BufferResponse createBufferResponse( Buffer buffer, int sequenceNumber, - InputChannelID receivingChannelId, - int backlog) throws IOException { + RemoteInputChannel receivingChannel, + int backlog, + CreditBasedPartitionRequestClientHandler clientHandler) throws IOException { + // Mock buffer to serialize - BufferResponse resp = new BufferResponse(buffer, sequenceNumber, receivingChannelId, backlog); + BufferResponse resp = new BufferResponse( + buffer, + sequenceNumber, + receivingChannel.getInputChannelId(), + backlog); ByteBuf serialized = resp.write(UnpooledByteBufAllocator.DEFAULT); // Skip general header bytes serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH); + // Deserialize the bytes again. We have to go this way, because we only partly deserialize // the header of the response and wait for a buffer from the buffer pool to copy the payload // data into. - BufferResponse deserialized = BufferResponse.readFrom(serialized); + NetworkBufferAllocator allocator = new NetworkBufferAllocator(clientHandler); Review comment: Changed the parameter to allocator. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386105720 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java ## @@ -120,10 +120,14 @@ * @return channel handlers */ public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[] { - messageEncoder, - new NettyMessage.NettyMessageDecoder(), - new CreditBasedPartitionRequestClientHandler()}; + CreditBasedPartitionRequestClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler(); Review comment: Modified the type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386105654 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ## @@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate inputGate, NetworkBufferPool /** * Returns a deserialized buffer message as it would be received during runtime. */ - private static BufferResponse createBufferResponse( + private BufferResponse createBufferResponse( Buffer buffer, int sequenceNumber, - InputChannelID receivingChannelId, - int backlog) throws IOException { + RemoteInputChannel receivingChannel, + int backlog, + CreditBasedPartitionRequestClientHandler clientHandler) throws IOException { + // Mock buffer to serialize - BufferResponse resp = new BufferResponse(buffer, sequenceNumber, receivingChannelId, backlog); + BufferResponse resp = new BufferResponse( Review comment: Reverted the change, so that only the `receivingChannelId` is passed.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r386105137 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -183,15 +192,18 @@ private void testEncodeDecodeBuffer(boolean testReadOnlyBuffer, boolean testComp } NettyMessage.BufferResponse expected = new NettyMessage.BufferResponse( - testBuffer, random.nextInt(), new InputChannelID(), random.nextInt()); - NettyMessage.BufferResponse actual = encodeAndDecode(expected); + testBuffer, random.nextInt(), inputChannel.getInputChannelId(), random.nextInt()); + NettyMessage.BufferResponse actual = encodeAndDecode(expected, channel); // Netty 4.1 is not copying the messages, but retaining slices of them. BufferResponse actual is in this case // holding a reference to the buffer. Buffer will be recycled only once "actual" will be released. - assertFalse(buffer.isRecycled()); - assertFalse(testBuffer.isRecycled()); + assertTrue(buffer.isRecycled()); Review comment: This is because the decoder previously just created a read-only slice of the received data and passed it to the client handler, which was responsible for copying the data into the `NetworkBuffer`. With this change, the decoder requests a new `NetworkBuffer` and copies the received data into it directly. Another point is that in this test we always write a complete message into the `EmbeddedChannel`, and the channel does not repartition the message, so no accumulation happens during encoding and decoding. This does not always hold in a real deployment.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385456575 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java ## @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; +import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.spy; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests the client side message decoder. 
+ */ +public class NettyMessageClientDecoderDelegateTest { + private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1); + + private static final InputChannelID NORMAL_INPUT_CHANNEL_ID = new InputChannelID(); + private static final InputChannelID RELEASED_INPUT_CHANNEL_ID = new InputChannelID(); + + /** +* Verifies that the client side decoder works well for unreleased input channels. +*/ + @Test + public void testDownstreamMessageDecode() throws Exception { + // 6 buffers required for running 2 rounds and 3 buffers each round. + EmbeddedChannel channel = new EmbeddedChannel( + new NettyMessageClientDecoderDelegate(createPartitionRequestClientHandler(6))); + Buffer event = createDataBuffer(32); + event.tagAsEvent(); + + NettyMessage[] messages = new NettyMessage[]{ + new NettyMessage.BufferResponse(createDataBuffer(128), 0, NORMAL_INPUT_CHANNEL_ID, 4), + new NettyMessage.BufferResponse(createDataBuffer(256), 1, NORMAL_INPUT_CHANNEL_ID, 3), + new NettyMessage.BufferResponse(event, 2, NORMAL_INPUT_CHANNEL_ID, 4), + new NettyMessage.ErrorResponse(new RuntimeException("test"), NORMAL_INPUT_CHANNEL_ID), + new NettyMessage.BufferResponse(createDataBuffer(56), 3, NORMAL_INPUT_CHANNEL_ID, 4) + }; + + ByteBuf[] serializedBuffers = null; + ByteBuf mergedBuffer = null; + + try { + serializedBuffers = serializeMessages(messages); + int[] sizes = getBufferSizes(serializedBuffers); + mergedBuffer = mergeBuffers(serializedBuffers); + + ByteBuf[] splitBuffers = partitionBuffer(merge
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385451121 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoder.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +/** + * Base class of decoders for specified netty messages. + */ +abstract class NettyMessageDecoder implements AutoCloseable { + + /** ID of the message under decoding. */ + protected int msgId; + + /** Length of the message under decoding. */ + protected int messageLength; + + /** +* The result of decoding one netty buffer. +*/ + static class DecodingResult { + final static DecodingResult NOT_FINISHED = new DecodingResult(false, null); + + static DecodingResult fullMessage(NettyMessage message) { Review comment: Fixed. This is an automated message from the Apache Git Service. 
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385451186 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessageDecoder.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; + +/** + * Base class of decoders for specified netty messages. + */ +abstract class NettyMessageDecoder implements AutoCloseable { + + /** ID of the message under decoding. */ + protected int msgId; + + /** Length of the message under decoding. */ + protected int messageLength; + + /** +* The result of decoding one netty buffer. 
+*/ + static class DecodingResult { + final static DecodingResult NOT_FINISHED = new DecodingResult(false, null); + + static DecodingResult fullMessage(NettyMessage message) { + return new DecodingResult(true, message); + } + + final boolean finished; + + final NettyMessage message; + + private DecodingResult(boolean finished, NettyMessage message) { + this.finished = finished; + this.message = message; + } + } + + /** +* Notifies that the underlying channel become active. +* +* @param ctx The context for the callback. +*/ + abstract void onChannelActive(ChannelHandlerContext ctx); + + /** +* Notifies that a new message is to be decoded. +* +* @param msgId The type of the message to be decoded. +* @param messageLength The length of the message to be decoded. +*/ + void onNewMessageReceived(int msgId, int messageLength) { + this.msgId = msgId; + this.messageLength = messageLength; + } + + /** +* Notifies that more data is received for the decoding message. Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
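The `DecodingResult` class in the diff above pairs a `finished` flag with the decoded message and uses a shared `NOT_FINISHED` instance for the incomplete case. The following is a minimal sketch of how a decoder might use that pattern, not the implementation from the pull request. It assumes a `String` payload in place of `NettyMessage`, and the `ToyDecoder` class and its `onData` method are hypothetical names introduced only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class DecodingResultSketch {

    /** Same shape as the DecodingResult in the diff, with String standing in for NettyMessage. */
    static final class DecodingResult {
        static final DecodingResult NOT_FINISHED = new DecodingResult(false, null);

        static DecodingResult fullMessage(String message) {
            return new DecodingResult(true, message);
        }

        final boolean finished;
        final String message;

        private DecodingResult(boolean finished, String message) {
            this.finished = finished;
            this.message = message;
        }
    }

    /** Hypothetical decoder that collects chunks until the announced length is reached. */
    static final class ToyDecoder {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
        private int messageLength;

        /** Starts a new message of the given length and drops any leftover bytes. */
        void onNewMessageReceived(int messageLength) {
            this.messageLength = messageLength;
            pending.reset();
        }

        /** Feeds one chunk; reports NOT_FINISHED until enough bytes have arrived. */
        DecodingResult onData(byte[] chunk) {
            pending.write(chunk, 0, chunk.length);
            if (pending.size() < messageLength) {
                return DecodingResult.NOT_FINISHED;
            }
            String decoded = new String(pending.toByteArray(), 0, messageLength, StandardCharsets.UTF_8);
            return DecodingResult.fullMessage(decoded);
        }
    }

    public static void main(String[] args) {
        ToyDecoder decoder = new ToyDecoder();
        decoder.onNewMessageReceived(5);

        // The message arrives in two chunks; only the second call completes it.
        DecodingResult partial = decoder.onData("he".getBytes(StandardCharsets.UTF_8));
        DecodingResult full = decoder.onData("llo".getBytes(StandardCharsets.UTF_8));

        System.out.println(partial.finished);
        System.out.println(full.finished + " " + full.message);
    }
}
```

Sharing one `NOT_FINISHED` instance avoids allocating a result object for every partial read, which is presumably why the diff declares it as a constant.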
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r384854587

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java ##

@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The decoder for {@link BufferResponse}.
+ */
+class BufferResponseDecoder extends NettyMessageDecoder {
+
+    /** The Buffer allocator. */
+    private final NetworkBufferAllocator allocator;
+
+    /** The accumulation buffer of message header. */
+    private ByteBuf messageHeaderBuffer;
+
+    /**
+     * The BufferResponse message that has its message header decoded, but still
+     * not received all the bytes of the buffer part.
+     */
+    private BufferResponse currentResponse;
+
+    /** How many bytes have been received or discarded for the buffer part. */
+    private int decodedBytesOfBuffer;
+
+    BufferResponseDecoder(NetworkBufferAllocator allocator) {
+        this.allocator = checkNotNull(allocator);
+    }
+
+    @Override
+    public void onChannelActive(ChannelHandlerContext ctx) {
+        messageHeaderBuffer = ctx.alloc().directBuffer(MESSAGE_HEADER_LENGTH);
+    }
+
+    @Override
+    public DecodingResult onChannelRead(ByteBuf data) throws Exception {
+        if (currentResponse == null) {
+            extractMessageHeader(data);
+        }
+
+        if (currentResponse != null) {
+            int remainingBufferSize = currentResponse.bufferSize - decodedBytesOfBuffer;
+            int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize);
+            if (actualBytesToDecode > 0) {
+                if (currentResponse.getBuffer() == null) {

Review comment:
Added the comment.
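The `remainingBufferSize` / `actualBytesToDecode` arithmetic in the quoted `onChannelRead` is the part that keeps the decoder from reading past the end of the current payload. A minimal sketch of just that step follows, using `java.nio.ByteBuffer` in place of Netty's `ByteBuf` and Flink's `Buffer`. The class `PayloadAccumulatorSketch` and its methods are hypothetical and only mirror the variable names from the diff.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch of copying a payload that arrives in several chunks. */
final class PayloadAccumulatorSketch {
    private final ByteBuffer target;   // plays the role of the buffer being filled
    private final int bufferSize;      // payload size announced by the message header
    private int decodedBytesOfBuffer;  // payload bytes copied so far

    PayloadAccumulatorSketch(int bufferSize) {
        this.bufferSize = bufferSize;
        this.target = ByteBuffer.allocate(bufferSize);
    }

    /** Copies at most the missing payload bytes from data; returns true once complete. */
    boolean onChannelRead(ByteBuffer data) {
        int remainingBufferSize = bufferSize - decodedBytesOfBuffer;
        int actualBytesToDecode = Math.min(data.remaining(), remainingBufferSize);
        if (actualBytesToDecode > 0) {
            // Limit a duplicate view so that bytes of the next message stay in data.
            ByteBuffer slice = data.duplicate();
            slice.limit(slice.position() + actualBytesToDecode);
            target.put(slice);
            data.position(data.position() + actualBytesToDecode);
            decodedBytesOfBuffer += actualBytesToDecode;
        }
        return decodedBytesOfBuffer == bufferSize;
    }

    byte[] payload() {
        return target.array().clone();
    }
}

final class PayloadAccumulatorDemo {
    public static void main(String[] args) {
        PayloadAccumulatorSketch accumulator = new PayloadAccumulatorSketch(4);
        ByteBuffer first = ByteBuffer.wrap(new byte[] {1, 2});
        ByteBuffer second = ByteBuffer.wrap(new byte[] {3, 4, 9}); // 9 belongs to the next message
        System.out.println(accumulator.onChannelRead(first));        // prints "false"
        System.out.println(accumulator.onChannelRead(second));       // prints "true"
        System.out.println(second.remaining());                      // prints "1"
        System.out.println(Arrays.toString(accumulator.payload())); // prints "[1, 2, 3, 4]"
    }
}
```

Taking the minimum of the readable bytes and the remaining payload size is what leaves the trailing byte in `second` untouched for whichever message follows.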
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r384854523

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java ##

@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The decoder for {@link BufferResponse}.
+ */
+class BufferResponseDecoder extends NettyMessageDecoder {
+
+    /** The Buffer allocator. */
+    private final NetworkBufferAllocator allocator;
+
+    /** The accumulation buffer of message header. */
+    private ByteBuf messageHeaderBuffer;
+
+    /**
+     * The BufferResponse message that has its message header decoded, but still
+     * not received all the bytes of the buffer part.
+     */
+    private BufferResponse currentResponse;
+
+    /** How many bytes have been received or discarded for the buffer part. */
+    private int decodedBytesOfBuffer;
+
+    BufferResponseDecoder(NetworkBufferAllocator allocator) {
+        this.allocator = checkNotNull(allocator);
+    }
+
+    @Override
+    public void onChannelActive(ChannelHandlerContext ctx) {
+        messageHeaderBuffer = ctx.alloc().directBuffer(MESSAGE_HEADER_LENGTH);
+    }
+
+    @Override
+    public DecodingResult onChannelRead(ByteBuf data) throws Exception {
+        if (currentResponse == null) {
+            extractMessageHeader(data);
+        }
+
+        if (currentResponse != null) {
+            int remainingBufferSize = currentResponse.bufferSize - decodedBytesOfBuffer;
+            int actualBytesToDecode = Math.min(data.readableBytes(), remainingBufferSize);
+            if (actualBytesToDecode > 0) {

Review comment:
Added the comment.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r384853085

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/BufferResponseDecoder.java ##

@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse.MESSAGE_HEADER_LENGTH;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The decoder for {@link BufferResponse}.
+ */
+class BufferResponseDecoder extends NettyMessageDecoder {
+
+    /** The Buffer allocator. */
+    private final NetworkBufferAllocator allocator;
+
+    /** The accumulation buffer of message header. */
+    private ByteBuf messageHeaderBuffer;
+
+    /**
+     * The BufferResponse message that has its message header decoded, but still
+     * not received all the bytes of the buffer part.
+     */
+    private BufferResponse currentResponse;

Review comment:
Added the annotation and renamed to `bufferResponse`.
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r384852560

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NonBufferResponseDecoder.java ##

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+
+import java.net.ProtocolException;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
+
+/**
+ * The decoder for messages other than {@link BufferResponse}.
+ */
+class NonBufferResponseDecoder extends NettyMessageDecoder {
+
+    /** The initial size of the message header accumulation buffer. */
+    private static final int INITIAL_MESSAGE_HEADER_BUFFER_LENGTH = 128;
+
+    /** The accumulation buffer of the message header. */
+    private ByteBuf messageBuffer;
+
+    @Override
+    public void onChannelActive(ChannelHandlerContext ctx) {
+        messageBuffer = ctx.alloc().directBuffer(INITIAL_MESSAGE_HEADER_BUFFER_LENGTH);
+    }
+
+    @Override
+    public DecodingResult onChannelRead(ByteBuf data) throws Exception {
+        ensureBufferCapacity();
+
+        ByteBuf toDecode = ByteBufUtils.accumulate(
+            messageBuffer,
+            data,
+            messageLength,
+            messageBuffer.readableBytes());
+        if (toDecode == null) {
+            return DecodingResult.NOT_FINISHED;
+        }
+
+        NettyMessage nettyMessage;
+        switch (msgId) {
+            case ErrorResponse.ID:
+                nettyMessage = ErrorResponse.readFrom(toDecode);
+                break;
+            default:
+                throw new ProtocolException("Received unknown message from producer: " + msgId);
+        }
+
+        messageBuffer.clear();
+        return DecodingResult.fullMessage(nettyMessage);
+    }
+
+    /**
+     * Ensures the message header accumulation buffer has enough capacity for
+     * the current message.
+     */
+    private void ensureBufferCapacity() {

Review comment:
Have fixed accordingly.
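The quoted hunk stops at the signature of `ensureBufferCapacity()`, so its body is not visible in this message. Purely as an illustration of what the Javadoc describes (making sure the accumulation buffer can hold the current message), here is a sketch with `java.nio.ByteBuffer`. The class `GrowableAccumulatorSketch`, the initial capacity of 8 and the reallocate-and-copy strategy are all assumptions, not the pull request's implementation.

```java
import java.nio.ByteBuffer;

/** Hypothetical accumulation buffer that grows to fit the announced message length. */
final class GrowableAccumulatorSketch {
    static final int INITIAL_CAPACITY = 8; // assumed value, chosen only for the demo

    private ByteBuffer messageBuffer = ByteBuffer.allocate(INITIAL_CAPACITY);

    /** Replaces the buffer with a larger one if messageLength does not fit; keeps accumulated bytes. */
    void ensureBufferCapacity(int messageLength) {
        if (messageBuffer.capacity() >= messageLength) {
            return;
        }
        ByteBuffer larger = ByteBuffer.allocate(messageLength);
        messageBuffer.flip();      // switch the old buffer to reading what was accumulated
        larger.put(messageBuffer); // carry the accumulated bytes over
        messageBuffer = larger;
    }

    void append(byte[] chunk) {
        messageBuffer.put(chunk);
    }

    int capacity() {
        return messageBuffer.capacity();
    }

    int accumulatedBytes() {
        return messageBuffer.position();
    }
}

final class GrowableAccumulatorDemo {
    public static void main(String[] args) {
        GrowableAccumulatorSketch accumulator = new GrowableAccumulatorSketch();
        accumulator.append(new byte[] {1, 2, 3});
        accumulator.ensureBufferCapacity(20); // larger than the initial capacity: grows
        accumulator.ensureBufferCapacity(10); // already fits: no change
        // prints "20 3"
        System.out.println(accumulator.capacity() + " " + accumulator.accumulatedBytes());
    }
}
```

With Netty's `ByteBuf` the same effect would more likely be achieved through the buffer's own capacity management rather than a manual copy; the copy here only keeps the sketch free of dependencies.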
[GitHub] [flink] gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r384852044

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ##

@@ -329,18 +334,12 @@ void releaseBuffer() {
         @Override
         ByteBuf write(ByteBufAllocator allocator) throws IOException {
-            // receiver ID (16), sequence number (4), backlog (4), isBuffer (1), isCompressed (1), buffer size (4)
-            final int messageHeaderLength = 16 + 4 + 4 + 1 + 1 + 4;
-
             ByteBuf headerBuf = null;
             try {
-                if (buffer instanceof Buffer) {
-                    // in order to forward the buffer to netty, it needs an allocator set

Review comment:
Added the comment.
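The comment removed in this hunk lists the header fields and their widths: receiver ID (16), sequence number (4), backlog (4), isBuffer (1), isCompressed (1) and buffer size (4), which sum to 30 bytes. The sketch below lays those fields out in a `java.nio.ByteBuffer` in the listed order to make the arithmetic concrete. It is an illustration only: `BufferResponseHeaderSketch` is a hypothetical class, representing the 16-byte receiver ID as two `long` values is an assumption, and the actual wire format is whatever `NettyMessage.BufferResponse` writes.

```java
import java.nio.ByteBuffer;

/** Hypothetical layout of the header fields named in the removed comment. */
final class BufferResponseHeaderSketch {
    // receiver ID (16), sequence number (4), backlog (4), isBuffer (1), isCompressed (1), buffer size (4)
    static final int MESSAGE_HEADER_LENGTH = 16 + 4 + 4 + 1 + 1 + 4;

    static ByteBuffer write(
            long receiverIdUpper,
            long receiverIdLower,
            int sequenceNumber,
            int backlog,
            boolean isBuffer,
            boolean isCompressed,
            int bufferSize) {
        ByteBuffer header = ByteBuffer.allocate(MESSAGE_HEADER_LENGTH);
        header.putLong(receiverIdUpper);            // receiver ID, first 8 bytes (assumed split)
        header.putLong(receiverIdLower);            // receiver ID, last 8 bytes
        header.putInt(sequenceNumber);
        header.putInt(backlog);
        header.put((byte) (isBuffer ? 1 : 0));
        header.put((byte) (isCompressed ? 1 : 0));
        header.putInt(bufferSize);
        header.flip();                              // make the written bytes readable
        return header;
    }
}

final class BufferResponseHeaderDemo {
    public static void main(String[] args) {
        ByteBuffer header = BufferResponseHeaderSketch.write(1L, 2L, 7, 3, true, false, 4096);
        // prints "30 30"
        System.out.println(BufferResponseHeaderSketch.MESSAGE_HEADER_LENGTH + " " + header.remaining());
    }
}
```

Keeping the length as a single named constant means the encoder and the decoder read the same number, whereas the removed local variable was visible only inside `write`.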