Repository: hbase
Updated Branches:
  refs/heads/fixRefresh [created] 9c046c091


HBASE-20204 Add locking to RefreshFileConnections in BucketCache

This is a follow-up to HBASE-20141 where Anoop suggested adding locking
for refreshing channels.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9c046c09
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9c046c09
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9c046c09

Branch: refs/heads/fixRefresh
Commit: 9c046c091c9b1ca6eb50d8b3e945c24a841dd4f1
Parents: 4f2dfd3
Author: Zach York <zy...@amazon.com>
Authored: Wed Mar 14 15:38:22 2018 -0700
Committer: Zach York <zy...@amazon.com>
Committed: Wed May 9 14:01:51 2018 -0700

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/FileIOEngine.java     | 32 +++++++++++++++-----
 .../hbase/io/hfile/bucket/TestFileIOEngine.java | 19 +++++++++---
 2 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9c046c09/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 29b810f..0710d26 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -26,6 +26,7 @@ import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
@@ -49,6 +50,7 @@ public class FileIOEngine implements IOEngine {
   private final String[] filePaths;
   private final FileChannel[] fileChannels;
   private final RandomAccessFile[] rafs;
+  private final ReentrantLock[] channelLocks;
 
   private final long sizePerFile;
   private final long capacity;
@@ -75,6 +77,7 @@ public class FileIOEngine implements IOEngine {
       }
     }
     this.rafs = new RandomAccessFile[filePaths.length];
+    this.channelLocks = new ReentrantLock[filePaths.length];
     for (int i = 0; i < filePaths.length; i++) {
       String filePath = filePaths[i];
       try {
@@ -90,6 +93,7 @@ public class FileIOEngine implements IOEngine {
         }
         rafs[i].setLength(sizePerFile);
         fileChannels[i] = rafs[i].getChannel();
+        channelLocks[i] = new ReentrantLock();
         LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
             + ", on the path:" + filePath);
       } catch (IOException fex) {
@@ -233,8 +237,7 @@ public class FileIOEngine implements IOEngine {
       } catch (ClosedByInterruptException e) {
         throw e;
       } catch (ClosedChannelException e) {
-        LOG.warn("Caught ClosedChannelException accessing BucketCache, 
reopening file. ", e);
-        refreshFileConnection(accessFileNum);
+        refreshFileConnection(accessFileNum, e);
         continue;
       }
       // recover the limit
@@ -282,13 +285,26 @@ public class FileIOEngine implements IOEngine {
   }
 
   @VisibleForTesting
-  void refreshFileConnection(int accessFileNum) throws IOException {
-    FileChannel fileChannel = fileChannels[accessFileNum];
-    if (fileChannel != null) {
-      fileChannel.close();
+  void refreshFileConnection(int accessFileNum, IOException ioe) throws 
IOException {
+    ReentrantLock channelLock = channelLocks[accessFileNum];
+    channelLock.lock();
+    try {
+      FileChannel fileChannel = fileChannels[accessFileNum];
+      if (fileChannel != null) {
+        // Don't re-open a channel if we were waiting on another
+        // thread to re-open the channel and it is now open.
+        if (fileChannel.isOpen()) {
+          return;
+        }
+        fileChannel.close();
+      }
+      LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening 
file: "
+          + filePaths[accessFileNum], ioe);
+      rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], 
"rw");
+      fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
+    } finally{
+      channelLock.unlock();
     }
-    rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
-    fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
   }
 
   private static interface FileAccessor {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9c046c09/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 6480986..efb8145 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -18,7 +18,8 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
@@ -143,10 +144,18 @@ public class TestFileIOEngine {
   }
 
   @Test
-  public void testRefreshFileConnectionClosesConnections() throws IOException {
-    FileChannel fileChannel = fileIOEngine.getFileChannels()[0];
+  public void testRefreshFileConnection() throws IOException {
+    FileChannel[] fileChannels = fileIOEngine.getFileChannels();
+    FileChannel fileChannel = fileChannels[0];
     assertNotNull(fileChannel);
-    fileIOEngine.refreshFileConnection(0);
-    assertFalse(fileChannel.isOpen());
+    fileChannel.close();
+    fileIOEngine.refreshFileConnection(0, new IOException("Test Exception"));
+    FileChannel[] reopenedFileChannels = fileIOEngine.getFileChannels();
+    FileChannel reopenedFileChannel = reopenedFileChannels[0];
+    assertNotEquals(fileChannel, reopenedFileChannel);
+    assertEquals(fileChannels.length, reopenedFileChannels.length);
+    for (int i = 1; i < fileChannels.length; i++) {
+      assertEquals(fileChannels[i], reopenedFileChannels[i]);
+    }
   }
 }

Reply via email to