HBASE-21551 Memory leak when using scan with STREAM at server side
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3b854859 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3b854859 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3b854859 Branch: refs/heads/HBASE-20952 Commit: 3b854859f6fad44cbf31164374569a6ab23f3623 Parents: f49baf2 Author: huzheng <open...@gmail.com> Authored: Wed Dec 5 22:57:49 2018 +0800 Committer: huzheng <open...@gmail.com> Committed: Thu Dec 6 10:55:42 2018 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HStoreFile.java | 3 +- .../hbase/regionserver/StoreFileReader.java | 3 ++ .../regionserver/TestSwitchToStreamRead.java | 50 ++++++++++++++++++++ 3 files changed, 55 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index 4aff949..9c94990 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -126,7 +126,8 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener { private final AtomicInteger refCount = new AtomicInteger(0); // Set implementation must be of concurrent type - private final Set<StoreFileReader> streamReaders; + @VisibleForTesting + final Set<StoreFileReader> streamReaders; private final boolean noReadahead; http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index 3fbddf2..d9008b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -186,6 +186,9 @@ public class StoreFileReader { if (!shared) { try { reader.close(false); + if (this.listener != null) { + this.listener.storeFileReaderClosed(this); + } } catch (IOException e) { LOG.warn("failed to close stream reader", e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/3b854859/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java index 815643d..037b13e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java @@ -23,8 +23,13 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -33,6 +38,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import 
org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; @@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Ignore; @@ -98,6 +105,49 @@ public class TestSwitchToStreamRead { UTIL.cleanupTestDir(); } + private Set<StoreFileReader> getStreamReaders() { + List<HStore> stores = REGION.getStores(); + Assert.assertEquals(1, stores.size()); + HStore firstStore = stores.get(0); + Assert.assertNotNull(firstStore); + Collection<HStoreFile> storeFiles = firstStore.getStorefiles(); + Assert.assertEquals(1, storeFiles.size()); + HStoreFile firstSToreFile = storeFiles.iterator().next(); + Assert.assertNotNull(firstSToreFile); + return Collections.unmodifiableSet(firstSToreFile.streamReaders); + } + + /** + * Test Case for HBASE-21551 + */ + @Test + public void testStreamReadersCleanup() throws IOException { + Set<StoreFileReader> streamReaders = getStreamReaders(); + Assert.assertEquals(0, getStreamReaders().size()); + try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) { + StoreScanner storeScanner = + (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting(); + List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream() + .filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs) + .collect(Collectors.toList()); + Assert.assertEquals(1, sfScanners.size()); + StoreFileScanner sfScanner = sfScanners.get(0); + Assert.assertFalse(sfScanner.getReader().shared); + + // There should be a stream reader + Assert.assertEquals(1, 
getStreamReaders().size()); + } + Assert.assertEquals(0, getStreamReaders().size()); + + // The streamsReader should be clear after region close even if there're some opened stream + // scanner. + RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM)); + Assert.assertNotNull(scanner); + Assert.assertEquals(1, getStreamReaders().size()); + REGION.close(); + Assert.assertEquals(0, streamReaders.size()); + } + @Test public void test() throws IOException { try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {