Repository: hbase Updated Branches: refs/heads/branch-1.2 c1289960d -> 4160f7273
HBASE-9393 Hbase does not closing a closed socket resulting in many CLOSE_WAIT Signed-off-by: Andrew Purtell <[email protected]> Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2bde1ac4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2bde1ac4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2bde1ac4 Branch: refs/heads/branch-1.2 Commit: 2bde1ac4050eeaaa262c41459a6590359e69d78c Parents: c128996 Author: Ashish Singhi <[email protected]> Authored: Tue Jun 6 17:49:08 2017 +0530 Committer: Sean Busbey <[email protected]> Committed: Tue Jun 20 02:49:12 2017 -0500 ---------------------------------------------------------------------- .../hbase/io/FSDataInputStreamWrapper.java | 71 +++++++++++++++++++- .../hadoop/hbase/io/HalfStoreFileReader.java | 4 ++ .../hbase/io/hfile/AbstractHFileReader.java | 8 +++ .../org/apache/hadoop/hbase/io/hfile/HFile.java | 26 +++++-- .../hadoop/hbase/io/hfile/HFileBlock.java | 21 +++++- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 5 ++ .../hadoop/hbase/io/hfile/HFileScanner.java | 5 ++ .../hbase/regionserver/StoreFileScanner.java | 5 +- 8 files changed, 136 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java index b06be6b..dc168da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -18,7 +18,12 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,6 +37,8 @@ import com.google.common.annotations.VisibleForTesting; * see method comments. */ public class FSDataInputStreamWrapper { + private static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class); + private final HFileSystem hfs; private final Path path; private final FileLink link; @@ -74,6 +81,11 @@ public class FSDataInputStreamWrapper { // reads without hbase checksum verification. private volatile int hbaseChecksumOffCount = -1; + private Boolean instanceOfCanUnbuffer = null; + // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation + // errors against Hadoop pre 2.6.4 and 2.7.1 versions. + private Method unbuffer = null; + public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { this(fs, null, path, false); } @@ -219,4 +231,61 @@ public class FSDataInputStreamWrapper { public HFileSystem getHfs() { return this.hfs; } -} + + /** + * This will free sockets and file descriptors held by the stream only when the stream implements + * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients + * using this stream to read the blocks have finished reading. If by chance the stream is + * unbuffered and there are clients still holding this stream for read then on next client read + * request a new socket will be opened by Datanode without client knowing about it and will serve + * its read request. Note: If this socket is idle for some time then the DataNode will close the + * socket and the socket will move into CLOSE_WAIT state and on the next client request on this + * stream, the current socket will be closed and a new socket will be opened to serve the + * requests. + */ + @SuppressWarnings({ "rawtypes" }) + public void unbuffer() { + FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum()); + if (stream != null) { + InputStream wrappedStream = stream.getWrappedStream(); + // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop + // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the + // CanUnbuffer interface or not and based on that call the unbuffer api. + final Class<? extends InputStream> streamClass = wrappedStream.getClass(); + if (this.instanceOfCanUnbuffer == null) { + // To ensure we compute whether the stream is instance of CanUnbuffer only once. + this.instanceOfCanUnbuffer = false; + Class<?>[] streamInterfaces = streamClass.getInterfaces(); + for (Class c : streamInterfaces) { + if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) { + try { + this.unbuffer = streamClass.getDeclaredMethod("unbuffer"); + } catch (NoSuchMethodException | SecurityException e) { + LOG.warn("Failed to find 'unbuffer' method in class " + streamClass + + " . So there may be a TCP socket connection " + + "left open in CLOSE_WAIT state.", + e); + return; + } + this.instanceOfCanUnbuffer = true; + break; + } + } + } + if (this.instanceOfCanUnbuffer) { + try { + this.unbuffer.invoke(wrappedStream); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + LOG.warn("Failed to invoke 'unbuffer' method in class " + streamClass + + " . So there may be a TCP socket connection left open in CLOSE_WAIT state.", + e); + } + } else { + LOG.warn("Failed to find 'unbuffer' method in class " + streamClass + + " . So there may be a TCP socket connection " + + "left open in CLOSE_WAIT state. For more details check " + + "https://issues.apache.org/jira/browse/HBASE-9393"); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index ed2e925..c259fc2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -322,6 +322,10 @@ public class HalfStoreFileReader extends StoreFile.Reader { public Cell getNextIndexedKey() { return null; } + + @Override + public void close() { + } }; } http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java index 7d8b572..5039427 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java @@ -332,6 +332,14 @@ public abstract class AbstractHFileReader public HFile.Reader getReader() { return reader; } + + @Override + public void close() { + if (!pread) { + // For seek + pread stream socket should be closed when the scanner is closed. HBASE-9393 + reader.unbufferStream(); + } + } } /** For testing */ http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index a67bf8c..1ae05b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -454,6 +454,12 @@ public class HFile { boolean isPrimaryReplicaReader(); void setPrimaryReplicaReader(boolean isPrimaryReplicaReader); + + /** + * To close the stream's socket. Note: This can be concurrently called from multiple threads and + * implementation should take care of thread safety. + */ + void unbufferStream(); } /** @@ -470,8 +476,8 @@ public class HFile { */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH", justification="Intentional") - private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, - long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException { + private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size, + CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException { FixedFileTrailer trailer = null; try { boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum(); @@ -492,10 +498,15 @@ public class HFile { LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2); } throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t); + } finally { + fsdis.unbuffer(); } } /** + * The sockets and the file descriptors held by the method parameter + * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure + * that no other threads have access to the same passed reference. * @param fs A file system * @param path Path to HFile * @param fsdis a stream of path's file @@ -519,7 +530,7 @@ public class HFile { } else { hfs = (HFileSystem)fs; } - return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf); + return openReader(path, fsdis, size, cacheConf, hfs, conf); } /** @@ -534,18 +545,21 @@ public class HFile { FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf"); FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path); - return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), + return openReader(path, stream, fs.getFileStatus(path).getLen(), cacheConf, stream.getHfs(), conf); } /** - * This factory method is used only by unit tests + * This factory method is used only by unit tests. <br/> + * The sockets and the file descriptors held by the method parameter + * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure + * that no other threads have access to the same passed reference. */ static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf) throws IOException { FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis); - return pickReaderVersion(path, wrapper, size, cacheConf, null, conf); + return openReader(path, wrapper, size, cacheConf, null, conf); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 351c3c4..751c667 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1327,6 +1327,12 @@ public class HFileBlock implements Cacheable { /** Get the default decoder for blocks from this file. */ HFileBlockDecodingContext getDefaultBlockDecodingContext(); + + /** + * To close the stream's socket. Note: This can be concurrently called from multiple threads and + * implementation should take care of thread safety. + */ + void unbufferStream(); } /** @@ -1348,7 +1354,7 @@ public class HFileBlock implements Cacheable { /** The path (if any) where this data is coming from */ protected Path path; - private final Lock streamLock = new ReentrantLock(); + protected final Lock streamLock = new ReentrantLock(); /** The default buffer size for our buffered streams */ public static final int DEFAULT_BUFFER_SIZE = 1 << 20; @@ -1764,6 +1770,19 @@ public class HFileBlock implements Cacheable { } @Override + public void unbufferStream() { + // To handle concurrent reads, ensure that no other client is accessing the streams while we + // unbuffer it. + if (streamLock.tryLock()) { + try { + this.streamWrapper.unbuffer(); + } finally { + streamLock.unlock(); + } + } + } + + @Override public String toString() { return "hfs=" + hfs + ", path=" + path + ", fileContext=" + fileContext; } http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index fd5fc02..3e6c673 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -1413,4 +1413,9 @@ public class HFileReaderV2 extends AbstractHFileReader { boolean prefetchComplete() { return PrefetchExecutor.isCompleted(path); } + + @Override + public void unbufferStream() { + fsBlockReader.unbufferStream(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index 3e0f91f..b5f207c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -161,4 +161,9 @@ public interface HFileScanner { * @return the next key in the index (the key to seek to the next block) */ Cell getNextIndexedKey(); + + /** + * Close the stream socket to handle RS CLOSE_WAIT. HBASE-9393 + */ + void close(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/2bde1ac4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 0785db1..a55c3da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -51,6 +51,7 @@ public class StoreFileScanner implements KeyValueScanner { private final StoreFile.Reader reader; private final HFileScanner hfs; private Cell cur = null; + private boolean closed = false; private boolean realSeekDone; private boolean delayedReseek; @@ -252,8 +253,10 @@ public class StoreFileScanner implements KeyValueScanner { } public void close() { - // Nothing to close on HFileScanner? cur = null; + if (closed) return; + closed = true; + this.hfs.close(); } /**
