GitHub user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4759#discussion_r148944363

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
    @@ -293,6 +293,27 @@ public void add(Buffer buffer, int subpartitionIndex) throws IOException {
     	}

     	/**
    +	 * Writes the given buffer to all available target channels.
    +	 *
    +	 * <p>The buffer is taken over and used for each of the channels. It will be recycled afterwards.
    +	 *
    +	 * @param buffer the buffer to write
    +	 */
    +	public void addToAllChannels(Buffer buffer) throws IOException {
    +		try {
    +			for (int targetChannel = 0; targetChannel < subpartitions.length; targetChannel++) {
    +				// retain the buffer so that it can be recycled by each channel of targetPartition
    +				buffer.retain();
    --- End diff --

    ok, why not...
---