Repository: hadoop
Updated Branches:
  refs/heads/trunk 1da8d4190 -> 274eee328


MAPREDUCE-7095. Race conditions in closing FadvisedChunkedFile. (Miklos Szegedi via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/274eee32
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/274eee32
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/274eee32

Branch: refs/heads/trunk
Commit: 274eee32841082a734c57f9e383e5e2f02f97f78
Parents: 1da8d41
Author: Haibo Chen <haiboc...@apache.org>
Authored: Thu May 10 10:42:26 2018 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Thu May 10 10:46:55 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/io/ReadaheadPool.java     |  6 ++-
 .../hadoop/mapred/FadvisedChunkedFile.java      | 57 +++++++++++++-------
 .../hadoop/mapred/TestFadvisedChunkedFile.java  | 55 +++++++++++++++++++
 3 files changed, 98 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
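
The race being fixed: close() can run concurrently with nextChunk() (and
with a readahead request that nextChunk() has already queued), so the
fadvise/read calls could land on a file descriptor that had already been
closed. The patch serializes both paths on a private lock and checks
FileDescriptor#valid() before touching the descriptor. A minimal,
hypothetical sketch of that pattern using plain JDK I/O (class and file
names made up for illustration; this is not code from the commit):

// Hypothetical, simplified model of the race (plain JDK I/O, not Hadoop
// code): one thread closes the file while another still asks for the next
// chunk. The shared lock plus the FileDescriptor#valid() check keeps the
// reader from touching an already-closed descriptor.
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

public class CloseRaceSketch {

  private final RandomAccessFile file;
  private final Object closeLock = new Object(); // same role as closeLock in the patch

  CloseRaceSketch(RandomAccessFile file) {
    this.file = file;
  }

  byte[] nextChunk() throws IOException {
    synchronized (closeLock) {
      if (!file.getFD().valid()) {
        return null;                             // already closed: skip the read/fadvise
      }
      byte[] buf = new byte[2];
      int n = file.read(buf);
      return n < 0 ? null : buf;
    }
  }

  void close() throws IOException {
    synchronized (closeLock) {
      if (file.getFD().valid()) {
        file.close();                            // fd is invalid from here on
      }
    }
  }

  public static void main(String[] args) throws Exception {
    File tmp = File.createTempFile("close-race", ".tmp");
    tmp.deleteOnExit();
    try (RandomAccessFile raf = new RandomAccessFile(tmp, "rw")) {
      raf.write(new byte[]{1, 2, 3, 4});
      raf.seek(0);
      CloseRaceSketch sketch = new CloseRaceSketch(raf);
      Thread closer = new Thread(() -> {
        try {
          sketch.close();
        } catch (IOException ignored) {
        }
      });
      closer.start();
      // With the lock, this either returns real bytes or null, never a
      // read against a closed descriptor.
      System.out.println("chunk: " + Arrays.toString(sketch.nextChunk()));
      closer.join();
    }
  }
}

In the real class the same lock additionally covers the readahead
scheduling in nextChunk() and the posix_fadvise(DONTNEED) call in close(),
as the hunks below show.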


http://git-wip-us.apache.org/repos/asf/hadoop/blob/274eee32/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
index 2e65f12..7cd7f98 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
@@ -205,8 +205,10 @@ public class ReadaheadPool {
       // It's also possible that we'll end up requesting readahead on some
       // other FD, which may be wasted work, but won't cause a problem.
       try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-            fd, off, len, POSIX_FADV_WILLNEED);
+        if (fd.valid()) {
+          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+              identifier, fd, off, len, POSIX_FADV_WILLNEED);
+        }
       } catch (IOException ioe) {
         if (canceled) {
           // no big deal - the reader canceled the request and closed
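
The readahead itself runs later on a pool thread, so the descriptor
captured when the request was queued may already be closed by the time the
fadvise is issued. The new fd.valid() guard skips the native call for a
dead descriptor; the existing catch (IOException) still covers the small
close-after-check window. A JDK-only sketch of that asynchronous guard
(hypothetical class name, System.out standing in for the native fadvise):

import java.io.File;
import java.io.FileDescriptor;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncFdGuardSketch {
  public static void main(String[] args) throws Exception {
    File tmp = File.createTempFile("readahead", ".tmp");
    tmp.deleteOnExit();
    ExecutorService pool = Executors.newSingleThreadExecutor();

    RandomAccessFile raf = new RandomAccessFile(tmp, "r");
    FileDescriptor fd = raf.getFD();

    pool.submit(() -> {
      // Stand-in for the pooled readahead task: only touch the fd if it is
      // still open, otherwise silently drop the now-pointless readahead.
      if (fd.valid()) {
        System.out.println("fd still open, would posix_fadvise(WILLNEED)");
      } else {
        System.out.println("fd already closed, readahead skipped");
      }
    });

    raf.close();               // may happen before or after the task runs
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}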

http://git-wip-us.apache.org/repos/asf/hadoop/blob/274eee32/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
index 6a4e3b4..e9f0f34 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
@@ -22,6 +22,7 @@ import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -37,13 +38,14 @@ public class FadvisedChunkedFile extends ChunkedFile {
   private static final Logger LOG =
       LoggerFactory.getLogger(FadvisedChunkedFile.class);
 
+  private final Object closeLock = new Object();
   private final boolean manageOsCache;
   private final int readaheadLength;
   private final ReadaheadPool readaheadPool;
   private final FileDescriptor fd;
   private final String identifier;
 
-  private ReadaheadRequest readaheadRequest;
+  private volatile ReadaheadRequest readaheadRequest;
 
   public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
       int chunkSize, boolean manageOsCache, int readaheadLength,
@@ -56,31 +58,50 @@ public class FadvisedChunkedFile extends ChunkedFile {
     this.identifier = identifier;
   }
 
+  @VisibleForTesting
+  FileDescriptor getFd() {
+    return fd;
+  }
+
   @Override
   public Object nextChunk() throws Exception {
-    if (manageOsCache && readaheadPool != null) {
-      readaheadRequest = readaheadPool
-          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
-              getEndOffset(), readaheadRequest);
+    synchronized (closeLock) {
+      if (fd.valid()) {
+        if (manageOsCache && readaheadPool != null) {
+          readaheadRequest = readaheadPool
+              .readaheadStream(
+                  identifier, fd, getCurrentOffset(), readaheadLength,
+                  getEndOffset(), readaheadRequest);
+        }
+        return super.nextChunk();
+      } else {
+        return null;
+      }
     }
-    return super.nextChunk();
   }
 
   @Override
   public void close() throws Exception {
-    if (readaheadRequest != null) {
-      readaheadRequest.cancel();
-    }
-    if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
-      try {
-        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
-            fd,
-            getStartOffset(), getEndOffset() - getStartOffset(),
-            POSIX_FADV_DONTNEED);
-      } catch (Throwable t) {
-        LOG.warn("Failed to manage OS cache for " + identifier, t);
+    synchronized (closeLock) {
+      if (readaheadRequest != null) {
+        readaheadRequest.cancel();
+        readaheadRequest = null;
+      }
+      if (fd.valid() &&
+          manageOsCache && getEndOffset() - getStartOffset() > 0) {
+        try {
+          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+              identifier,
+              fd,
+              getStartOffset(), getEndOffset() - getStartOffset(),
+              POSIX_FADV_DONTNEED);
+        } catch (Throwable t) {
+          LOG.warn("Failed to manage OS cache for " + identifier +
+              " fd " + fd.toString(), t);
+        }
       }
+      // fd becomes invalid upon closing
+      super.close();
     }
-    super.close();
   }
 }
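
With nextChunk() and close() both holding closeLock, and close() clearing
the cancelled readahead request and re-checking fd.valid() before the
posix_fadvise(DONTNEED) call, a late chunk request or a second close()
becomes a no-op once the descriptor is invalid. A rough stress sketch of
the concurrent scenario (hypothetical driver class; assumes
hadoop-mapreduce-client-shuffle and its Netty dependency are on the
classpath; not part of the patch):

import java.io.File;
import java.io.RandomAccessFile;

import org.apache.hadoop.mapred.FadvisedChunkedFile;

public class FadvisedChunkedFileRaceSketch {
  public static void main(String[] args) throws Exception {
    File data = File.createTempFile("shuffle-race", ".dat");
    data.deleteOnExit();
    try (RandomAccessFile raf = new RandomAccessFile(data, "rw")) {
      raf.write(new byte[64]);                 // something to serve
      raf.seek(0);
      FadvisedChunkedFile chunked = new FadvisedChunkedFile(
          raf, 0, 64, 8, true, 128 * 1024, null, "sketch");

      Thread closer = new Thread(() -> {
        try {
          chunked.close();                     // races with the drain loop
        } catch (Exception ignored) {
        }
      });
      closer.start();

      // Before the fix this loop could fadvise/read on a closed fd; with
      // the lock and fd.valid() checks it simply gets null once close()
      // has run (or once the 64 bytes are exhausted).
      while (chunked.nextChunk() != null) {
        // drain
      }
      closer.join();
    }
  }
}

The new unit test below covers the simpler double-close case
deterministically.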

http://git-wip-us.apache.org/repos/asf/hadoop/blob/274eee32/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedChunkedFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedChunkedFile.java
new file mode 100644
index 0000000..b6d0fd2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestFadvisedChunkedFile.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for FadvisedChunkedFile.
+ */
+public class TestFadvisedChunkedFile {
+
+  @Test
+  public void testDoubleClose() throws Exception {
+    File absoluteFile = new File("target",
+        TestFadvisedChunkedFile.class.getSimpleName()).getAbsoluteFile();
+    absoluteFile.deleteOnExit();
+    try {
+      try (RandomAccessFile f = new RandomAccessFile(
+          absoluteFile.getAbsolutePath(), "rw")) {
+        FadvisedChunkedFile af = new FadvisedChunkedFile(
+            f, 0, 5, 2, true,
+            10, null, "foo");
+
+        assertTrue("fd not valid", f.getFD().valid());
+        af.close();
+        assertFalse("fd still valid", f.getFD().valid());
+        af.close();
+        assertFalse("fd still valid", f.getFD().valid());
+      }
+    } finally {
+      absoluteFile.delete();
+    }
+  }
+}

