Repository: hbase
Updated Branches:
  refs/heads/branch-1 39e8e2fb5 -> 356d4e918
  refs/heads/master ee0f148c7 -> 1950acc67


HBASE-9393 HBase does not close open sockets, resulting in many connections stuck in CLOSE_WAIT

Signed-off-by: Andrew Purtell <apurt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1950acc6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1950acc6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1950acc6

Branch: refs/heads/master
Commit: 1950acc67a2510be370d6fc7859b562e58070942
Parents: ee0f148
Author: Ashish Singhi <ashishsin...@apache.org>
Authored: Tue Jun 6 12:50:59 2017 +0530
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Tue Jun 6 12:52:46 2017 -0700

----------------------------------------------------------------------
 .../hbase/io/FSDataInputStreamWrapper.java      | 71 +++++++++++++++++++-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java | 24 +++++--
 .../hadoop/hbase/io/hfile/HFileBlock.java       | 23 +++++++
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |  9 +++
 4 files changed, 121 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1950acc6/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 055e46a..25a3373 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -19,8 +19,13 @@ package org.apache.hadoop.hbase.io;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,6 +41,8 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 public class FSDataInputStreamWrapper implements Closeable {
+  private static final Log LOG = 
LogFactory.getLog(FSDataInputStreamWrapper.class);
+
   private final HFileSystem hfs;
   private final Path path;
   private final FileLink link;
@@ -80,6 +87,11 @@ public class FSDataInputStreamWrapper implements Closeable {
   // reads without hbase checksum verification.
   private volatile int hbaseChecksumOffCount = -1;
 
+  private Boolean instanceOfCanUnbuffer = null;
+  // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method 
to avoid compilation
+  // errors against Hadoop pre 2.6.4 and 2.7.1 versions.
+  private Method unbuffer = null;
+
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException 
{
     this(fs, path, false, -1L);
   }
@@ -232,4 +244,61 @@ public class FSDataInputStreamWrapper implements Closeable 
{
   public HFileSystem getHfs() {
     return this.hfs;
   }
-}
+
+  /**
+   * This will free sockets and file descriptors held by the stream only when 
the stream implements
+   * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only 
when all the clients
+   * using this stream to read the blocks have finished reading. If by chance 
the stream is
+   * unbuffered and there are clients still holding this stream for read then 
on next client read
+   * request a new socket will be opened by Datanode without client knowing 
about it and will serve
+   * its read request. Note: If this socket is idle for some time then the 
DataNode will close the
+   * socket and the socket will move into CLOSE_WAIT state and on the next 
client request on this
+   * stream, the current socket will be closed and a new socket will be opened 
to serve the
+   * requests.
+   */
+  @SuppressWarnings({ "rawtypes" })
+  public void unbuffer() {
+    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
+    if (stream != null) {
+      InputStream wrappedStream = stream.getWrappedStream();
+      // CanUnbuffer interface was added as part of HDFS-7694 and the fix is 
available in Hadoop
+      // 2.6.4+ and 2.7.1+ versions only so check whether the stream object 
implements the
+      // CanUnbuffer interface or not and based on that call the unbuffer api.
+      final Class<? extends InputStream> streamClass = 
wrappedStream.getClass();
+      if (this.instanceOfCanUnbuffer == null) {
+        // To ensure we compute whether the stream is instance of CanUnbuffer 
only once.
+        this.instanceOfCanUnbuffer = false;
+        Class<?>[] streamInterfaces = streamClass.getInterfaces();
+        for (Class c : streamInterfaces) {
+          if 
(c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
+            try {
+              this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
+            } catch (NoSuchMethodException | SecurityException e) {
+              LOG.warn("Failed to find 'unbuffer' method in class " + 
streamClass
+                  + " . So there may be a TCP socket connection "
+                  + "left open in CLOSE_WAIT state.",
+                e);
+              return;
+            }
+            this.instanceOfCanUnbuffer = true;
+            break;
+          }
+        }
+      }
+      if (this.instanceOfCanUnbuffer) {
+        try {
+          this.unbuffer.invoke(wrappedStream);
+        } catch (IllegalAccessException | IllegalArgumentException | 
InvocationTargetException e) {
+          LOG.warn("Failed to invoke 'unbuffer' method in class " + streamClass
+              + " . So there may be a TCP socket connection left open in 
CLOSE_WAIT state.",
+            e);
+        }
+      } else {
+        LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+            + " . So there may be a TCP socket connection "
+            + "left open in CLOSE_WAIT state. For more details check "
+            + "https://issues.apache.org/jira/browse/HBASE-9393";);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/1950acc6/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 9f2db83..feddc2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -499,6 +499,12 @@ public class HFile {
 
     @VisibleForTesting
     boolean prefetchComplete();
+
+    /**
+     * To close the stream's socket. Note: This can be concurrently called 
from multiple threads and
+     * implementation should take care of thread safety.
+     */
+    void unbufferStream();
   }
 
   /**
@@ -516,7 +522,7 @@ public class HFile {
    */
   
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
       justification="Intentional")
-  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper 
fsdis, long size,
+  private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, 
long size,
       CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, 
Configuration conf)
       throws IOException {
     FixedFileTrailer trailer = null;
@@ -537,10 +543,15 @@ public class HFile {
     } catch (Throwable t) {
       IOUtils.closeQuietly(fsdis);
       throw new CorruptHFileException("Problem reading HFile Trailer from file 
" + path, t);
+    } finally {
+      fsdis.unbuffer();
     }
   }
 
   /**
+   * The sockets and the file descriptors held by the method parameter
+   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so 
caller needs to ensure
+   * that no other threads have access to the same passed reference.
    * @param fs A file system
    * @param path Path to HFile
    * @param fsdis a stream of path's file
@@ -565,7 +576,7 @@ public class HFile {
     } else {
       hfs = (HFileSystem) fs;
     }
-    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, 
primaryReplicaReader, conf);
+    return openReader(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, 
conf);
   }
 
   /**
@@ -597,18 +608,21 @@ public class HFile {
       boolean primaryReplicaReader, Configuration conf) throws IOException {
     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null 
CacheConf");
     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
-    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), 
cacheConf,
+    return openReader(path, stream, fs.getFileStatus(path).getLen(), cacheConf,
       stream.getHfs(), primaryReplicaReader, conf);
   }
 
   /**
-   * This factory method is used only by unit tests
+   * This factory method is used only by unit tests. <br/>
+   * The sockets and the file descriptors held by the method parameter
+   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so 
caller needs to ensure
+   * that no other threads have access to the same passed reference.
    */
   @VisibleForTesting
   static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, 
long size,
       CacheConfig cacheConf, Configuration conf) throws IOException {
     FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
-    return pickReaderVersion(path, wrapper, size, cacheConf, null, true, conf);
+    return openReader(path, wrapper, size, cacheConf, null, true, conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/1950acc6/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 9a354f9..72f96a5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -1391,6 +1393,12 @@ public class HFileBlock implements Cacheable {
 
     void setIncludesMemstoreTS(boolean includesMemstoreTS);
     void setDataBlockEncoder(HFileDataBlockEncoder encoder);
+
+    /**
+     * To close the stream's socket. Note: This can be concurrently called 
from multiple threads and
+     * implementation should take care of thread safety.
+     */
+    void unbufferStream();
   }
 
   /**
@@ -1449,6 +1457,8 @@ public class HFileBlock implements Cacheable {
     // Cache the fileName
     private String pathName;
 
+    private final Lock streamLock = new ReentrantLock();
+
     FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem 
hfs, Path path,
         HFileContext fileContext) throws IOException {
       this.fileSize = fileSize;
@@ -1848,6 +1858,19 @@ public class HFileBlock implements Cacheable {
     }
 
     @Override
+    public void unbufferStream() {
+      // To handle concurrent reads, ensure that no other client is accessing 
the streams while we
+      // unbuffer it.
+      if (streamLock.tryLock()) {
+        try {
+          this.streamWrapper.unbuffer();
+        } finally {
+          streamLock.unlock();
+        }
+      }
+    }
+
+    @Override
     public String toString() {
       return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + 
fileContext;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1950acc6/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index cc6d7de..d310a68 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -588,6 +588,10 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
 
    @Override
    public void close() {
      if (!pread) {
        // A non-positional (seek + read) scanner keeps one stream socket open for its whole
        // lifetime, so release it here to avoid the socket lingering in CLOSE_WAIT. HBASE-9393
        reader.unbufferStream();
      }
      this.returnBlocks(true);
    }
 
@@ -1858,4 +1862,9 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
   public int getMajorVersion() {
     return 3;
   }
+
  /**
   * Frees the sockets and file descriptors held by this reader's stream when the stream supports
   * org.apache.hadoop.fs.CanUnbuffer. Safe to call from multiple threads: delegates to the block
   * reader, which guards the actual unbuffer with a lock.
   */
  @Override
  public void unbufferStream() {
    fsBlockReader.unbufferStream();
  }
 }

Reply via email to