HBASE-17655 Removing MemStoreScanner and SnapshotScanner
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8f4ae0a0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8f4ae0a0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8f4ae0a0 Branch: refs/heads/hbase-12439 Commit: 8f4ae0a0dcb658c4fe669bc4cdc68ad8e6219daf Parents: cc59fe4 Author: eshcar <esh...@yahoo-inc.com> Authored: Tue Mar 21 12:32:59 2017 +0200 Committer: eshcar <esh...@yahoo-inc.com> Committed: Tue Mar 21 12:35:47 2017 +0200 ---------------------------------------------------------------------- .../example/ZooKeeperScanPolicyObserver.java | 4 +- .../hbase/coprocessor/RegionObserver.java | 35 +- .../hbase/mob/DefaultMobStoreFlusher.java | 2 +- .../hbase/regionserver/AbstractMemStore.java | 14 + .../hbase/regionserver/CompactingMemStore.java | 21 +- .../regionserver/CompositeImmutableSegment.java | 33 +- .../hbase/regionserver/DefaultMemStore.java | 15 +- .../hbase/regionserver/DefaultStoreFlusher.java | 2 +- .../hbase/regionserver/ImmutableSegment.java | 12 +- .../hbase/regionserver/MemStoreCompactor.java | 2 +- .../MemStoreCompactorSegmentsIterator.java | 17 +- .../MemStoreMergerSegmentsIterator.java | 52 ++- .../hbase/regionserver/MemStoreScanner.java | 334 ------------------- .../regionserver/MemStoreSegmentsIterator.java | 23 +- .../hbase/regionserver/MemStoreSnapshot.java | 15 +- .../regionserver/RegionCoprocessorHost.java | 7 +- .../hadoop/hbase/regionserver/Segment.java | 8 +- .../hbase/regionserver/SegmentScanner.java | 13 +- .../hbase/regionserver/SnapshotScanner.java | 105 ------ .../hadoop/hbase/regionserver/StoreFlusher.java | 8 +- .../hbase/regionserver/StripeStoreFlusher.java | 2 +- .../hbase/coprocessor/SimpleRegionObserver.java | 2 +- .../TestRegionObserverScannerOpenHook.java | 6 +- .../regionserver/NoOpScanPolicyObserver.java | 4 +- .../regionserver/TestCompactingMemStore.java | 30 +- .../TestCompactingToCellArrayMapMemStore.java | 32 +- .../hbase/regionserver/TestDefaultMemStore.java | 20 +- .../regionserver/TestMemStoreChunkPool.java | 14 +- .../regionserver/TestReversibleScanners.java | 66 +++- .../hbase/util/TestCoprocessorScanPolicy.java | 5 +- 30 files changed, 262 insertions(+), 641 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 2343c1d..b7df9b4 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -188,7 +188,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver { @Override public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException { ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); if (scanInfo == null) { // take default action @@ -196,7 +196,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver { } Scan scan = new Scan(); scan.setMaxVersions(scanInfo.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + return new StoreScanner(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index a3db3b1..e36feea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.coprocessor; import com.google.common.collect.ImmutableList; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -128,16 +129,16 @@ public interface RegionObserver extends Coprocessor { * effect in this hook. * @param c the environment provided by the region server * @param store the store being flushed - * @param memstoreScanner the scanner for the memstore that is flushed + * @param scanners the scanners for the memstore that is flushed * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain * @return the scanner to use during the flush. {@code null} if the default implementation * is to be used. - * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, + * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, List, * InternalScanner, long)} */ @Deprecated default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s) + final Store store, final List<KeyValueScanner> scanners, final InternalScanner s) throws IOException { return s; } @@ -151,16 +152,32 @@ public interface RegionObserver extends Coprocessor { * effect in this hook. * @param c the environment provided by the region server * @param store the store being flushed - * @param memstoreScanner the scanner for the memstore that is flushed + * @param scanners the scanners for the memstore that is flushed * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain * @param readPoint the readpoint to create scanner * @return the scanner to use during the flush. {@code null} if the default implementation * is to be used. */ default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s, + final Store store, final List<KeyValueScanner> scanners, final InternalScanner s, final long readPoint) throws IOException { - return preFlushScannerOpen(c, store, memstoreScanner, s); + return preFlushScannerOpen(c, store, scanners, s); + } + + /** + * Maintain backward compatibility. + * @param c the environment provided by the region server + * @param store the store being flushed + * @param scanner the scanner for the memstore that is flushed + * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain + * @param readPoint the readpoint to create scanner + * @return the scanner to use during the flush. {@code null} if the default implementation + * is to be used. + */ + default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final KeyValueScanner scanner, final InternalScanner s, + final long readPoint) throws IOException { + return preFlushScannerOpen(c, store, Collections.singletonList(scanner), s, readPoint); } /** @@ -1113,8 +1130,7 @@ public interface RegionObserver extends Coprocessor { * Called before a store opens a new scanner. * This hook is called when a "user" scanner is opened. * <p> - * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner, - * long)} and {@link #preCompactScannerOpen(ObserverContext, + * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext, * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)} * to override scanners created for flushes or compactions, resp. * <p> @@ -1145,8 +1161,7 @@ public interface RegionObserver extends Coprocessor { * Called before a store opens a new scanner. * This hook is called when a "user" scanner is opened. * <p> - * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner, - * long)} and {@link #preCompactScannerOpen(ObserverContext, + * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext, * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)} * to override scanners created for flushes or compactions, resp. * <p> http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 2456a41..1a1c5a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -104,7 +104,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index d44486c..cff2b27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -60,6 +60,20 @@ public abstract class AbstractMemStore implements MemStore { public final static long DEEP_OVERHEAD = FIXED_OVERHEAD; + public static long addToScanners(List<? extends Segment> segments, long readPt, long order, + List<KeyValueScanner> scanners) { + for (Segment item : segments) { + order = addToScanners(item, readPt, order, scanners); + } + return order; + } + + protected static long addToScanners(Segment segment, long readPt, long order, + List<KeyValueScanner> scanners) { + scanners.add(segment.getScanner(readPt, order)); + return order - 1; + } + protected AbstractMemStore(final Configuration conf, final CellComparator c) { this.conf = conf; this.comparator = c; http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 926b3f7..26b2f49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; @@ -318,21 +317,15 @@ public class CompactingMemStore extends AbstractMemStore { */ public List<KeyValueScanner> getScanners(long readPt) throws IOException { List<? extends Segment> pipelineList = pipeline.getSegments(); - int order = pipelineList.size() + snapshot.getNumOfSegments(); + List<? extends Segment> snapshotList = snapshot.getAllSegments(); + long order = 1 + pipelineList.size() + snapshotList.size(); // The list of elements in pipeline + the active element + the snapshot segment - // TODO : This will change when the snapshot is made of more than one element // The order is the Segment ordinal - List<KeyValueScanner> list = new ArrayList<>(order+1); - list.add(this.active.getScanner(readPt, order + 1)); - for (Segment item : pipelineList) { - list.add(item.getScanner(readPt, order)); - order--; - } - for (Segment item : snapshot.getAllSegments()) { - list.add(item.getScanner(readPt, order)); - order--; - } - return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list)); + List<KeyValueScanner> list = new ArrayList<KeyValueScanner>((int) order); + order = addToScanners(active, readPt, order, list); + order = addToScanners(pipelineList, readPt, order, list); + addToScanners(snapshotList, readPt, order, list); + return list; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java index eeade4f..2f89ec7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; -import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -72,16 +71,6 @@ public class CompositeImmutableSegment extends ImmutableSegment { } /** - * Builds a special scanner for the MemStoreSnapshot object that is different than the - * general segment scanner. - * @return a special scanner for the MemStoreSnapshot object - */ - @Override - public KeyValueScanner getSnapshotScanner() { - return getScanner(Long.MAX_VALUE, Long.MAX_VALUE); - } - - /** * @return whether the segment has any cells */ @Override @@ -148,8 +137,7 @@ public class CompositeImmutableSegment extends ImmutableSegment { */ @Override public KeyValueScanner getScanner(long readPoint) { - // Long.MAX_VALUE is DEFAULT_SCANNER_ORDER - return getScanner(readPoint,Long.MAX_VALUE); + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); } /** @@ -158,19 +146,14 @@ public class CompositeImmutableSegment extends ImmutableSegment { */ @Override public KeyValueScanner getScanner(long readPoint, long order) { - KeyValueScanner resultScanner; - List<KeyValueScanner> list = new ArrayList<>(segments.size()); - for (ImmutableSegment s : segments) { - list.add(s.getScanner(readPoint, order)); - } - - try { - resultScanner = new MemStoreScanner(getComparator(), list); - } catch (IOException ie) { - throw new IllegalStateException(ie); - } + throw new IllegalStateException("Not supported by CompositeImmutableScanner"); + } - return resultScanner; + @Override + public List<KeyValueScanner> getScanners(long readPoint, long order) { + List<KeyValueScanner> list = new ArrayList<>(segments.size()); + AbstractMemStore.addToScanners(segments, readPoint, order, list); + return list; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index b3e9c65..d1f6b1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -75,10 +74,6 @@ public class DefaultMemStore extends AbstractMemStore { super(conf, c); } - void dump() { - super.dump(LOG); - } - /** * Creates a snapshot of the current memstore. * Snapshot must be cleared by call to {@link #clearSnapshot(long)} @@ -129,11 +124,11 @@ public class DefaultMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List<KeyValueScanner> getScanners(long readPt) throws IOException { - List<KeyValueScanner> list = new ArrayList<>(2); - list.add(this.active.getScanner(readPt, 1)); - list.add(this.snapshot.getScanner(readPt, 0)); - return Collections.<KeyValueScanner> singletonList( - new MemStoreScanner(getComparator(), list)); + List<KeyValueScanner> list = new ArrayList<>(); + long order = snapshot.getNumOfSegments(); + order = addToScanners(active, readPt, order, list); + addToScanners(snapshot.getAllSegments(), readPt, order, list); + return list; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 8cb3a1d..ef49f29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -52,7 +52,7 @@ public class DefaultStoreFlusher extends StoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java index c8d27b2..f1273a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -34,9 +34,7 @@ import java.util.List; /** * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, - * and is not needed for a {@link MutableSegment}. Specifically, the method - * {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the - * {@link MemStoreSnapshot} object. + * and is not needed for a {@link MutableSegment}. */ @InterfaceAudience.Private public class ImmutableSegment extends Segment { @@ -130,14 +128,6 @@ public class ImmutableSegment extends Segment { } ///////////////////// PUBLIC METHODS ///////////////////// - /** - * Builds a special scanner for the MemStoreSnapshot object that is different than the - * general segment scanner. - * @return a special scanner for the MemStoreSnapshot object - */ - public KeyValueScanner getSnapshotScanner() { - return new SnapshotScanner(this); - } @Override public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index c435098..dfa7d18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -252,7 +252,7 @@ public class MemStoreCompactor { iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); + compactionKVMax); result = SegmentFactory.instance().createImmutableSegmentByMerge( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java index 6a30eac..8f481e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.client.Scan; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -50,11 +49,16 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator List<ImmutableSegment> segments, CellComparator comparator, int compactionKVMax, Store store ) throws IOException { - super(segments,comparator,compactionKVMax,store); + super(compactionKVMax); + List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); + // create the list of scanners to traverse over all the data + // no dirty reads here as these are immutable segments + int order = segments.size(); + AbstractMemStore.addToScanners(segments, Integer.MAX_VALUE, order, scanners); // build the scanner based on Query Matcher // reinitialize the compacting scanner for each instance of iterator - compactingScanner = createScanner(store, scanner); + compactingScanner = createScanner(store, scanners); hasMore = compactingScanner.next(kvs, scannerContext); @@ -93,7 +97,6 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator public void close() { compactingScanner.close(); compactingScanner = null; - scanner = null; } @Override @@ -106,13 +109,13 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator * * @return the scanner */ - private StoreScanner createScanner(Store store, KeyValueScanner scanner) + private StoreScanner createScanner(Store store, List<KeyValueScanner> scanners) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(); //Get all available versions StoreScanner internalScanner = - new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner), + new StoreScanner(store, store.getScanInfo(), scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); @@ -146,4 +149,4 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator } return hasMore; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java index 625fc76..3bb814b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreMergerSegmentsIterator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import java.io.IOException; +import java.util.ArrayList; import java.util.List; /** @@ -33,36 +34,67 @@ import java.util.List; @InterfaceAudience.Private public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator { + // heap of scanners, lazily initialized + private KeyValueHeap heap = null; + // remember the initial version of the scanners list + List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); + + private boolean closed = false; + // C-tor public MemStoreMergerSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator, - int compactionKVMax, Store store - ) throws IOException { - super(segments,comparator,compactionKVMax,store); + int compactionKVMax) throws IOException { + super(compactionKVMax); + // create the list of scanners to traverse over all the data + // no dirty reads here as these are immutable segments + int order = segments.size(); + AbstractMemStore.addToScanners(segments, Integer.MAX_VALUE, order, scanners); + heap = new KeyValueHeap(scanners, comparator); } @Override public boolean hasNext() { - return (scanner.peek()!=null); + if (closed) { + return false; + } + if (this.heap != null) { + return (this.heap.peek() != null); + } + // Doing this way in case some test cases tries to peek directly + return false; } @Override public Cell next() { - Cell result = null; try { // try to get next - result = scanner.next(); + if (!closed && heap != null) { + return heap.next(); + } } catch (IOException ie) { throw new IllegalStateException(ie); } - return result; + return null; } public void close() { - scanner.close(); - scanner = null; + if (closed) { + return; + } + // Ensuring that all the segment scanners are closed + if (heap != null) { + heap.close(); + // It is safe to do close as no new calls will be made to this scanner. + heap = null; + } else { + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + } + closed = true; } @Override public void remove() { throw new UnsupportedOperationException(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java deleted file mode 100644 index 2ccdf68..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.htrace.Trace; - -/** - * This is the scanner for any MemStore implementation, derived from MemStore. - * The MemStoreScanner combines KeyValueScanner from different Segments and - * uses the key-value heap and the reversed key-value heap for the aggregated key-values set. - * It is assumed that only traversing forward or backward is used (without zigzagging in between) - */ -@InterfaceAudience.Private -public class MemStoreScanner extends NonLazyKeyValueScanner { - - // heap of scanners, lazily initialized - private KeyValueHeap heap; - - // indicates if the scanner is created for inmemoryCompaction - private boolean inmemoryCompaction; - - // remember the initial version of the scanners list - List<KeyValueScanner> scanners; - - private final CellComparator comparator; - - private boolean closed; - - /** - * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan - * and the heap is lazily initialized - * @param comparator Cell Comparator - * @param scanners List of scanners, from which the heap will be built - * @param inmemoryCompaction true if used for inmemoryCompaction. - * In this case, creates a forward heap always. - */ - public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners, - boolean inmemoryCompaction) throws IOException { - super(); - this.comparator = comparator; - this.scanners = scanners; - if (Trace.isTracing() && Trace.currentSpan() != null) { - Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); - } - this.inmemoryCompaction = inmemoryCompaction; - if (inmemoryCompaction) { - // init the forward scanner in case of inmemoryCompaction - initForwardKVHeapIfNeeded(comparator, scanners); - } - } - - /** - * Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan - * and the heap is lazily initialized - * @param comparator Cell Comparator - * @param scanners List of scanners, from which the heap will be built - */ - public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners) - throws IOException { - this(comparator, scanners, false); - } - - private void initForwardKVHeapIfNeeded(CellComparator comparator, List<KeyValueScanner> scanners) - throws IOException { - if (heap == null) { - // lazy init - // In a normal scan case, at the StoreScanner level before the KVHeap is - // created we do a seek or reseek. So that will happen - // on all the scanners that the StoreScanner is - // made of. So when we get any of those call to this scanner we init the - // heap here with normal forward KVHeap. - this.heap = new KeyValueHeap(scanners, comparator); - } - } - - private boolean initReverseKVHeapIfNeeded(Cell seekKey, CellComparator comparator, - List<KeyValueScanner> scanners) throws IOException { - boolean res = false; - if (heap == null) { - // lazy init - // In a normal reverse scan case, at the ReversedStoreScanner level before the - // ReverseKeyValueheap is - // created we do a seekToLastRow or backwardSeek. So that will happen - // on all the scanners that the ReversedStoreSCanner is - // made of. So when we get any of those call to this scanner we init the - // heap here with ReversedKVHeap. - if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) { - for (KeyValueScanner scanner : scanners) { - res |= scanner.seekToLastRow(); - } - } else { - for (KeyValueScanner scanner : scanners) { - res |= scanner.backwardSeek(seekKey); - } - } - this.heap = new ReversedKeyValueHeap(scanners, comparator); - } - return res; - } - - /** - * Returns the cell from the top-most scanner without advancing the iterator. - * The backward traversal is assumed, only if specified explicitly - */ - @Override - public Cell peek() { - if (closed) { - return null; - } - if (this.heap != null) { - return this.heap.peek(); - } - // Doing this way in case some test cases tries to peek directly to avoid NPE - return null; - } - - /** - * Gets the next cell from the top-most scanner. Assumed forward scanning. - */ - @Override - public Cell next() throws IOException { - if (closed) { - return null; - } - if(this.heap != null) { - // loop over till the next suitable value - // take next value from the heap - for (Cell currentCell = heap.next(); - currentCell != null; - currentCell = heap.next()) { - // all the logic of presenting cells is inside the internal KeyValueScanners - // located inside the heap - return currentCell; - } - } - return null; - } - - /** - * Set the scanner at the seek key. Assumed forward scanning. - * Must be called only once: there is no thread safety between the scanner - * and the memStore. - * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean seek(Cell cell) throws IOException { - if (closed) { - return false; - } - initForwardKVHeapIfNeeded(comparator, scanners); - - if (cell == null) { - close(); - return false; - } - - return heap.seek(cell); - } - - /** - * Move forward on the sub-lists set previously by seek. Assumed forward scanning. - * - * @param cell seek value (should be non-null) - * @return true if there is at least one KV to read, false otherwise - */ - @Override - public boolean reseek(Cell cell) throws IOException { - /* - * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. - * This code is executed concurrently with flush and puts, without locks. - * Two points must be known when working on this code: - * 1) It's not possible to use the 'kvTail' and 'snapshot' - * variables, as they are modified during a flush. - * 2) The ideal implementation for performance would use the sub skip list - * implicitly pointed by the iterators 'kvsetIt' and - * 'snapshotIt'. Unfortunately the Java API does not offer a method to - * get it. So we remember the last keys we iterated to and restore - * the reseeked set to at least that point. - * - * TODO: The above comment copied from the original MemStoreScanner - */ - if (closed) { - return false; - } - initForwardKVHeapIfNeeded(comparator, scanners); - return heap.reseek(cell); - } - - /** - * MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all - * scanners. - * @see KeyValueScanner#getScannerOrder() - */ - @Override - public long getScannerOrder() { - return Long.MAX_VALUE; - } - - @Override - public void close() { - if (closed) { - return; - } - // Ensuring that all the segment scanners are closed - if (heap != null) { - heap.close(); - // It is safe to do close as no new calls will be made to this scanner. - heap = null; - } else { - for (KeyValueScanner scanner : scanners) { - scanner.close(); - } - } - closed = true; - } - - /** - * Set the scanner at the seek key. Assumed backward scanning. - * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean backwardSeek(Cell cell) throws IOException { - // The first time when this happens it sets the scanners to the seek key - // passed by the incoming scan's start row - if (closed) { - return false; - } - initReverseKVHeapIfNeeded(cell, comparator, scanners); - return heap.backwardSeek(cell); - } - - /** - * Assumed backward scanning. - * - * @param cell seek value - * @return false if the key is null or if there is no data - */ - @Override - public boolean seekToPreviousRow(Cell cell) throws IOException { - if (closed) { - return false; - } - initReverseKVHeapIfNeeded(cell, comparator, scanners); - if (heap.peek() == null) { - restartBackwardHeap(cell); - } - return heap.seekToPreviousRow(cell); - } - - @Override - public boolean seekToLastRow() throws IOException { - if (closed) { - return false; - } - return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners); - } - - /** - * Check if this memstore may contain the required keys - * @return False if the key definitely does not exist in this Memstore - */ - @Override - public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - // TODO : Check if this can be removed. - if (inmemoryCompaction) { - return true; - } - - for (KeyValueScanner sc : scanners) { - if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) { - return true; - } - } - return false; - } - - // debug method - @Override - public String toString() { - StringBuffer buf = new StringBuffer(); - int i = 1; - for (KeyValueScanner scanner : scanners) { - buf.append("scanner (" + i + ") " + scanner.toString() + " ||| "); - i++; - } - return buf.toString(); - } - /****************** Private methods ******************/ - /** - * Restructure the ended backward heap after rerunning a seekToPreviousRow() - * on each scanner - * @return false if given Cell does not exist in any scanner - */ - private boolean restartBackwardHeap(Cell cell) throws IOException { - boolean res = false; - for (KeyValueScanner scan : scanners) { - res |= scan.seekToPreviousRow(cell); - } - this.heap = - new ReversedKeyValueHeap(scanners, comparator); - return res; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java index 7728534..048f746 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSegmentsIterator.java @@ -20,11 +20,10 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import java.io.IOException; -import java.util.*; +import java.util.Iterator; /** * The MemStoreSegmentsIterator is designed to perform one iteration over given list of segments @@ -35,29 +34,11 @@ import java.util.*; @InterfaceAudience.Private public abstract class MemStoreSegmentsIterator implements Iterator<Cell> { - // scanner for full or partial pipeline (heap of segment scanners) - // we need to keep those scanners in order to close them at the end - protected KeyValueScanner scanner; - protected final ScannerContext scannerContext; - // C-tor - public MemStoreSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator, - int compactionKVMax, Store store) throws IOException { - + public MemStoreSegmentsIterator(int compactionKVMax) throws IOException { this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - - // list of Scanners of segments in the pipeline, when compaction starts - List<KeyValueScanner> scanners = new ArrayList<>(); - - // create the list of scanners to traverse over all the data - // no dirty reads here as these are immutable segments - for (ImmutableSegment segment : segments) { - scanners.add(segment.getScanner(Integer.MAX_VALUE)); - } - - scanner = new MemStoreScanner(comparator, scanners, true); } public abstract void close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index 3858b1c..dd7f957 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.util.List; /** * Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier, * count of cells in it and total memory size occupied by all the cells, timestamp information of @@ -31,7 +32,7 @@ public class MemStoreSnapshot { private final long dataSize; private final long heapSize; private final TimeRangeTracker timeRangeTracker; - private final KeyValueScanner scanner; + private final List<KeyValueScanner> scanners; private final boolean tagsPresent; public MemStoreSnapshot(long id, ImmutableSegment snapshot) { @@ -40,7 +41,7 @@ public class MemStoreSnapshot { this.dataSize = snapshot.keySize(); this.heapSize = snapshot.heapSize(); this.timeRangeTracker = snapshot.getTimeRangeTracker(); - this.scanner = snapshot.getSnapshotScanner(); + this.scanners = snapshot.getScanners(Long.MAX_VALUE, Long.MAX_VALUE); this.tagsPresent = snapshot.isTagsPresent(); } @@ -66,21 +67,21 @@ public class MemStoreSnapshot { } public long getHeapSize() { - return this.heapSize; + return heapSize; } /** * @return {@link TimeRangeTracker} for all the Cells in the snapshot. */ public TimeRangeTracker getTimeRangeTracker() { - return this.timeRangeTracker; + return timeRangeTracker; } /** * @return {@link KeyValueScanner} for iterating over the snapshot */ - public KeyValueScanner getScanner() { - return this.scanner; + public List<KeyValueScanner> getScanners() { + return scanners; } /** @@ -89,4 +90,4 @@ public class MemStoreSnapshot { public boolean isTagsPresent() { return this.tagsPresent; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 925e349..64823b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -629,17 +629,16 @@ public class RegionCoprocessorHost /** * See - * {@link RegionObserver#preFlushScannerOpen(ObserverContext, - * Store, KeyValueScanner, InternalScanner, long)} + * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} */ public InternalScanner preFlushScannerOpen(final Store store, - final KeyValueScanner memstoreScanner, final long readPoint) throws IOException { + final List<KeyValueScanner> scanners, final long readPoint) throws IOException { return execOperationWithResult(null, coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { - setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult(), readPoint)); + setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint)); } }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index 452cca8..6f431c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -18,7 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.SortedSet; @@ -102,7 +102,7 @@ public abstract class Segment { * Creates the scanner for the given read point * @return a scanner for the given read point */ - public KeyValueScanner getScanner(long readPoint) { + protected KeyValueScanner getScanner(long readPoint) { return new SegmentScanner(this, readPoint); } @@ -115,9 +115,7 @@ public abstract class Segment { } public List<KeyValueScanner> getScanners(long readPoint, long order) { - List<KeyValueScanner> scanners = new ArrayList<>(1); - scanners.add(getScanner(readPoint, order)); - return scanners; + return Collections.singletonList(new SegmentScanner(this, readPoint, order)); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 5e2e36f..2727360 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.SortedSet; +import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -280,16 +281,11 @@ public class SegmentScanner implements KeyValueScanner { public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { return getSegment().shouldSeek(scan,oldestUnexpiredTS); } - /** - * This scanner is working solely on the in-memory MemStore therefore this - * interface is not relevant. - */ + @Override public boolean requestSeek(Cell c, boolean forward, boolean useBloom) throws IOException { - - throw new IllegalStateException( - "requestSeek cannot be called on MutableCellSetSegmentScanner"); + return NonLazyKeyValueScanner.doRealSeek(this, c, forward); } /** @@ -309,8 +305,7 @@ public class SegmentScanner implements KeyValueScanner { */ @Override public void enforceSeek() throws IOException { - throw new IllegalStateException( - "enforceSeek cannot be called on MutableCellSetSegmentScanner"); + throw new NotImplementedException("enforceSeek cannot be called on a SegmentScanner"); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java deleted file mode 100644 index 6300e00..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotScanner.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; - -/** - * Scans the snapshot. Acts as a simple scanner that just iterates over all the cells - * in the segment - */ -@InterfaceAudience.Private -public class SnapshotScanner extends SegmentScanner { - - public SnapshotScanner(Segment immutableSegment) { - // Snapshot scanner does not need readpoint. It should read all the cells in the - // segment - super(immutableSegment, Long.MAX_VALUE); - } - - @Override - public Cell peek() { // sanity check, the current should be always valid - if (closed) { - return null; - } - return current; - } - - @Override - public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - return true; - } - - @Override - public boolean backwardSeek(Cell key) throws IOException { - throw new NotImplementedException( - "backwardSeek must not be called on a " + "non-reversed scanner"); - } - - @Override - public boolean seekToPreviousRow(Cell key) throws IOException { - throw new NotImplementedException( - "seekToPreviousRow must not be called on a " + "non-reversed scanner"); - } - - @Override - public boolean seekToLastRow() throws IOException { - throw new NotImplementedException( - "seekToLastRow must not be called on a " + "non-reversed scanner"); - } - - @Override - protected Iterator<Cell> getIterator(Cell cell) { - return segment.iterator(); - } - - @Override - protected void updateCurrent() { - if (iter.hasNext()) { - current = iter.next(); - } else { - current = null; - } - } - - @Override - public boolean seek(Cell seekCell) { - // restart iterator - iter = getIterator(seekCell); - return reseek(seekCell); - } - - @Override - public boolean reseek(Cell seekCell) { - while (iter.hasNext()) { - Cell next = iter.next(); - int ret = segment.getComparator().compare(next, seekCell); - if (ret >= 0) { - current = next; - return true; - } - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 23fae6a..298f3d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -74,22 +74,22 @@ abstract class StoreFlusher { /** * Creates the scanner for flushing snapshot. Also calls coprocessors. - * @param snapshotScanner + * @param snapshotScanners * @param smallestReadPoint * @return The scanner; null if coprocessor is canceling the flush. */ - protected InternalScanner createScanner(KeyValueScanner snapshotScanner, + protected InternalScanner createScanner(List<KeyValueScanner> snapshotScanners, long smallestReadPoint) throws IOException { InternalScanner scanner = null; if (store.getCoprocessorHost() != null) { - scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner, + scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanners, smallestReadPoint); } if (scanner == null) { Scan scan = new Scan(); scan.setMaxVersions(store.getScanInfo().getMaxVersions()); scanner = new StoreScanner(store, store.getScanInfo(), scan, - Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES, + snapshotScanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); } assert scanner != null; http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 85bae9d..3f9688d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -62,7 +62,7 @@ public class StripeStoreFlusher extends StoreFlusher { if (cellsCount == 0) return result; // don't flush if there are no entries long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index ec4601c..24b5051 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -187,7 +187,7 @@ public class SimpleRegionObserver implements RegionObserver { @Override public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException { ctPreFlushScannerOpen.incrementAndGet(); return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index ce36af8..80d0e3a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -50,7 +49,6 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Region; @@ -122,11 +120,11 @@ public class TestRegionObserverScannerOpenHook { public static class NoDataFromFlush implements RegionObserver { @Override public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException { Scan scan = new Scan(); scan.setFilter(new NoDataFilter()); return new StoreScanner(store, store.getScanInfo(), scan, - Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES, + scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index 2d096fa..c47ed68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -43,13 +43,13 @@ public class NoOpScanPolicyObserver implements RegionObserver { */ @Override public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException { ScanInfo oldSI = store.getScanInfo(); ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(oldSI.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + return new StoreScanner(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 09ddd6f..a888c45 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -384,8 +384,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -426,8 +428,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { List<KeyValueScanner> scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -455,8 +459,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } // Since no opening scanner, the chunks of snapshot should be put back to // pool - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } @@ -524,8 +530,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // Creating another snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); snapshot = memstore.snapshot(); @@ -540,8 +548,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } // Since no opening scanner, the chunks of snapshot should be put back to // pool - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index a9f8a97..5a48455 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -316,13 +316,17 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE); // seek - scanners.get(0).seek(KeyValue.LOWESTKEY); int count = 0; - while (scanners.get(0).next() != null) { - count++; + for(int i = 0; i < scanners.size(); i++) { + scanners.get(i).seek(KeyValue.LOWESTKEY); + while (scanners.get(i).next() != null) { + count++; + } } assertEquals("the count should be ", count, 150); - scanners.get(0).close(); + for(int i = 0; i < scanners.size(); i++) { + scanners.get(i).close(); + } } @Test @@ -337,7 +341,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore // Just doing the cnt operation here MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator( ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(), - CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore()); + CellComparator.COMPARATOR, 10); int cnt = 0; try { while (itr.next() != null) { @@ -398,8 +402,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore List<KeyValueScanner> scanners = memstore.getScanners(0); // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -427,8 +433,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } // Since no opening scanner, the chunks of snapshot should be put back to // pool - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } @@ -458,8 +466,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore memstore.add(new KeyValue(row, fam, qf4, val), null); memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); - // close the scanner - snapshot.getScanner().close(); + // close the scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index e76da5a..7434eb1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -264,12 +264,20 @@ public class TestDefaultMemStore { protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) throws IOException { List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); - assertEquals(1, memstorescanners.size()); - final KeyValueScanner scanner = memstorescanners.get(0); - scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); - assertEquals(kv1, scanner.next()); - assertEquals(kv2, scanner.next()); - assertNull(scanner.next()); + assertEquals(2, memstorescanners.size()); + final KeyValueScanner scanner0 = memstorescanners.get(0); + final KeyValueScanner scanner1 = memstorescanners.get(1); + scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + scanner1.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW)); + Cell n0 = scanner0.next(); + Cell n1 = scanner1.next(); + assertTrue(kv1.equals(n0) || kv1.equals(n1)); + assertTrue(kv2.equals(n0) + || kv2.equals(n1) + || kv2.equals(scanner0.next()) + || kv2.equals(scanner1.next())); + assertNull(scanner0.next()); + assertNull(scanner1.next()); } protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 42aad5c..37a7664 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -138,7 +138,9 @@ public class TestMemStoreChunkPool { memstore.add(new KeyValue(row, fam, qf5, val), null); assertEquals(2, memstore.getActive().getCellsCount()); // close the scanner - this is how the snapshot will be used - snapshot.getScanner().close(); + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -182,7 +184,9 @@ public class TestMemStoreChunkPool { // Shouldn't putting back the chunks to pool,since some scanners are opening // based on their data // close the snapshot scanner - snapshot.getScanner().close(); + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() == 0); @@ -209,8 +213,10 @@ public class TestMemStoreChunkPool { } // Since no opening scanner, the chunks of snapshot should be put back to // pool - // close the snapshot scanner - snapshot.getScanner().close(); + // close the snapshot scanners + for(KeyValueScanner scanner : snapshot.getScanners()) { + scanner.close(); + } memstore.clearSnapshot(snapshot.getId()); assertTrue(chunkPool.getPoolSize() > 0); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index 69965ba..ecb808e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -120,7 +120,7 @@ public class TestReversibleScanners { LOG.info("Setting read point to " + readPoint); scanners = StoreFileScanner.getScannersForStoreFiles( Collections.singletonList(sf), false, true, false, false, readPoint); - seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint); + seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); } } @@ -135,7 +135,7 @@ public class TestReversibleScanners { for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { LOG.info("Setting read point to " + readPoint); scanners = memstore.getScanners(readPoint); - seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint); + seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint); } } @@ -560,38 +560,68 @@ public class TestReversibleScanners { } private void seekTestOfReversibleKeyValueScannerWithMVCC( - KeyValueScanner scanner, int readPoint) throws IOException { - /** - * Test with MVCC - */ - // Test seek to last row - KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan( - ROWSIZE - 1, 0, readPoint); - assertEquals(expectedKey != null, scanner.seekToLastRow()); - assertEquals(expectedKey, scanner.peek()); + List<? extends KeyValueScanner> scanners, int readPoint) throws IOException { + /** + * Test with MVCC + */ + // Test seek to last row + KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan( + ROWSIZE - 1, 0, readPoint); + boolean res = false; + for (KeyValueScanner scanner : scanners) { + res |= scanner.seekToLastRow(); + } + assertEquals(expectedKey != null, res); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= (expectedKey.equals(scanner.peek())); + } + assertTrue(res); // Test backward seek in two cases // Case1: seek in the same row in backwardSeek expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2, QUALSIZE - 2, readPoint); - assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey)); - assertEquals(expectedKey, scanner.peek()); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= scanner.backwardSeek(expectedKey); + } + assertEquals(expectedKey != null, res); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= (expectedKey.equals(scanner.peek())); + } + assertTrue(res); // Case2: seek to the previous row in backwardSeek int seekRowNum = ROWSIZE - 3; KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]); expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint); - assertEquals(expectedKey != null, scanner.backwardSeek(seekKey)); - assertEquals(expectedKey, scanner.peek()); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= scanner.backwardSeek(expectedKey); + } + res = false; + for (KeyValueScanner scanner : scanners) { + res |= (expectedKey.equals(scanner.peek())); + } + assertTrue(res); // Test seek to previous row seekRowNum = ROWSIZE - 4; expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0, readPoint); - assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValueUtil - .createFirstOnRow(ROWS[seekRowNum]))); - assertEquals(expectedKey, scanner.peek()); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum])); + } + assertEquals(expectedKey != null, res); + res = false; + for (KeyValueScanner scanner : scanners) { + res |= (expectedKey.equals(scanner.peek())); + } + assertTrue(res); } private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum, http://git-wip-us.apache.org/repos/asf/hbase/blob/8f4ae0a0/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index caf8de9..27e93a0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -238,7 +237,7 @@ public class TestCoprocessorScanPolicy { @Override public InternalScanner preFlushScannerOpen( final ObserverContext<RegionCoprocessorEnvironment> c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { + Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException { Long newTtl = ttls.get(store.getTableName()); if (newTtl != null) { System.out.println("PreFlush:" + newTtl); @@ -253,7 +252,7 @@ public class TestCoprocessorScanPolicy { oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); - return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), + return new StoreScanner(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); }