This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6db32a3374a [FLINK-33502][network] Prevent DiskTierProducerAgent#getSegmentIdByIndexOfFirstBufferInSegment from throwing NPE when the task is released
6db32a3374a is described below

commit 6db32a3374a3fdb34c47039c6240c3845a3e1e30
Author: Wencong Liu <liuwencle...@163.com>
AuthorDate: Mon Dec 4 11:09:03 2023 +0800

    [FLINK-33502][network] Prevent DiskTierProducerAgent#getSegmentIdByIndexOfFirstBufferInSegment from throwing NPE when the task is released
---
 .../partition/hybrid/tiered/tier/disk/DiskIOScheduler.java | 14 +++++++-------
 .../hybrid/tiered/tier/disk/DiskTierProducerAgent.java     |  8 +++++---
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
index 5594f7bcb57..addb53e3955 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
@@ -95,10 +95,11 @@ public class DiskIOScheduler implements Runnable, BufferRecycler, NettyServicePr
     private final Duration bufferRequestTimeout;
 
     /**
-     * Retrieve the segment id if the buffer index represents the first buffer. The first integer is
-     * the id of subpartition, and the second integer is buffer index and the value is segment id.
+     * Get the segment id if the buffer index represents the first buffer in a segment. The first
+     * integer is the id of subpartition, and the second integer is buffer index and the value is
+     * segment id.
      */
-    private final BiFunction<Integer, Integer, Integer> firstBufferIndexInSegmentRetriever;
+    private final BiFunction<Integer, Integer, Integer> segmentIdGetter;
 
     private final PartitionFileReader partitionFileReader;
 
@@ -121,14 +122,14 @@ public class DiskIOScheduler implements Runnable, BufferRecycler, NettyServicePr
             ScheduledExecutorService ioExecutor,
             int maxRequestedBuffers,
             Duration bufferRequestTimeout,
-            BiFunction<Integer, Integer, Integer> firstBufferIndexInSegmentRetriever,
+            BiFunction<Integer, Integer, Integer> segmentIdGetter,
             PartitionFileReader partitionFileReader) {
         this.partitionId = partitionId;
         this.bufferPool = checkNotNull(bufferPool);
         this.ioExecutor = checkNotNull(ioExecutor);
         this.maxRequestedBuffers = maxRequestedBuffers;
         this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
-        this.firstBufferIndexInSegmentRetriever = firstBufferIndexInSegmentRetriever;
+        this.segmentIdGetter = segmentIdGetter;
         this.partitionFileReader = partitionFileReader;
         bufferPool.registerRequester(this);
     }
@@ -485,8 +486,7 @@ public class DiskIOScheduler implements Runnable, BufferRecycler, NettyServicePr
 
         private void updateSegmentId() {
             Integer segmentId =
-                    firstBufferIndexInSegmentRetriever.apply(
-                            subpartitionId.getSubpartitionId(), nextBufferIndex);
+                    segmentIdGetter.apply(subpartitionId.getSubpartitionId(), nextBufferIndex);
             if (segmentId != null) {
                 nextSegmentId = segmentId;
                 writeToNettyConnectionWriter(NettyPayload.newSegment(segmentId));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index 041374dd7d3..5bf029c0e66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -133,7 +133,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
                         ioExecutor,
                         maxRequestedBuffers,
                         bufferRequestTimeout,
-                        this::retrieveFirstBufferIndexInSegment,
+                        this::firstBufferIndexToSegmentId,
                         partitionFileReader);
 
         nettyService.registerProducer(partitionId, this);
@@ -216,14 +216,16 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
 
     private void releaseResources() {
         if (!isReleased) {
-            firstBufferIndexInSegment.clear();
+            for (Map<Integer, Integer> subFirstBufferIndexInSegment : firstBufferIndexInSegment) {
+                subFirstBufferIndexInSegment.clear();
+            }
             diskCacheManager.release();
             diskIOScheduler.release();
             isReleased = true;
         }
     }
 
-    private Integer retrieveFirstBufferIndexInSegment(int subpartitionId, int bufferIndex) {
+    private Integer firstBufferIndexToSegmentId(int subpartitionId, int bufferIndex) {
         return firstBufferIndexInSegment.size() > subpartitionId
                 ? firstBufferIndexInSegment.get(subpartitionId).get(bufferIndex)
                 : null;

Reply via email to