This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c15987ee38 MAPREDUCE-7441. Fix race condition in closing FadvisedFileRegion. Contributed by Benjamin Teke
1c15987ee38 is described below

commit 1c15987ee38b96cf2ef4ff383f3f86e082c50fab
Author: Szilard Nemeth <snem...@apache.org>
AuthorDate: Fri Jun 23 14:40:03 2023 -0400

    MAPREDUCE-7441. Fix race condition in closing FadvisedFileRegion. Contributed by Benjamin Teke
---
 .../apache/hadoop/mapred/FadvisedFileRegion.java   | 102 ++++++++++++---------
 1 file changed, 58 insertions(+), 44 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
index 9290a282e39..184b58e6c76 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
@@ -41,6 +41,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
   private static final Logger LOG =
       LoggerFactory.getLogger(FadvisedFileRegion.class);
 
+  private final Object closeLock = new Object();
   private final boolean manageOsCache;
   private final int readaheadLength;
   private final ReadaheadPool readaheadPool;
@@ -51,12 +52,12 @@ public class FadvisedFileRegion extends DefaultFileRegion {
   private final int shuffleBufferSize;
   private final boolean shuffleTransferToAllowed;
   private final FileChannel fileChannel;
-  
-  private ReadaheadRequest readaheadRequest;
+
+  private volatile ReadaheadRequest readaheadRequest;
 
   public FadvisedFileRegion(RandomAccessFile file, long position, long count,
       boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
-      String identifier, int shuffleBufferSize, 
+      String identifier, int shuffleBufferSize,
       boolean shuffleTransferToAllowed) throws IOException {
     super(file.getChannel(), position, count);
     this.manageOsCache = manageOsCache;
@@ -73,97 +74,110 @@ public class FadvisedFileRegion extends DefaultFileRegion {
 
   @Override
   public long transferTo(WritableByteChannel target, long position)
-      throws IOException {
-    if (readaheadPool != null && readaheadLength > 0) {
-      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
-          position() + position, readaheadLength,
-          position() + count(), readaheadRequest);
+          throws IOException {
+    synchronized (closeLock) {
+      if (fd.valid()) {
+        if (readaheadPool != null && readaheadLength > 0) {
+          readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+                  position() + position, readaheadLength,
+                  position() + count(), readaheadRequest);
+        }
+
+        if(this.shuffleTransferToAllowed) {
+          return super.transferTo(target, position);
+        } else {
+          return customShuffleTransfer(target, position);
+        }
+      } else {
+        return 0L;
+      }
     }
-    
-    if(this.shuffleTransferToAllowed) {
-      return super.transferTo(target, position);
-    } else {
-      return customShuffleTransfer(target, position);
-    } 
+
   }
 
   /**
-   * This method transfers data using local buffer. It transfers data from 
-   * a disk to a local buffer in memory, and then it transfers data from the 
+   * This method transfers data using local buffer. It transfers data from
+   * a disk to a local buffer in memory, and then it transfers data from the
    * buffer to the target. This is used only if transferTo is disallowed in
-   * the configuration file. super.TransferTo does not perform well on Windows 
-   * due to a small IO request generated. customShuffleTransfer can control 
-   * the size of the IO requests by changing the size of the intermediate 
+   * the configuration file. super.TransferTo does not perform well on Windows
+   * due to a small IO request generated. customShuffleTransfer can control
+   * the size of the IO requests by changing the size of the intermediate
    * buffer.
    */
   @VisibleForTesting
   long customShuffleTransfer(WritableByteChannel target, long position)
-      throws IOException {
+          throws IOException {
     long actualCount = this.count - position;
     if (actualCount < 0 || position < 0) {
       throw new IllegalArgumentException(
-          "position out of range: " + position +
-          " (expected: 0 - " + (this.count - 1) + ')');
+              "position out of range: " + position +
+                      " (expected: 0 - " + (this.count - 1) + ')');
     }
     if (actualCount == 0) {
       return 0L;
     }
-    
+
     long trans = actualCount;
     int readSize;
     ByteBuffer byteBuffer = ByteBuffer.allocate(
-        Math.min(
-            this.shuffleBufferSize,
-            trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
-    
+            Math.min(
+                    this.shuffleBufferSize,
+                    trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
+
     while(trans > 0L &&
-        (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
+            (readSize = fileChannel.read(byteBuffer, this.position+position)) > 0) {
       //adjust counters and buffer limit
       if(readSize < trans) {
         trans -= readSize;
         position += readSize;
         byteBuffer.flip();
       } else {
-        //We can read more than we need if the actualCount is not multiple 
+        //We can read more than we need if the actualCount is not multiple
         //of the byteBuffer size and file is big enough. In that case we cannot
         //use flip method but we need to set buffer limit manually to trans.
         byteBuffer.limit((int)trans);
         byteBuffer.position(0);
-        position += trans; 
+        position += trans;
         trans = 0;
       }
-      
+
       //write data to the target
       while(byteBuffer.hasRemaining()) {
         target.write(byteBuffer);
       }
-      
+
       byteBuffer.clear();
     }
-    
+
     return actualCount - trans;
   }
 
-  
+
   @Override
   protected void deallocate() {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
+    synchronized (closeLock) {
+      if (readaheadRequest != null) {
+        readaheadRequest.cancel();
+        readaheadRequest = null;
+      }
+      super.deallocate();
     }
-    super.deallocate();
   }
-  
+
   /**
    * Call when the transfer completes successfully so we can advise the OS that
    * we don't need the region to be cached anymore.
    */
   public void transferSuccessful() {
-    if (manageOsCache && count() > 0) {
-      try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-            fd, position(), count(), POSIX_FADV_DONTNEED);
-      } catch (Throwable t) {
-        LOG.warn("Failed to manage OS cache for " + identifier, t);
+    synchronized (closeLock) {
+      if (fd.valid() && manageOsCache && count() > 0) {
+        try {
+          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+                  fd, position(), count(), POSIX_FADV_DONTNEED);
+        } catch (Throwable t) {
+          LOG.warn("Failed to manage OS cache for " + identifier +
+                  " fd " + fd, t);
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to