This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f3ea19203b8c12845ad85de046e18434aa9b364e
Author: Yuxin Tan <tanyuxinw...@gmail.com>
AuthorDate: Wed May 31 15:30:05 2023 +0800

    [hotfix][network] Fix the writing subpartition id for tiered storage producer client
---
 .../hybrid/tiered/storage/TieredStorageProducerClient.java         | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 9a87ec91ff0..299c71a7bfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -80,7 +80,12 @@ public class TieredStorageProducerClient {
 
         if (isBroadcast && !isBroadcastOnly) {
             for (int i = 0; i < numSubpartitions; ++i) {
-                bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType);
+                // As the tiered storage subpartition ID is created only for broadcast records,
+                // which are fewer than normal records, the performance impact of generating new
+                // TieredStorageSubpartitionId objects is expected to be manageable. If the
+                // performance is significantly affected, this logic will be optimized accordingly.
+                bufferAccumulator.receive(
+                        record.duplicate(), new TieredStorageSubpartitionId(i), dataType);
             }
         } else {
             bufferAccumulator.receive(record, subpartitionId, dataType);

Reply via email to