This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e77dc91a59b81c9727fe2d7b3f29dbdd290266a3
Author: Weijie Guo <res...@163.com>
AuthorDate: Fri Sep 8 16:14:00 2023 +0800

    [FLINK-33076][network] Reduce serialization overhead of broadcast emit from ChannelSelectorRecordWriter.
---
 .../io/network/api/writer/ChannelSelectorRecordWriter.java     |  2 +-
 .../flink/runtime/io/network/api/writer/RecordWriter.java      | 10 ++++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 5b756b693b3..e9e19354ca4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -65,7 +65,7 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable>
         ByteBuffer serializedRecord = serializeRecord(serializer, record);
         for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
             serializedRecord.rewind();
-            emit(record, channelIndex);
+            emit(serializedRecord, channelIndex);
         }
 
         if (flushAlways) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 57f0a800e37..e9760bdadd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -111,6 +111,16 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
         }
     }
 
+    protected void emit(ByteBuffer record, int targetSubpartition) throws IOException {
+        checkErroneous();
+
+        targetPartition.emitRecord(record, targetSubpartition);
+
+        if (flushAlways) {
+            targetPartition.flush(targetSubpartition);
+        }
+    }
+
     public void broadcastEvent(AbstractEvent event) throws IOException {
         broadcastEvent(event, false);
     }

Reply via email to