Repository: hbase Updated Branches: refs/heads/HBASE-17081 85d4947dc -> 1d235b9aa
HBASE-17373 Reverse the order of snapshot creation in the CompactingMemStore (Eshcar Hillel) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1d235b9a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1d235b9a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1d235b9a Branch: refs/heads/HBASE-17081 Commit: 1d235b9aafc67cc3df236b7e0ff3251d162078d6 Parents: 85d4947 Author: tedyu <[email protected]> Authored: Sun Jan 1 07:58:48 2017 -0800 Committer: tedyu <[email protected]> Committed: Sun Jan 1 07:58:48 2017 -0800 ---------------------------------------------------------------------- .../hbase/regionserver/CompactingMemStore.java | 24 ++++--- .../hbase/regionserver/CompactionPipeline.java | 75 ++++++++------------ .../regionserver/VersionedSegmentsList.java | 5 +- .../client/TestAsyncTableGetMultiThreaded.java | 22 ++++-- ...ableGetMultiThreadedWithBasicCompaction.java | 35 +++++++++ ...ableGetMultiThreadedWithEagerCompaction.java | 35 +++++++++ 6 files changed, 135 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 1cd30dd..5c31122 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -399,18 +399,26 @@ public class CompactingMemStore extends AbstractMemStore { } private void pushTailToSnapshot() { - ImmutableSegment tail = pipeline.pullTail(); - if (!tail.isEmpty()) { - this.snapshot = tail; - } + VersionedSegmentsList segments = pipeline.getVersionedTail(); + pushToSnapshot(segments.getStoreSegments()); + pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now } private void pushPipelineToSnapshot() { - List<ImmutableSegment> segments = pipeline.drain(); - if (!segments.isEmpty()) { - this.snapshot = - SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments); + VersionedSegmentsList segments = pipeline.getVersionedList(); + pushToSnapshot(segments.getStoreSegments()); + pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now + } + + private void pushToSnapshot(List<ImmutableSegment> segments) { + if(segments.isEmpty()) return; + if(segments.size() == 1 && !segments.get(0).isEmpty()) { + this.snapshot = segments.get(0); + return; } + // else craete composite snapshot + this.snapshot = + SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments); } private RegionServicesForStores getRegionServices() { http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 2fd2a14..a8afef8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -52,9 +52,6 @@ public class CompactionPipeline { private LinkedList<ImmutableSegment> pipeline; private long version; - private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance() - .createImmutableSegment((CellComparator) null); - public CompactionPipeline(RegionServicesForStores region) { this.region = region; this.pipeline = new LinkedList<ImmutableSegment>(); @@ -69,44 +66,33 @@ public class CompactionPipeline { } } - public ImmutableSegment pullTail() { + public VersionedSegmentsList getVersionedList() { synchronized (pipeline){ - if(pipeline.isEmpty()) { - return EMPTY_MEM_STORE_SEGMENT; - } - return removeLast(); + List<ImmutableSegment> segmentList = new ArrayList<>(pipeline); + return new VersionedSegmentsList(segmentList, version); } } - public List<ImmutableSegment> drain() { - int drainSize = pipeline.size(); - List<ImmutableSegment> result = new ArrayList<ImmutableSegment>(drainSize); + public VersionedSegmentsList getVersionedTail() { synchronized (pipeline){ - version++; - for(int i=0; i<drainSize; i++) { - ImmutableSegment segment = this.pipeline.removeFirst(); - result.add(i,segment); + List<ImmutableSegment> segmentList = new ArrayList<>(); + if(!pipeline.isEmpty()) { + segmentList.add(0, pipeline.getLast()); } - return result; - } - } - - public VersionedSegmentsList getVersionedList() { - synchronized (pipeline){ - LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline); - VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version); - return res; + return new VersionedSegmentsList(segmentList, version); } } /** - * Swaps the versioned list at the tail of the pipeline with the new compacted segment. - * Swapping only if there were no changes to the suffix of the list while it was compacted. - * @param versionedList tail of the pipeline that was compacted - * @param segment new compacted segment + * Swaps the versioned list at the tail of the pipeline with a new segment. + * Swapping only if there were no changes to the suffix of the list since the version list was + * created. + * @param versionedList suffix of the pipeline to be replaced can be tail or all the pipeline + * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be + * removed. * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out * During index merge op this will be false and for compaction it will be true. - * @return true iff swapped tail with new compacted segment + * @return true iff swapped tail with new segment */ public boolean swap( VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) { @@ -120,26 +106,32 @@ public class CompactionPipeline { } suffix = versionedList.getStoreSegments(); if (LOG.isDebugEnabled()) { - LOG.debug("Swapping pipeline suffix with compacted item. " + int count = 0; + if(segment != null) { + segment.getCellsCount(); + } + LOG.debug("Swapping pipeline suffix. " + "Just before the swap the number of segments in pipeline is:" + versionedList.getStoreSegments().size() - + ", and the number of cells in new segment is:" + segment.getCellsCount()); + + ", and the number of cells in new segment is:" + count); } - swapSuffix(suffix,segment, closeSuffix); + swapSuffix(suffix, segment, closeSuffix); } - if (region != null) { + if (closeSuffix && region != null) { // update the global memstore size counter long suffixDataSize = getSegmentsKeySize(suffix); - long newDataSize = segment.keySize(); + long newDataSize = 0; + if(segment != null) newDataSize = segment.keySize(); long dataSizeDelta = suffixDataSize - newDataSize; long suffixHeapOverhead = getSegmentsHeapOverhead(suffix); - long newHeapOverhead = segment.heapOverhead(); + long newHeapOverhead = 0; + if(segment != null) newHeapOverhead = segment.heapOverhead(); long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead; region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta)); if (LOG.isDebugEnabled()) { - LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: " + LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: " + newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead - + " compacted item heap overhead: " + newHeapOverhead); + + " new segment heap overhead: " + newHeapOverhead); } } return true; @@ -207,7 +199,7 @@ public class CompactionPipeline { public List<Segment> getSegments() { synchronized (pipeline){ - return new LinkedList<Segment>(pipeline); + return new LinkedList<>(pipeline); } } @@ -260,12 +252,7 @@ public class CompactionPipeline { } } pipeline.removeAll(suffix); - pipeline.addLast(segment); - } - - private ImmutableSegment removeLast() { - version++; - return pipeline.removeLast(); + if(segment != null) pipeline.addLast(segment); } private boolean addFirst(ImmutableSegment segment) { http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java index 01160bf..ab751f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.LinkedList; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -36,10 +35,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public class VersionedSegmentsList { - private final LinkedList<ImmutableSegment> storeSegments; + private final List<ImmutableSegment> storeSegments; private final long version; - public VersionedSegmentsList(LinkedList<ImmutableSegment> storeSegments, long version) { + public VersionedSegmentsList(List<ImmutableSegment> storeSegments, long version) { this.storeSegments = storeSegments; this.version = version; } http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java index da8141b..82fe3cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java @@ -33,17 +33,18 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.regionserver.CompactingMemStore; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -75,11 +76,18 @@ public class TestAsyncTableGetMultiThreaded { @BeforeClass public static void setUp() throws Exception { + setUp(HColumnDescriptor.MemoryCompaction.NONE); + } + + protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception { TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L); TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000); TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); + TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, + String.valueOf(memoryCompaction)); + TEST_UTIL.startMiniCluster(5); SPLIT_KEYS = new byte[8][]; for (int i = 111; i < 999; i += 111) { @@ -103,11 +111,13 @@ public class TestAsyncTableGetMultiThreaded { private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { while (!stop.get()) { - int i = ThreadLocalRandom.current().nextInt(COUNT); - assertEquals(i, - Bytes.toInt( - CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get() - .getValue(FAMILY, QUALIFIER))); + for (int i = 0; i < COUNT; i++) { + assertEquals(i, + Bytes.toInt( + CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))) + .get() + .getValue(FAMILY, QUALIFIER))); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java new file mode 100644 index 0000000..3243175 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java @@ -0,0 +1,35 @@ +package org.apache.hadoop.hbase.client; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class, ClientTests.class }) +public class TestAsyncTableGetMultiThreadedWithBasicCompaction extends + TestAsyncTableGetMultiThreaded { + + @BeforeClass + public static void setUp() throws Exception { + setUp(HColumnDescriptor.MemoryCompaction.BASIC); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java new file mode 100644 index 0000000..863ec1f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java @@ -0,0 +1,35 @@ +package org.apache.hadoop.hbase.client; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class, ClientTests.class }) +public class TestAsyncTableGetMultiThreadedWithEagerCompaction extends + TestAsyncTableGetMultiThreaded { + + @BeforeClass + public static void setUp() throws Exception { + setUp(HColumnDescriptor.MemoryCompaction.EAGER); + } + +}
