http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index af4271a..9590260 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; @@ -87,6 +88,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.ShareableMemory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagRewriteCell; @@ -2432,39 +2434,44 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public RegionScanner getScanner(Scan scan) throws IOException { - return getScanner(scan, null); + return getScanner(scan, true); } - protected RegionScanner getScanner(Scan scan, - List<KeyValueScanner> additionalScanners) throws IOException { + public RegionScanner getScanner(Scan scan, boolean copyCellsFromSharedMem) throws IOException { + RegionScanner scanner = getScanner(scan, null, copyCellsFromSharedMem); + return scanner; + } + + protected RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners, + boolean copyCellsFromSharedMem) throws IOException { startRegionOperation(Operation.SCAN); try { // Verify families are all valid if (!scan.hasFamilies()) { // Adding all families to scanner - for (byte[] family: this.htableDescriptor.getFamiliesKeys()) { + for (byte[] family : this.htableDescriptor.getFamiliesKeys()) { scan.addFamily(family); } } else { - for (byte [] family : scan.getFamilyMap().keySet()) { + for (byte[] family : scan.getFamilyMap().keySet()) { checkFamily(family); } } - return instantiateRegionScanner(scan, additionalScanners); + return instantiateRegionScanner(scan, additionalScanners, copyCellsFromSharedMem); } finally { closeRegionOperation(Operation.SCAN); } } protected RegionScanner instantiateRegionScanner(Scan scan, - List<KeyValueScanner> additionalScanners) throws IOException { + List<KeyValueScanner> additionalScanners, boolean copyCellsFromSharedMem) throws IOException { if (scan.isReversed()) { if (scan.getFilter() != null) { scan.getFilter().setReversed(true); } - return new ReversedRegionScannerImpl(scan, additionalScanners, this); + return new ReversedRegionScannerImpl(scan, additionalScanners, this, copyCellsFromSharedMem); } - return new RegionScannerImpl(scan, additionalScanners, this); + return new RegionScannerImpl(scan, additionalScanners, this, copyCellsFromSharedMem); } @Override @@ -5210,6 +5217,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected final byte[] stopRow; protected final HRegion region; protected final CellComparator comparator; + protected boolean copyCellsFromSharedMem = false; private final long readPt; private final long maxResultSize; @@ -5221,7 +5229,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return region.getRegionInfo(); } - RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region) + public void setCopyCellsFromSharedMem(boolean copyCells) { + this.copyCellsFromSharedMem = copyCells; + } + + RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region, + boolean copyCellsFromSharedMem) throws IOException { this.region = region; this.maxResultSize = scan.getMaxResultSize(); @@ -5231,13 +5244,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.filter = null; } this.comparator = region.getCellCompartor(); - /** * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default * scanner context that can be used to enforce the batch limit in the event that a * ScannerContext is not specified during an invocation of next/nextRaw */ - defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build(); + defaultScannerContext = ScannerContext.newBuilder() + .setBatchLimit(scan.getBatch()).build(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) { this.stopRow = null; @@ -5279,6 +5292,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi joinedScanners.add(scanner); } } + this.copyCellsFromSharedMem = copyCellsFromSharedMem; initializeKVHeap(scanners, joinedScanners, region); } @@ -5353,24 +5367,48 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // scanner is closed throw new UnknownScannerException("Scanner was closed"); } - boolean moreValues; - if (outResults.isEmpty()) { - // Usually outResults is empty. This is true when next is called - // to handle scan or get operation. - moreValues = nextInternal(outResults, scannerContext); - } else { - List<Cell> tmpList = new ArrayList<Cell>(); - moreValues = nextInternal(tmpList, scannerContext); - outResults.addAll(tmpList); - } + boolean moreValues = false; + try { + if (outResults.isEmpty()) { + // Usually outResults is empty. This is true when next is called + // to handle scan or get operation. + moreValues = nextInternal(outResults, scannerContext); + } else { + List<Cell> tmpList = new ArrayList<Cell>(); + moreValues = nextInternal(tmpList, scannerContext); + outResults.addAll(tmpList); + } - // If the size limit was reached it means a partial Result is being returned. Returning a - // partial Result means that we should not reset the filters; filters should only be reset in - // between rows - if (!scannerContext.partialResultFormed()) resetFilters(); + // If the size limit was reached it means a partial Result is being + // returned. Returning a + // partial Result means that we should not reset the filters; filters + // should only be reset in + // between rows + if (!scannerContext.partialResultFormed()) resetFilters(); + + if (isFilterDoneInternal()) { + moreValues = false; + } - if (isFilterDoneInternal()) { - moreValues = false; + // If copyCellsFromSharedMem = true, then we need to copy the cells. Otherwise + // it is a call coming from the RsRpcServices.scan(). + if (copyCellsFromSharedMem && !outResults.isEmpty()) { + // Do the copy of the results here. + ListIterator<Cell> listItr = outResults.listIterator(); + Cell cell = null; + while (listItr.hasNext()) { + cell = listItr.next(); + if (cell instanceof ShareableMemory) { + listItr.set(((ShareableMemory) cell).cloneToCell()); + } + } + } + } finally { + if (copyCellsFromSharedMem) { + // In case of copyCellsFromSharedMem==true (where the CPs wrap a scanner) we return + // the blocks then and there (for wrapped CPs) + this.shipped(); + } } return moreValues; } @@ -6365,6 +6403,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public Result get(final Get get) throws IOException { + prepareGet(get); + List<Cell> results = get(get, true); + boolean stale = this.getRegionInfo().getReplicaId() != 0; + return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); + } + + void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException { checkRow(get.getRow(), "Get"); // Verify families are all valid if (get.hasFamilies()) { @@ -6376,9 +6421,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi get.addFamily(family); } } - List<Cell> results = get(get, true); - boolean stale = this.getRegionInfo().getReplicaId() != 0; - return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); } @Override @@ -6388,9 +6430,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // pre-get CP hook if (withCoprocessor && (coprocessorHost != null)) { - if (coprocessorHost.preGet(get, results)) { - return results; - } + if (coprocessorHost.preGet(get, results)) { + return results; + } } Scan scan = new Scan(get); @@ -6409,16 +6451,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi coprocessorHost.postGet(get, results); } - // do after lock + metricsUpdateForGet(results); + + return results; + } + + void metricsUpdateForGet(List<Cell> results) { if (this.metricsRegion != null) { long totalSize = 0L; for (Cell cell : results) { + // This should give an estimate of the cell in the result. Why do we need + // to know the serialization of how the codec works with it?? totalSize += CellUtil.estimatedSerializedSizeOf(cell); } this.metricsRegion.updateGet(totalSize); } - - return results; } @Override @@ -7179,7 +7226,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // New HBASE-880 Helpers // - private void checkFamily(final byte [] family) + void checkFamily(final byte [] family) throws NoSuchColumnFamilyException { if (!this.htableDescriptor.hasFamily(family)) { throw new NoSuchColumnFamilyException("Column family " +
http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 7483568..806eeb5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -47,6 +47,10 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner { protected PriorityQueue<KeyValueScanner> heap = null; + // Holds the scanners when a ever a eager close() happens. All such eagerly closed + // scans are collected and when the final scanner.close() happens will perform the + // actual close. + protected Set<KeyValueScanner> scannersForDelayedClose = new HashSet<KeyValueScanner>(); /** * The current sub-scanner, i.e. the one that contains the next key/value @@ -62,8 +66,6 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner protected KVScannerComparator comparator; - protected Set<KeyValueScanner> scannersForDelayedClose = new HashSet<KeyValueScanner>(); - /** * Constructor. This KeyValueHeap will handle closing of passed in * KeyValueScanners. @@ -160,6 +162,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner */ if (pee == null || !moreCells) { + // add the scanner that is to be closed this.scannersForDelayedClose.add(this.current); } else { this.heap.add(this.current); http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5f31086..9868b49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -155,6 +155,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.quotas.OperationQuota; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Leases.Lease; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; @@ -243,7 +244,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, /** * An Rpc callback for closing a RegionScanner. */ - private static class RegionScannerCloseCallBack implements RpcCallback { + static class RegionScannerCloseCallBack implements RpcCallback { private final RegionScanner scanner; @@ -284,6 +285,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } /** + * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on + * completion of multiGets. + */ + static class RegionScannersCloseCallBack implements RpcCallback { + private final List<RegionScanner> scanners = new ArrayList<RegionScanner>(); + + public void addScanner(RegionScanner scanner) { + this.scanners.add(scanner); + } + + @Override + public void run() { + for (RegionScanner scanner : scanners) { + try { + scanner.close(); + } catch (IOException e) { + LOG.error("Exception while closing the scanner " + scanner, e); + } + } + } + } + + /** * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. */ private static class RegionScannerHolder { @@ -337,7 +361,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().preScannerClose(s); } - s.close(); if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(s); @@ -418,8 +441,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return context != null && context.isClientCellBlockSupport(); } - private void addResult(final MutateResponse.Builder builder, - final Result result, final PayloadCarryingRpcController rpcc) { + private void addResult(final MutateResponse.Builder builder, final Result result, + final PayloadCarryingRpcController rpcc) { if (result == null) return; if (isClientCellBlockSupport()) { builder.setResult(ProtobufUtil.toResultNoData(result)); @@ -626,13 +649,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // ResultOrException instance that matches each Put or Delete is then added down in the // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched List<ClientProtos.Action> mutations = null; - for (ClientProtos.Action action: actions.getActionList()) { + RpcCallContext context = RpcServer.getCurrentCall(); + // An RpcCallBack that creates a list of scanners that needs to perform callBack + // operation on completion of multiGets. + RegionScannersCloseCallBack closeCallBack = null; + for (ClientProtos.Action action : actions.getActionList()) { ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null; try { Result r = null; if (action.hasGet()) { + if (closeCallBack == null) { + // Initialize only once + closeCallBack = new RegionScannersCloseCallBack(); + // Set the call back here itself. + context.setCallBack(closeCallBack); + } Get get = ProtobufUtil.toGet(action.getGet()); - r = region.get(get); + r = get(get, ((HRegion) region), closeCallBack, context); } else if (action.hasServiceCall()) { resultOrExceptionBuilder = ResultOrException.newBuilder(); try { @@ -661,7 +694,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, r = append(region, quota, action.getMutation(), cellScanner, nonceGroup); break; case INCREMENT: - r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); + r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup); break; case PUT: case DELETE: @@ -679,7 +712,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } if (r != null) { ClientProtos.Result pbResult = null; - if (isClientCellBlockSupport()) { + if (isClientCellBlockSupport(context)) { pbResult = ProtobufUtil.toResultNoData(r); // Hard to guess the size here. Just make a rough guess. if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>(); @@ -1930,7 +1963,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ClientProtos.Get get = request.getGet(); Boolean existence = null; Result r = null; - + RpcCallContext context = RpcServer.getCurrentCall(); quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); Get clientGet = ProtobufUtil.toGet(get); @@ -1938,7 +1971,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, existence = region.getCoprocessorHost().preExists(clientGet); } if (existence == null) { - r = region.get(clientGet); + if (context != null) { + r = get(clientGet, ((HRegion) region), null, context); + } else { + // for test purpose + r = region.get(clientGet); + } if (get.getExistenceOnly()) { boolean exists = r.getExists(); if (region.getCoprocessorHost() != null) { @@ -1971,6 +2009,52 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, + RpcCallContext context) throws IOException { + region.prepareGet(get); + List<Cell> results = new ArrayList<Cell>(); + boolean stale = region.getRegionInfo().getReplicaId() != 0; + // pre-get CP hook + if (region.getCoprocessorHost() != null) { + if (region.getCoprocessorHost().preGet(get, results)) { + return Result + .create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); + } + } + + Scan scan = new Scan(get); + + RegionScanner scanner = null; + try { + scanner = region.getScanner(scan, false); + scanner.next(results); + } finally { + if (scanner != null) { + if (closeCallBack == null) { + // If there is a context then the scanner can be added to the current + // RpcCallContext. The rpc callback will take care of closing the + // scanner, for eg in case + // of get() + assert scanner instanceof org.apache.hadoop.hbase.ipc.RpcCallback; + context.setCallBack((RegionScannerImpl) scanner); + } else { + // The call is from multi() where the results from the get() are + // aggregated and then send out to the + // rpc. The rpccall back will close all such scanners created as part + // of multi(). + closeCallBack.addScanner(scanner); + } + } + } + + // post-get CP hook + if (region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postGet(get, results); + } + region.metricsUpdateForGet(results); + return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); + } + /** * Execute multiple actions on a table: get, mutate, and/or execCoprocessor * @@ -2230,6 +2314,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean moreResults = true; boolean closeScanner = false; boolean isSmallScan = false; + RegionScanner actualRegionScanner = null; ScanResponse.Builder builder = ScanResponse.newBuilder(); if (request.hasCloseScanner()) { closeScanner = request.getCloseScanner(); @@ -2274,17 +2359,27 @@ public class RSRpcServices implements HBaseRPCErrorHandler, scanner = region.getCoprocessorHost().preScannerOpen(scan); } if (scanner == null) { - scanner = region.getScanner(scan); + scanner = ((HRegion)region).getScanner(scan, false); } + actualRegionScanner = scanner; if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); } + if (actualRegionScanner != scanner) { + // It means the RegionScanner has been wrapped + if (actualRegionScanner instanceof RegionScannerImpl) { + // Copy the results when nextRaw is called from the CP so that + // CP can have a cloned version of the results without bothering + // about the eviction. Ugly, yes!!! + ((RegionScannerImpl) actualRegionScanner).setCopyCellsFromSharedMem(true); + } + } scannerId = this.scannerIdGen.incrementAndGet(); scannerName = String.valueOf(scannerId); rsh = addScanner(scannerName, scanner, region); ttl = this.scannerLeaseTimeoutPeriod; } - + assert scanner != null; RpcCallContext context = RpcServer.getCurrentCall(); quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); @@ -2295,9 +2390,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // performed even before checking of Lease. // See HBASE-5974 if (request.hasNextCallSeq()) { - if (rsh == null) { - rsh = scanners.get(scannerName); - } if (rsh != null) { if (request.getNextCallSeq() != rsh.getNextCallSeq()) { throw new OutOfOrderScannerNextException( @@ -2411,7 +2503,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, contextBuilder.setTimeLimit(timeScope, timeLimit); contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); - boolean limitReached = false; while (i < rows) { // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The @@ -2488,7 +2579,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } finally { region.closeRegionOperation(); } - // coprocessor postNext hook if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java index 85b7b83..ca09cdc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java @@ -42,9 +42,9 @@ class ReversedRegionScannerImpl extends RegionScannerImpl { * @throws IOException */ ReversedRegionScannerImpl(Scan scan, - List<KeyValueScanner> additionalScanners, HRegion region) + List<KeyValueScanner> additionalScanners, HRegion region, boolean copyCellsFromSharedMem) throws IOException { - region.super(scan, additionalScanners, region); + region.super(scan, additionalScanners, region, copyCellsFromSharedMem); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index daea870..5ea67ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -1282,7 +1283,7 @@ public class StoreFile { // Empty file if (reader.getTrailer().getEntryCount() == 0) return false; - + HFileBlock bloomBlock = null; try { boolean shouldCheckBloom; ByteBuff bloom; @@ -1290,8 +1291,8 @@ public class StoreFile { bloom = null; shouldCheckBloom = true; } else { - bloom = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, - true); + bloomBlock = reader.getMetaBlock(HFile.BLOOM_FILTER_DATA_KEY, true); + bloom = bloomBlock.getBufferWithoutHeader(); shouldCheckBloom = bloom != null; } @@ -1343,8 +1344,10 @@ public class StoreFile { } catch (IllegalArgumentException e) { LOG.error("Bad bloom filter data -- proceeding without", e); setGeneralBloomFilterFaulty(); + } finally { + // Return the bloom block so that its ref count can be decremented. + reader.returnBlock(bloomBlock); } - return true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 37573c2..38f36ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -85,6 +85,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected final long maxRowSize; protected final long cellsPerHeartbeatCheck; + // Collects all the KVHeap that are eagerly getting closed during the + // course of a scan protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>(); /** @@ -446,8 +448,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private void close(boolean withHeapClose){ lock.lock(); try { - if (this.closing) return; - this.closing = true; + if (this.closing) { + return; + } + if (withHeapClose) this.closing = true; // under test, we dont have a this.store if (this.store != null) this.store.deleteChangedReaderObserver(this); if (withHeapClose) { @@ -509,6 +513,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // if the heap was left null, then the scanners had previously run out anyways, close and // return. if (this.heap == null) { + // By this time partial close should happened because already heap is null close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 86b6c35..a3a1e61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1406,7 +1406,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { desc.addFamily(hcd); } getHBaseAdmin().createTable(desc, startKey, endKey, numRegions); - // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned + // HBaseAdmin only waits for regions to appear in hbase:meta we + // should wait until they are assigned waitUntilAllRegionsAssigned(tableName); return (HTable) getConnection().getTable(tableName); } @@ -1444,8 +1445,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { htd.addFamily(hcd); } getHBaseAdmin().createTable(htd, splitKeys); - // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are - // assigned + // HBaseAdmin only waits for regions to appear in hbase:meta + // we should wait until they are assigned waitUntilAllRegionsAssigned(htd.getTableName()); return (HTable) getConnection().getTable(htd.getTableName()); } @@ -1460,7 +1461,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public HTable createTable(HTableDescriptor htd, byte[][] splitRows) throws IOException { getHBaseAdmin().createTable(htd, splitRows); - // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned + // HBaseAdmin only waits for regions to appear in hbase:meta + // we should wait until they are assigned waitUntilAllRegionsAssigned(htd.getTableName()); return (HTable) getConnection().getTable(htd.getTableName()); } @@ -1700,6 +1702,24 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return (HTable) getConnection().getTable(tableName); } + public HTable createTable(TableName tableName, byte[][] families, + int numVersions, int blockSize, String cpName) throws IOException { + HTableDescriptor desc = new HTableDescriptor(tableName); + for (byte[] family : families) { + HColumnDescriptor hcd = new HColumnDescriptor(family) + .setMaxVersions(numVersions) + .setBlocksize(blockSize); + desc.addFamily(hcd); + } + if(cpName != null) { + desc.addCoprocessor(cpName); + } + getHBaseAdmin().createTable(desc); + // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned + waitUntilAllRegionsAssigned(tableName); + return (HTable) getConnection().getTable(tableName); + } + /** * Create a table. * @param tableName http://git-wip-us.apache.org/repos/asf/hbase/blob/ccb22bd8/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java new file mode 100644 index 0000000..a3cd8d0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java @@ -0,0 +1,1441 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class, ClientTests.class }) +@SuppressWarnings("deprecation") +public class TestBlockEvictionFromClient { + private static final Log LOG = LogFactory.getLog(TestBlockEvictionFromClient.class); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static byte[][] ROWS = new byte[2][]; + private static int NO_OF_THREADS = 3; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[] ROW1 = Bytes.toBytes("testRow1"); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES_1 = new byte[1][0]; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + private static byte[] data = new byte[1000]; + private static byte[] data2 = Bytes.add(data, data); + protected static int SLAVES = 1; + private static CountDownLatch latch; + private static CountDownLatch getLatch; + private static CountDownLatch compactionLatch; + private static CountDownLatch exceptionLatch; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + ROWS[0] = ROW; + ROWS[1] = ROW1; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName()); + conf.setBoolean("hbase.table.sanity.checks", true); // enable for below + // tests + conf.setInt("hbase.regionserver.handler.count", 20); + conf.setInt("hbase.bucketcache.size", 400); + conf.setStrings("hbase.bucketcache.ioengine", "heap"); + conf.setFloat("hfile.block.cache.size", 0.2f); + conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); + FAMILIES_1[0] = FAMILY; + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + CustomInnerRegionObserver.waitForGets.set(false); + CustomInnerRegionObserver.countOfNext.set(0); + CustomInnerRegionObserver.countOfGets.set(0); + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + if (latch != null) { + while (latch.getCount() > 0) { + latch.countDown(); + } + } + if (getLatch != null) { + getLatch.countDown(); + } + if (compactionLatch != null) { + compactionLatch.countDown(); + } + if (exceptionLatch != null) { + exceptionLatch.countDown(); + } + latch = null; + getLatch = null; + compactionLatch = null; + exceptionLatch = null; + CustomInnerRegionObserver.throwException.set(false); + // Clean up the tables for every test case + TableName[] listTableNames = TEST_UTIL.getHBaseAdmin().listTableNames(); + for (TableName tableName : listTableNames) { + if (!tableName.isSystemTable()) { + TEST_UTIL.getHBaseAdmin().disableTable(tableName); + TEST_UTIL.getHBaseAdmin().deleteTable(tableName); + } + } + } + + @Test + public void testBlockEvictionWithParallelScans() throws Exception { + HTable table = null; + try { + latch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testBlockEvictionWithParallelScans"); + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + // insert data. 2 Rows are added + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); + // data was in memstore so don't expect any changes + // flush the data + System.out.println("Flushing cache in problematic area"); + // Should create one Hfile with 2 blocks + region.flush(true); + // Load cache + // Create three sets of scan + ScanThread[] scanThreads = initiateScan(table, false); + Thread.sleep(100); + checkForBlockEviction(cache, false, false, false); + for (ScanThread thread : scanThreads) { + thread.join(); + } + // CustomInnerRegionObserver.sleepTime.set(0); + Iterator<CachedBlock> iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // read the data and expect same blocks, one new hit, no misses + assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // Check how this miss is happening + // insert a second column, read the row, no new blocks, 3 new hits + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + byte[] data2 = Bytes.add(data, data); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + Result r = table.get(new Get(ROW)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // flush, one new block + System.out.println("Flushing cache"); + region.flush(true); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // compact, net minus two blocks, two hits, no misses + System.out.println("Compacting"); + assertEquals(2, store.getStorefilesCount()); + store.triggerMajorCompaction(); + region.compact(true); + waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max + assertEquals(1, store.getStorefilesCount()); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // read the row, this should be a cache miss because we don't cache data + // blocks on compaction + r = table.get(new Get(ROW)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testParallelGetsAndScans() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(2); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testParallelGetsAndScans"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + insertData(table); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + region.flush(true); + // Create three sets of scan + CustomInnerRegionObserver.waitForGets.set(true); + ScanThread[] scanThreads = initiateScan(table, false); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, false, false); + checkForBlockEviction(cache, false, false, false); + CustomInnerRegionObserver.waitForGets.set(false); + checkForBlockEviction(cache, false, false, false); + for (GetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + getLatch.countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + System.out.println("Scans should have returned the bloks"); + // Check with either true or false + CustomInnerRegionObserver.waitForGets.set(false); + // The scan should also have released the blocks by now + checkForBlockEviction(cache, true, true, false); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testGetWithCellsInDifferentFiles"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + CustomInnerRegionObserver.waitForGets.set(true); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, false, false); + Thread.sleep(200); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (GetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + getLatch.countDown(); + System.out.println("Gets should have returned the bloks"); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + // TODO : check how block index works here + public void testGetsWithMultiColumnsAndExplicitTracker() + throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testGetsWithMultiColumnsAndExplicitTracker"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + BlockCache cache = setCacheProperties(region); + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + for (int i = 1; i < 10; i++) { + put = new Put(ROW); + put.add(FAMILY, Bytes.toBytes("testQualifier" + i), data2); + table.put(put); + if (i % 2 == 0) { + region.flush(true); + } + } + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + CustomInnerRegionObserver.waitForGets.set(true); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, true, false); + Thread.sleep(200); + Iterator<CachedBlock> iterator = cache.iterator(); + boolean usedBlocksFound = false; + int refCount = 0; + int noOfBlocksWithRef = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + System.out.println("The refCount is " + refCount); + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + noOfBlocksWithRef++; + } + } + assertTrue(usedBlocksFound); + // the number of blocks referred + assertEquals(10, noOfBlocksWithRef); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (GetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + getLatch.countDown(); + System.out.println("Gets should have returned the bloks"); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testGetWithMultipleColumnFamilies"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + byte[][] fams = new byte[10][]; + fams[0] = FAMILY; + for (int i = 1; i < 10; i++) { + fams[i] = (Bytes.toBytes("testFamily" + i)); + } + table = TEST_UTIL.createTable(tableName, fams, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + BlockCache cache = setCacheProperties(region); + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + for (int i = 1; i < 10; i++) { + put = new Put(ROW); + put.add(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); + table.put(put); + if (i % 2 == 0) { + region.flush(true); + } + } + region.flush(true); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + CustomInnerRegionObserver.waitForGets.set(true); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, true, true); + Thread.sleep(200); + Iterator<CachedBlock> iterator = cache.iterator(); + boolean usedBlocksFound = false; + int refCount = 0; + int noOfBlocksWithRef = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + System.out.println("The refCount is " + refCount); + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + noOfBlocksWithRef++; + } + } + assertTrue(usedBlocksFound); + // the number of blocks referred + assertEquals(3, noOfBlocksWithRef); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (GetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + getLatch.countDown(); + System.out.println("Gets should have returned the bloks"); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testMultiGets() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(2); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testMultiGets"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + CustomInnerRegionObserver.waitForGets.set(true); + // Create three sets of gets + MultiGetThread[] getThreads = initiateMultiGet(table); + Thread.sleep(200); + int refCount; + Iterator<CachedBlock> iterator = cache.iterator(); + boolean foundNonZeroBlock = false; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + assertEquals(NO_OF_THREADS, refCount); + foundNonZeroBlock = true; + } + } + assertTrue("Should have found nonzero ref count block",foundNonZeroBlock); + CustomInnerRegionObserver.getCdl().get().countDown(); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (MultiGetThread thread : getThreads) { + thread.join(); + } + // Verify whether the gets have returned the blocks that it had + CustomInnerRegionObserver.waitForGets.set(true); + // giving some time for the block to be decremented + iterateBlockCache(cache, iterator); + getLatch.countDown(); + System.out.println("Gets should have returned the bloks"); + } finally { + if (table != null) { + table.close(); + } + } + } + @Test + public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + // Check if get() returns blocks on its close() itself + TableName tableName = TableName.valueOf("testScanWithMultipleColumnFamilies"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + byte[][] fams = new byte[10][]; + fams[0] = FAMILY; + for (int i = 1; i < 10; i++) { + fams[i] = (Bytes.toBytes("testFamily" + i)); + } + table = TEST_UTIL.createTable(tableName, fams, 1, 1024, + CustomInnerRegionObserver.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + BlockCache cache = setCacheProperties(region); + + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + region.flush(true); + for (int i = 1; i < 10; i++) { + put = new Put(ROW); + put.add(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2); + table.put(put); + if (i % 2 == 0) { + region.flush(true); + } + } + region.flush(true); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + region.flush(true); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + // Create three sets of gets + ScanThread[] scanThreads = initiateScan(table, true); + Thread.sleep(200); + Iterator<CachedBlock> iterator = cache.iterator(); + boolean usedBlocksFound = false; + int refCount = 0; + int noOfBlocksWithRef = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + System.out.println("The refCount is " + refCount); + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + noOfBlocksWithRef++; + } + } + assertTrue(usedBlocksFound); + // the number of blocks referred + assertEquals(12, noOfBlocksWithRef); + CustomInnerRegionObserver.getCdl().get().countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + // giving some time for the block to be decremented + checkForBlockEviction(cache, true, false, false); + } finally { + if (table != null) { + table.close(); + } + } + } + + private BlockCache setCacheProperties(Region region) { + Iterator<Store> strItr = region.getStores().iterator(); + BlockCache cache = null; + while (strItr.hasNext()) { + Store store = strItr.next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + // Use the last one + cache = cacheConf.getBlockCache(); + } + return cache; + } + + @Test + public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException, + InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(2); + // Check if get() returns blocks on its close() itself + getLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testParallelGetsAndScanWithWrappedRegionScanner"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserverWrapper.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + // insert data. 2 Rows are added + insertData(table); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + region.flush(true); + // CustomInnerRegionObserver.sleepTime.set(5000); + // Create three sets of scan + CustomInnerRegionObserver.waitForGets.set(true); + ScanThread[] scanThreads = initiateScan(table, false); + // Create three sets of gets + GetThread[] getThreads = initiateGet(table, false, false); + // The block would have been decremented for the scan case as it was + // wrapped + // before even the postNext hook gets executed. + // giving some time for the block to be decremented + Thread.sleep(100); + CustomInnerRegionObserver.waitForGets.set(false); + checkForBlockEviction(cache, false, false, true); + // countdown the latch + CustomInnerRegionObserver.getCdl().get().countDown(); + for (GetThread thread : getThreads) { + thread.join(); + } + getLatch.countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testScanWithCompaction() throws IOException, InterruptedException { + testScanWithCompactionInternals("testScanWithCompaction", false); + } + + @Test + public void testReverseScanWithCompaction() throws IOException, InterruptedException { + testScanWithCompactionInternals("testReverseScanWithCompaction", true); + } + + private void testScanWithCompactionInternals(String tableNameStr, boolean reversed) + throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + compactionLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf(tableNameStr); + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserverWrapper.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + // insert data. 2 Rows are added + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); + // Should create one Hfile with 2 blocks + region.flush(true); + // read the data and expect same blocks, one new hit, no misses + int refCount = 0; + // Check how this miss is happening + // insert a second column, read the row, no new blocks, 3 new hits + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + byte[] data2 = Bytes.add(data, data); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + // flush, one new block + System.out.println("Flushing cache"); + region.flush(true); + Iterator<CachedBlock> iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + // Create three sets of scan + ScanThread[] scanThreads = initiateScan(table, reversed); + Thread.sleep(100); + iterator = cache.iterator(); + boolean usedBlocksFound = false; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + } + } + assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); + usedBlocksFound = false; + System.out.println("Compacting"); + assertEquals(2, store.getStorefilesCount()); + store.triggerMajorCompaction(); + region.compact(true); + waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max + assertEquals(1, store.getStorefilesCount()); + // Even after compaction is done we will have some blocks that cannot + // be evicted this is because the scan is still referencing them + iterator = cache.iterator(); + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 as they are not yet cleared + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + } + } + assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound); + // Should not throw exception + compactionLatch.countDown(); + latch.countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + // by this time all blocks should have been evicted + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + Result r = table.get(new Get(ROW)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + // The gets would be working on new blocks + iterator = cache.iterator(); + iterateBlockCache(cache, iterator); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testScanWithException() throws IOException, InterruptedException { + HTable table = null; + try { + latch = new CountDownLatch(1); + exceptionLatch = new CountDownLatch(1); + TableName tableName = TableName.valueOf("testScanWithException"); + // Create KV that will give you two blocks + // Create a table with block size as 1024 + table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CustomInnerRegionObserverWrapper.class.getName()); + // get the block cache and region + RegionLocator locator = table.getRegionLocator(); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions( + regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + // insert data. 2 Rows are added + insertData(table); + // flush the data + System.out.println("Flushing cache"); + // Should create one Hfile with 2 blocks + region.flush(true); + // CustomInnerRegionObserver.sleepTime.set(5000); + CustomInnerRegionObserver.throwException.set(true); + ScanThread[] scanThreads = initiateScan(table, false); + // The block would have been decremented for the scan case as it was + // wrapped + // before even the postNext hook gets executed. + // giving some time for the block to be decremented + Thread.sleep(100); + Iterator<CachedBlock> iterator = cache.iterator(); + boolean usedBlocksFound = false; + int refCount = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + } + } + assertTrue(usedBlocksFound); + exceptionLatch.countDown(); + // countdown the latch + CustomInnerRegionObserver.getCdl().get().countDown(); + for (ScanThread thread : scanThreads) { + thread.join(); + } + iterator = cache.iterator(); + usedBlocksFound = false; + refCount = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + if (refCount != 0) { + // Blocks will be with count 3 + assertEquals(NO_OF_THREADS, refCount); + usedBlocksFound = true; + } + } + assertTrue(usedBlocksFound); + // Sleep till the scan lease would expire? Can we reduce this value? + Thread.sleep(5100); + iterator = cache.iterator(); + refCount = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + assertEquals(0, refCount); + } + } finally { + if (table != null) { + table.close(); + } + } + } + + private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) { + int refCount; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + assertEquals(0, refCount); + } + } + + private void insertData(HTable table) throws IOException { + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW1); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + } + + private ScanThread[] initiateScan(HTable table, boolean reverse) throws IOException, + InterruptedException { + ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS]; + for (int i = 0; i < NO_OF_THREADS; i++) { + scanThreads[i] = new ScanThread(table, reverse); + } + for (ScanThread thread : scanThreads) { + thread.start(); + } + return scanThreads; + } + + private GetThread[] initiateGet(HTable table, boolean tracker, boolean multipleCFs) + throws IOException, InterruptedException { + GetThread[] getThreads = new GetThread[NO_OF_THREADS]; + for (int i = 0; i < NO_OF_THREADS; i++) { + getThreads[i] = new GetThread(table, tracker, multipleCFs); + } + for (GetThread thread : getThreads) { + thread.start(); + } + return getThreads; + } + + private MultiGetThread[] initiateMultiGet(HTable table) + throws IOException, InterruptedException { + MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS]; + for (int i = 0; i < NO_OF_THREADS; i++) { + multiGetThreads[i] = new MultiGetThread(table); + } + for (MultiGetThread thread : multiGetThreads) { + thread.start(); + } + return multiGetThreads; + } + + private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero, + boolean wrappedCp) throws InterruptedException { + int counter = NO_OF_THREADS; + if (CustomInnerRegionObserver.waitForGets.get()) { + // Because only one row is selected, it has only 2 blocks + counter = counter - 1; + while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) { + Thread.sleep(100); + } + } else { + while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) { + Thread.sleep(100); + } + } + Iterator<CachedBlock> iterator = cache.iterator(); + int refCount = 0; + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + if (cache instanceof BucketCache) { + refCount = ((BucketCache) cache).getRefCount(cacheKey); + } else if (cache instanceof CombinedBlockCache) { + refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey); + } else { + continue; + } + System.out.println(" the refcount is " + refCount + " block is " + cacheKey); + if (CustomInnerRegionObserver.waitForGets.get()) { + if (expectOnlyZero) { + assertTrue(refCount == 0); + } + if (refCount != 0) { + // Because the scan would have also touched up on these blocks but + // it + // would have touched + // all 3 + if (getClosed) { + // If get has closed only the scan's blocks would be available + assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get()); + } else { + assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS)); + } + } + } else { + // Because the get would have also touched up on these blocks but it + // would have touched + // upon only 2 additionally + if (expectOnlyZero) { + assertTrue(refCount == 0); + } + if (refCount != 0) { + if (getLatch == null || wrappedCp) { + assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get()); + } else { + assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS)); + } + } + } + } + CustomInnerRegionObserver.getCdl().get().countDown(); + } + + private static class MultiGetThread extends Thread { + private final HTable table; + private final List<Get> gets = new ArrayList<Get>(); + public MultiGetThread(HTable table) { + this.table = table; + } + @Override + public void run() { + gets.add(new Get(ROW)); + gets.add(new Get(ROW1)); + try { + CustomInnerRegionObserver.getCdl().set(latch); + Result[] r = table.get(gets); + assertTrue(Bytes.equals(r[0].getRow(), ROW)); + assertTrue(Bytes.equals(r[1].getRow(), ROW1)); + } catch (IOException e) { + } + } + } + + private static class GetThread extends Thread { + private final HTable table; + private final boolean tracker; + private final boolean multipleCFs; + + public GetThread(HTable table, boolean tracker, boolean multipleCFs) { + this.table = table; + this.tracker = tracker; + this.multipleCFs = multipleCFs; + } + + @Override + public void run() { + try { + initiateGet(table); + } catch (IOException e) { + // do nothing + } + } + + private void initiateGet(HTable table) throws IOException { + Get get = new Get(ROW); + if (tracker) { + // Change this + if (!multipleCFs) { + get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3)); + get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8)); + get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9)); + // Unknown key + get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900)); + } else { + get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)); + get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)); + get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)); + // Unknown key + get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900)); + } + } + CustomInnerRegionObserver.getCdl().set(latch); + Result r = table.get(get); + System.out.println(r); + if (!tracker) { + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + } else { + if (!multipleCFs) { + assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2)); + assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2)); + assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2)); + } else { + assertTrue(Bytes.equals( + r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)), + data2)); + assertTrue(Bytes.equals( + r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)), + data2)); + assertTrue(Bytes.equals( + r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)), + data2)); + } + } + } + } + + private static class ScanThread extends Thread { + private final HTable table; + private final boolean reverse; + + public ScanThread(HTable table, boolean reverse) { + this.table = table; + this.reverse = reverse; + } + + @Override + public void run() { + try { + initiateScan(table); + } catch (IOException e) { + // do nothing + } + } + + private void initiateScan(HTable table) throws IOException { + Scan scan = new Scan(); + if (reverse) { + scan.setReversed(true); + } + CustomInnerRegionObserver.getCdl().set(latch); + ResultScanner resScanner = table.getScanner(scan); + int i = (reverse ? ROWS.length - 1 : 0); + boolean resultFound = false; + for (Result result : resScanner) { + resultFound = true; + System.out.println(result); + if (!reverse) { + assertTrue(Bytes.equals(result.getRow(), ROWS[i])); + i++; + } else { + assertTrue(Bytes.equals(result.getRow(), ROWS[i])); + i--; + } + } + assertTrue(resultFound); + } + } + + private void waitForStoreFileCount(Store store, int count, int timeout) + throws InterruptedException { + long start = System.currentTimeMillis(); + while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) { + Thread.sleep(100); + } + System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" + + store.getStorefilesCount()); + assertEquals(count, store.getStorefilesCount()); + } + + private static class CustomScanner implements RegionScanner { + + private RegionScanner delegate; + + public CustomScanner(RegionScanner delegate) { + this.delegate = delegate; + } + + @Override + public boolean next(List<Cell> results) throws IOException { + return delegate.next(results); + } + + @Override + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + return delegate.next(result, scannerContext); + } + + @Override + public boolean nextRaw(List<Cell> result) throws IOException { + return delegate.nextRaw(result); + } + + @Override + public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException { + boolean nextRaw = delegate.nextRaw(result, context); + if (compactionLatch != null && compactionLatch.getCount() > 0) { + try { + compactionLatch.await(); + } catch (InterruptedException ie) { + } + } + + if (CustomInnerRegionObserver.throwException.get()) { + if (exceptionLatch.getCount() > 0) { + try { + exceptionLatch.await(); + } catch (InterruptedException e) { + } + throw new IOException("throw exception"); + } + } + return nextRaw; + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public HRegionInfo getRegionInfo() { + return delegate.getRegionInfo(); + } + + @Override + public boolean isFilterDone() throws IOException { + return delegate.isFilterDone(); + } + + @Override + public boolean reseek(byte[] row) throws IOException { + return false; + } + + @Override + public long getMaxResultSize() { + return delegate.getMaxResultSize(); + } + + @Override + public long getMvccReadPoint() { + return delegate.getMvccReadPoint(); + } + + @Override + public int getBatch() { + return delegate.getBatch(); + } + + @Override + public void shipped() throws IOException { + this.delegate.shipped(); + } + } + + public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver { + @Override + public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, + Scan scan, RegionScanner s) throws IOException { + return new CustomScanner(s); + } + } + + public static class CustomInnerRegionObserver extends BaseRegionObserver { + static final AtomicLong sleepTime = new AtomicLong(0); + static final AtomicBoolean slowDownNext = new AtomicBoolean(false); + static final AtomicInteger countOfNext = new AtomicInteger(0); + static final AtomicInteger countOfGets = new AtomicInteger(0); + static final AtomicBoolean waitForGets = new AtomicBoolean(false); + static final AtomicBoolean throwException = new AtomicBoolean(false); + private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<CountDownLatch>( + new CountDownLatch(0)); + + @Override + public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, + InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException { + slowdownCode(e, false); + if (getLatch != null && getLatch.getCount() > 0) { + try { + getLatch.await(); + } catch (InterruptedException e1) { + } + } + return super.postScannerNext(e, s, results, limit, hasMore); + } + + @Override + public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, + List<Cell> results) throws IOException { + slowdownCode(e, true); + super.postGetOp(e, get, results); + } + + public static AtomicReference<CountDownLatch> getCdl() { + return cdl; + } + + private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e, + boolean isGet) { + CountDownLatch latch = getCdl().get(); + try { + System.out.println(latch.getCount() + " is the count " + isGet); + if (latch.getCount() > 0) { + if (isGet) { + countOfGets.incrementAndGet(); + } else { + countOfNext.incrementAndGet(); + } + LOG.info("Waiting for the counterCountDownLatch"); + latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. + if (latch.getCount() > 0) { + throw new RuntimeException("Can't wait more"); + } + } + } catch (InterruptedException e1) { + LOG.error(e1); + } + } + } +}
