This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f3ea19203b8c12845ad85de046e18434aa9b364e
Author: Yuxin Tan <tanyuxinw...@gmail.com>
AuthorDate: Wed May 31 15:30:05 2023 +0800

    [hotfix][network] Fix the writing subpartition id for tiered storage producer client
---
 .../hybrid/tiered/storage/TieredStorageProducerClient.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 9a87ec91ff0..299c71a7bfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -80,7 +80,12 @@ public class TieredStorageProducerClient {
 
         if (isBroadcast && !isBroadcastOnly) {
             for (int i = 0; i < numSubpartitions; ++i) {
-                bufferAccumulator.receive(record.duplicate(), subpartitionId, dataType);
+                // As the tiered storage subpartition ID is created only for broadcast records,
+                // which are fewer than normal records, the performance impact of generating new
+                // TieredStorageSubpartitionId objects is expected to be manageable. If the
+                // performance is significantly affected, this logic will be optimized accordingly.
+                bufferAccumulator.receive(
+                        record.duplicate(), new TieredStorageSubpartitionId(i), dataType);
             }
         } else {
             bufferAccumulator.receive(record, subpartitionId, dataType);