HBASE-18183 Region interface cleanup for CP expose.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b4ed1300 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b4ed1300 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b4ed1300 Branch: refs/heads/master Commit: b4ed130083a26bad455eea2843a2e3075f400260 Parents: 690ff38 Author: anoopsamjohn <anoopsamj...@gmail.com> Authored: Wed Oct 11 16:47:15 2017 +0530 Committer: anoopsamjohn <anoopsamj...@gmail.com> Committed: Wed Oct 11 16:47:15 2017 +0530 ---------------------------------------------------------------------- .../hbase/shaded/protobuf/ProtobufUtil.java | 4 + .../security/access/SecureBulkLoadEndpoint.java | 3 +- .../coprocessor/example/BulkDeleteEndpoint.java | 3 +- .../ZkSplitLogWorkerCoordination.java | 4 +- .../procedure/flush/FlushTableSubprocedure.java | 12 +- .../RegionServerFlushTableProcedureManager.java | 7 +- .../regionserver/CompactedHFilesDischarger.java | 6 +- .../hbase/regionserver/FlushRequester.java | 4 +- .../hadoop/hbase/regionserver/HRegion.java | 252 +++++++++++--- .../hbase/regionserver/HRegionServer.java | 93 +++-- .../regionserver/ImmutableOnlineRegions.java | 4 +- ...IncreasingToUpperBoundRegionSplitPolicy.java | 2 +- .../hadoop/hbase/regionserver/LogRoller.java | 2 +- .../hbase/regionserver/MemStoreFlusher.java | 52 ++- .../MetricsRegionServerWrapperImpl.java | 4 +- .../hbase/regionserver/OnlineRegions.java | 4 +- .../hbase/regionserver/RSRpcServices.java | 80 ++--- .../hadoop/hbase/regionserver/Region.java | 335 ++----------------- .../regionserver/RegionCoprocessorHost.java | 4 +- .../hbase/regionserver/RegionMergeRequest.java | 5 +- .../regionserver/RegionServerServices.java | 34 +- .../regionserver/SecureBulkLoadManager.java | 4 +- .../hadoop/hbase/regionserver/SplitRequest.java | 5 +- .../handler/CloseRegionHandler.java | 5 +- .../regionserver/handler/OpenRegionHandler.java | 5 +- .../snapshot/FlushSnapshotSubprocedure.java | 17 +- 
.../snapshot/RegionServerSnapshotManager.java | 12 +- .../DefaultVisibilityLabelServiceImpl.java | 2 +- .../hadoop/hbase/HBaseTestingUtility.java | 4 +- .../apache/hadoop/hbase/MiniHBaseCluster.java | 8 +- .../hadoop/hbase/MockRegionServerServices.java | 25 +- .../hadoop/hbase/TestGlobalMemStoreSize.java | 8 +- .../hadoop/hbase/backup/TestHFileArchiving.java | 8 +- .../TestZooKeeperTableArchiveClient.java | 14 +- .../apache/hadoop/hbase/client/TestAdmin2.java | 3 +- .../hbase/client/TestAsyncClusterAdminApi.java | 4 +- .../hadoop/hbase/client/TestFromClientSide.java | 3 +- .../hbase/client/TestFromClientSide3.java | 2 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 3 +- .../hadoop/hbase/client/TestMetaCache.java | 6 - .../coprocessor/TestCoprocessorInterface.java | 17 +- .../TestRegionObserverScannerOpenHook.java | 6 +- .../apache/hadoop/hbase/filter/TestFilter.java | 6 +- .../filter/TestInvocationRecordFilter.java | 3 +- .../hadoop/hbase/fs/TestBlockReorder.java | 6 +- .../hbase/io/encoding/TestEncodedSeekers.java | 5 +- .../hbase/io/encoding/TestPrefixTree.java | 3 +- .../encoding/TestSeekBeforeWithReverseScan.java | 4 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 2 +- .../io/hfile/TestForceCacheImportantBlocks.java | 2 +- .../TestScannerSelectionUsingKeyRange.java | 3 +- .../io/hfile/TestScannerSelectionUsingTTL.java | 6 +- .../hadoop/hbase/master/MockRegionServer.java | 23 +- .../hadoop/hbase/master/TestMasterFailover.java | 5 +- .../hbase/master/TestRegionPlacement.java | 5 +- .../TestFavoredStochasticLoadBalancer.java | 3 +- .../balancer/TestRegionLocationFinder.java | 12 +- .../balancer/TestRegionsOnMasterOptions.java | 3 +- .../quotas/TestFileSystemUtilizationChore.java | 17 +- .../hbase/regionserver/RegionAsTable.java | 4 +- .../hbase/regionserver/TestAtomicOperation.java | 8 +- .../hbase/regionserver/TestBlocksRead.java | 6 +- .../hbase/regionserver/TestBlocksScanned.java | 2 +- .../hbase/regionserver/TestColumnSeeking.java | 4 +- 
.../TestCompactionArchiveConcurrentClose.java | 5 +- .../TestCompactionArchiveIOException.java | 6 +- .../hbase/regionserver/TestCompactionState.java | 8 +- .../regionserver/TestGetClosestAtOrBefore.java | 6 +- .../hadoop/hbase/regionserver/TestHRegion.java | 2 +- .../regionserver/TestHRegionReplayEvents.java | 10 +- .../regionserver/TestHeapMemoryManager.java | 4 +- .../hbase/regionserver/TestKeepDeletes.java | 2 +- .../hbase/regionserver/TestMajorCompaction.java | 6 +- .../hbase/regionserver/TestMinVersions.java | 12 +- .../regionserver/TestMultiColumnScanner.java | 2 +- .../regionserver/TestPerColumnFamilyFlush.java | 14 +- .../regionserver/TestRegionFavoredNodes.java | 8 +- .../hbase/regionserver/TestRegionReplicas.java | 10 +- .../TestRegionReplicasWithModifyTable.java | 40 +-- .../regionserver/TestRegionServerMetrics.java | 6 +- .../regionserver/TestRegionServerNoMaster.java | 2 +- .../regionserver/TestRegionSplitPolicy.java | 8 +- .../regionserver/TestResettingCounters.java | 2 +- .../regionserver/TestReversibleScanners.java | 4 +- .../hbase/regionserver/TestRowTooBig.java | 4 +- .../regionserver/TestScanWithBloomError.java | 2 +- .../regionserver/TestSeekOptimizations.java | 2 +- .../regionserver/TestSplitWalDataLoss.java | 4 +- .../TestStoreFileRefresherChore.java | 8 +- .../TestWalAndCompactingMemStoreFlush.java | 4 +- .../TestCompactedHFilesDischarger.java | 7 +- .../TestFlushWithThroughputController.java | 2 +- .../wal/AbstractTestLogRolling.java | 5 +- .../regionserver/wal/AbstractTestWALReplay.java | 6 +- .../hbase/regionserver/wal/TestFSHLog.java | 3 +- .../hbase/regionserver/wal/TestLogRolling.java | 3 +- .../TestRegionReplicaReplicationEndpoint.java | 5 +- .../visibility/TestVisibilityLabels.java | 4 +- .../hbase/snapshot/SnapshotTestingUtils.java | 5 +- 99 files changed, 619 insertions(+), 809 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index e8496a7..35bca3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2863,6 +2863,10 @@ public final class ProtobufUtil { return CompactionState.valueOf(state.toString()); } + public static GetRegionInfoResponse.CompactionState createCompactionState(CompactionState state) { + return GetRegionInfoResponse.CompactionState.valueOf(state.toString()); + } + public static Optional<Long> toOptionalTimestamp(MajorCompactionTimestampResponse resp) { long timestamp = resp.getCompactionTimestamp(); return timestamp == 0 ? 
Optional.empty() : Optional.of(timestamp); http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index 9477eb1..8b579bf 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest; import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse; import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; @@ -141,7 +142,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements Reg try { SecureBulkLoadManager secureBulkLoadManager = this.rsServices.getSecureBulkLoadManager(); BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request); - map = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(), + map = secureBulkLoadManager.secureBulkLoadHFiles((HRegion) this.env.getRegion(), convert(bulkLoadHFileRequest)); loaded = map != null && !map.isEmpty(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java 
---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java index cb5373d..233ea18 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java @@ -150,8 +150,7 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements RegionCopro for (List<Cell> deleteRow : deleteRows) { deleteArr[i++] = createDeleteMutation(deleteRow, deleteType, timestamp); } - OperationStatus[] opStatus = region.batchMutate(deleteArr, HConstants.NO_NONCE, - HConstants.NO_NONCE); + OperationStatus[] opStatus = region.batchMutate(deleteArr); for (i = 0; i < opStatus.length; i++) { if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { break; http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index 79c82f6..230a42f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -41,7 +41,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; @@ -446,7 +446,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements taskReadyLock.wait(checkInterval); if (server != null) { // check to see if we have stale recovering regions in our internal memory state - Map<String, Region> recoveringRegions = server.getRecoveringRegions(); + Map<String, HRegion> recoveringRegions = server.getRecoveringRegions(); if (!recoveringRegions.isEmpty()) { // Make a local copy to prevent ConcurrentModificationException when other threads // modify recoveringRegions http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index 145bc14..1b4c561 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; -import org.apache.hadoop.hbase.regionserver.Region; +import 
org.apache.hadoop.hbase.regionserver.HRegion; /** * This flush region implementation uses the distributed procedure framework to flush @@ -40,12 +40,12 @@ public class FlushTableSubprocedure extends Subprocedure { private static final Log LOG = LogFactory.getLog(FlushTableSubprocedure.class); private final String table; - private final List<Region> regions; + private final List<HRegion> regions; private final FlushTableSubprocedurePool taskManager; public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, - List<Region> regions, String table, + List<HRegion> regions, String table, FlushTableSubprocedurePool taskManager) { super(member, table, errorListener, wakeFrequency, timeout); this.table = table; @@ -54,8 +54,8 @@ public class FlushTableSubprocedure extends Subprocedure { } private static class RegionFlushTask implements Callable<Void> { - Region region; - RegionFlushTask(Region region) { + HRegion region; + RegionFlushTask(HRegion region) { this.region = region; } @@ -90,7 +90,7 @@ public class FlushTableSubprocedure extends Subprocedure { } // Add all hfiles already existing in region. - for (Region region : regions) { + for (HRegion region : regions) { // submit one task per region for parallelize by region. 
taskManager.submitTask(new RegionFlushTask(region)); monitor.rethrowException(); http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 49192e1..ea34714 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.procedure.SubprocedureFactory; import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -139,7 +140,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur } // check to see if this server is hosting any regions for the table - List<Region> involvedRegions; + List<HRegion> involvedRegions; try { involvedRegions = getRegionsToFlush(table); } catch (IOException e1) { @@ -174,8 +175,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur * @return the list of online regions. Empty list is returned if no regions. 
* @throws IOException */ - private List<Region> getRegionsToFlush(String table) throws IOException { - return rss.getRegions(TableName.valueOf(table)); + private List<HRegion> getRegionsToFlush(String table) throws IOException { + return (List<HRegion>) rss.getRegions(TableName.valueOf(table)); } public class FlushTableSubprocedureBuilder implements SubprocedureFactory { http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java index 72f80e4..8a0dee6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java @@ -86,13 +86,13 @@ public class CompactedHFilesDischarger extends ScheduledChore { // Noop if rss is null. 
This will never happen in a normal condition except for cases // when the test case is not spinning up a cluster if (regionServerServices == null) return; - List<Region> onlineRegions = regionServerServices.getRegions(); + List<HRegion> onlineRegions = (List<HRegion>) regionServerServices.getRegions(); if (onlineRegions == null) return; - for (Region region : onlineRegions) { + for (HRegion region : onlineRegions) { if (LOG.isTraceEnabled()) { LOG.trace("Started compacted hfiles cleaner on " + region.getRegionInfo()); } - for (HStore store : ((HRegion) region).getStores()) { + for (HStore store : region.getStores()) { try { if (useExecutor && regionServerServices != null) { CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler( http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index e9f5d76..931a737 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -33,7 +33,7 @@ public interface FlushRequester { * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. */ - void requestFlush(Region region, boolean forceFlushAllStores); + void requestFlush(HRegion region, boolean forceFlushAllStores); /** * Tell the listener the cache needs to be flushed after a delay @@ -43,7 +43,7 @@ public interface FlushRequester { * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. 
*/ - void requestDelayedFlush(Region region, long delay, boolean forceFlushAllStores); + void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores); /** * Register a FlushRequestListener http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2d35fb9..559ac95 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -179,10 +180,10 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; import org.apache.hadoop.hbase.shaded.com.google.common.io.Closeables; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service; import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; @@ -197,6 +198,21 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; +/** + * Regions store data for a certain region of a table. It stores all columns + * for each row. A given table consists of one or more Regions. + * + * <p>A Region is defined by its table and its key extent. + * + * <p>Locking at the Region level serves only one purpose: preventing the + * region from being closed (and consequently split) while other operations + * are ongoing. Each row level operation obtains both a row lock and a region + * read lock for the duration of the operation. While a scanner is being + * constructed, getScanner holds a read lock. If the scanner is successfully + * constructed, it holds a read lock until it is closed. A close takes out a + * write lock and consequently will block for ongoing operations and will block + * new operations from starting while the close is in progress.
+ */ @SuppressWarnings("deprecation") @InterfaceAudience.Private public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { @@ -782,7 +798,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this); this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper); - Map<String, Region> recoveringRegions = rsServices.getRecoveringRegions(); + Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions(); String encodedName = getRegionInfo().getEncodedName(); if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) { this.recovering = true; @@ -1121,7 +1137,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.updatesLock.writeLock().unlock(); } - @Override public HDFSBlocksDistribution getHDFSBlocksDistribution() { HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); stores.values().stream().filter(s -> s.getStorefiles() != null) @@ -1238,7 +1253,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return readRequestsCount.sum(); } - @Override + /** + * Update the read request count for this region + * @param i increment + */ public void updateReadRequestsCount(long i) { readRequestsCount.add(i); } @@ -1253,7 +1271,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return writeRequestsCount.sum(); } - @Override + /** + * Update the write request count for this region + * @param i increment + */ public void updateWriteRequestsCount(long i) { writeRequestsCount.add(i); } @@ -1263,7 +1284,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return memstoreDataSize.get(); } - @Override + /** @return store services for this region, to access services required by store level needs */ public RegionServicesForStores getRegionServicesForStores() { return 
regionServicesForStores; } @@ -1293,7 +1314,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return checkAndMutateChecksFailed.sum(); } - @Override + // TODO Needs to check whether we should expose our metrics system to CPs. If CPs themselves are doing + // the op and bypassing the core, this might be needed? Should we stop supporting the bypass + // feature? public MetricsRegion getMetrics() { return metricsRegion; } @@ -1433,12 +1456,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return mvcc.getReadPoint(); } - @Override - public long getReadpoint(IsolationLevel isolationLevel) { - return getReadPoint(isolationLevel); - } - - @Override public boolean isLoadingCfsOnDemandDefault() { return this.isLoadingCfsOnDemandDefault; } @@ -1719,7 +1736,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return stores.values().stream().mapToLong(s -> s.getMemStoreSize().getHeapSize()).sum(); } - @Override + /** Wait for all current flushes and compactions of the region to complete */ + // TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for + // Phoenix needs. public void waitForFlushesAndCompactions() { synchronized (writestate) { if (this.writestate.readOnly) { @@ -1748,7 +1767,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - @Override + /** Wait for all current flushes of the region to complete + */ + // TODO HBASE-18906. Check the usage (if any) in Phoenix and expose this or give alternate way for + // Phoenix needs. public void waitForFlushes() { synchronized (writestate) { if (this.writestate.readOnly) { @@ -1941,7 +1963,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi stores.values().forEach(HStore::triggerMajorCompaction); } - @Override + /** + * Synchronously compact all stores in the region.
+ * <p>This operation could block for a long time, so don't call it from a + * time-sensitive thread. + * <p>Note that no locks are taken to prevent possible conflicts between + * compaction and splitting activities. The regionserver does not normally compact + * and split in parallel. However by calling this method you may introduce + * unexpected and unhandled concurrency. Don't do this unless you know what + * you are doing. + * + * @param majorCompaction True to force a major compaction regardless of thresholds + * @throws IOException + */ public void compact(boolean majorCompaction) throws IOException { if (majorCompaction) { triggerMajorCompaction(); @@ -2157,11 +2191,51 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - @Override + /** + * Flush the cache. + * + * <p>When this method is called the cache will be flushed unless: + * <ol> + * <li>the cache is empty</li> + * <li>the region is closed.</li> + * <li>a flush is already in progress</li> + * <li>writes are disabled</li> + * </ol> + * + * <p>This method may block for some time, so it should not be called from a + * time-sensitive thread. + * @param force whether we want to force a flush of all stores + * @return FlushResult indicating whether the flush was successful or not and if + * the region needs compacting + * + * @throws IOException general io exceptions + * because a snapshot was not properly persisted. + */ + // TODO HBASE-18905. We might have to expose a requestFlush API for CPs public FlushResult flush(boolean force) throws IOException { return flushcache(force, false); } + public static interface FlushResult { + enum Result { + FLUSHED_NO_COMPACTION_NEEDED, + FLUSHED_COMPACTION_NEEDED, + // Special case where a flush didn't run because there's nothing in the memstores. Used when + // bulk loading to know when we can still load even if a flush didn't happen. 
+ CANNOT_FLUSH_MEMSTORE_EMPTY, + CANNOT_FLUSH + } + + /** @return the detailed result code */ + Result getResult(); + + /** @return true if the memstores were flushed, else false */ + boolean isFlushSucceeded(); + + /** @return True if the flush requested a compaction, else false */ + boolean isCompactionNeeded(); + } + /** * Flush the cache. * @@ -2805,7 +2879,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return new RegionScannerImpl(scan, additionalScanners, this, nonceGroup, nonce); } - @Override + /** + * Prepare a delete for a row mutation processor + * @param delete The passed delete is modified by this method. WARNING! + * @throws IOException + */ public void prepareDelete(Delete delete) throws IOException { // Check to see if this is a deleteRow insert if(delete.getFamilyCellMap().isEmpty()){ @@ -2854,7 +2932,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi doBatchMutate(delete); } - @Override + /** + * Set up correct timestamps in the KVs in Delete object. + * <p>Caller should have the row and region locks. 
+ * @param mutation + * @param familyMap + * @param byteNow + * @throws IOException + */ public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap, byte[] byteNow) throws IOException { for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { @@ -3043,7 +3128,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - @Override public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) throws IOException { // As it stands, this is used for 3 things @@ -3053,11 +3137,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); } + @Override public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); } - @Override public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException { if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) @@ -3841,7 +3925,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - @Override + /** + * Replace any cell timestamps set to {@link org.apache.hadoop.hbase.HConstants#LATEST_TIMESTAMP} + * with the provided current timestamp. + * @param cellItr + * @param now + */ public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now) throws IOException { for (List<Cell> cells: cellItr) { @@ -3991,14 +4080,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi store.add(cell, memstoreSize); } - @Override + /** + * Check the collection of families for validity.
+ * @param families + * @throws NoSuchColumnFamilyException + */ public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException { for (byte[] family : families) { checkFamily(family); } } - @Override + /** + * Check the collection of families for valid timestamps + * @param familyMap + * @param now current timestamp + * @throws FailedSanityCheckException + */ public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now) throws FailedSanityCheckException { if (timestampSlop == HConstants.LATEST_TIMESTAMP) { @@ -5423,8 +5521,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - @Override - public void releaseRowLocks(List<RowLock> rowLocks) { + private void releaseRowLocks(List<RowLock> rowLocks) { if (rowLocks != null) { for (int i = 0; i < rowLocks.size(); i++) { rowLocks.get(i).release(); @@ -5560,13 +5657,67 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return multipleFamilies; } - @Override + /** + * Attempts to atomically load a group of hfiles. This is critical for loading + * rows with multiple column families atomically. + * + * @param familyPaths List of Pair<byte[] column family, String hfilePath> + * @param bulkLoadListener Internal hooks enabling massaging/preparation of a + * file about to be bulk loaded + * @param assignSeqId + * @return Map from family to List of store file paths if successful, null if failed recoverably + * @throws IOException if failed unrecoverably. 
+ */ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException { return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false); } - @Override + /** + * Listener class to enable callers of + * bulkLoadHFile() to perform any necessary + * pre/post processing of a given bulkload call + */ + public static interface BulkLoadListener { + /** + * Called before an HFile is actually loaded + * @param family family being loaded to + * @param srcPath path of HFile + * @return final path to be used for actual loading + * @throws IOException + */ + String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile) + throws IOException; + + /** + * Called after a successful HFile load + * @param family family being loaded to + * @param srcPath path of HFile + * @throws IOException + */ + void doneBulkLoad(byte[] family, String srcPath) throws IOException; + + /** + * Called after a failed HFile load + * @param family family being loaded to + * @param srcPath path of HFile + * @throws IOException + */ + void failedBulkLoad(byte[] family, String srcPath) throws IOException; + } + + /** + * Attempts to atomically load a group of hfiles. This is critical for loading + * rows with multiple column families atomically. + * + * @param familyPaths List of Pair<byte[] column family, String hfilePath> + * @param assignSeqId + * @param bulkLoadListener Internal hooks enabling massaging/preparation of a + * file about to be bulk loaded + * @param copyFile always copy hfiles if true + * @return Map from family to List of store file paths if successful, null if failed recoverably + * @throws IOException if failed unrecoverably. 
+ */ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException { long seqId = -1; @@ -6875,7 +7026,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE); } - @Override public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce) throws IOException { List<Cell> results = new ArrayList<>(); @@ -7167,22 +7317,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + @Override public Result append(Append append) throws IOException { return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); } - @Override public Result append(Append mutation, long nonceGroup, long nonce) throws IOException { return doDelta(Operation.APPEND, mutation, nonceGroup, nonce, mutation.isReturnResults()); } + @Override public Result increment(Increment increment) throws IOException { return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); } - @Override - public Result increment(Increment mutation, long nonceGroup, long nonce) - throws IOException { + public Result increment(Increment mutation, long nonceGroup, long nonce) throws IOException { return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce, mutation.isReturnResults()); } @@ -7574,7 +7723,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return DEEP_OVERHEAD + stores.values().stream().mapToLong(HStore::heapSize).sum(); } - @Override + /** + * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to + * be available for handling Region#execService(com.google.protobuf.RpcController, + * org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall) calls. 
+ * + * <p> + * Only a single instance may be registered per region for a given {@link Service} subclass (the + * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}. + * After the first registration, subsequent calls with the same service name will fail with + * a return value of {@code false}. + * </p> + * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint + * @return {@code true} if the registration was successful, {@code false} + * otherwise + */ public boolean registerService(com.google.protobuf.Service instance) { /* * No stacking of instances is allowed for a single service name @@ -7597,10 +7760,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return true; } - @Override + /** + * Executes a single protocol buffer coprocessor endpoint {@link Service} method using + * the registered protocol handlers. {@link Service} implementations must be registered via the + * {@link #registerService(com.google.protobuf.Service)} + * method before they are available. 
+ * + * @param controller an {@code RpcContoller} implementation to pass to the invoked service + * @param call a {@code CoprocessorServiceCall} instance identifying the service, method, + * and parameters for the method invocation + * @return a protocol buffer {@code Message} instance containing the method's result + * @throws IOException if no registered service handler is found or an error + * occurs during the invocation + * @see #registerService(com.google.protobuf.Service) + */ public com.google.protobuf.Message execService(com.google.protobuf.RpcController controller, - CoprocessorServiceCall call) - throws IOException { + CoprocessorServiceCall call) throws IOException { String serviceName = call.getServiceName(); com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName); if (service == null) { @@ -7971,7 +8146,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } }; - @Override + /** @return the latest sequence number that was read from storage when this region was opened */ public long getOpenSeqNum() { return this.openSeqNum; } @@ -7981,7 +8156,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return this.maxSeqIdInStores; } - @Override public long getOldestSeqIdOfStore(byte[] familyName) { return wal.getEarliestMemStoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 16895bf..821b41f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -290,7 +290,7 @@ public class HRegionServer extends HasThread implements * Map of regions currently being served by this region server. Key is the * encoded region name. All access should be synchronized. */ - protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<>(); + protected final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>(); /** * Map of encoded region names to the DataNode locations they should be hosted on @@ -308,8 +308,8 @@ public class HRegionServer extends HasThread implements * Set of regions currently being in recovering state which means it can accept writes(edits from * previous failed region server) but not reads. A recovering region is also an online region. */ - protected final Map<String, Region> recoveringRegions = Collections - .synchronizedMap(new HashMap<String, Region>()); + protected final Map<String, HRegion> recoveringRegions = Collections + .synchronizedMap(new HashMap<String, HRegion>()); // Leases protected Leases leases; @@ -1235,7 +1235,7 @@ public class HRegionServer extends HasThread implements private boolean areAllUserRegionsOffline() { if (getNumberOfOnlineRegions() > 2) return false; boolean allUserRegionsOffline = true; - for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) { + for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { if (!e.getValue().getRegionInfo().isMetaTable()) { allUserRegionsOffline = false; break; @@ -1249,7 +1249,7 @@ public class HRegionServer extends HasThread implements */ private long getWriteRequestCount() { long writeCount = 0; - for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) { + for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { writeCount += e.getValue().getWriteRequestsCount(); } return writeCount; @@ -1369,7 +1369,7 @@ public class HRegionServer extends HasThread implements // improved; Additionally the load 
balancer will be able to take advantage of a more complete // history. MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); - Collection<Region> regions = getOnlineRegionsLocalContext(); + Collection<HRegion> regions = getOnlineRegionsLocalContext(); long usedMemory = -1L; long maxMemory = -1L; final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); @@ -1391,7 +1391,7 @@ public class HRegionServer extends HasThread implements } RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); - for (Region region : regions) { + for (HRegion region : regions) { if (region.getCoprocessorHost() != null) { Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); Iterator<String> iterator = regionCoprocessors.iterator(); @@ -1469,7 +1469,7 @@ public class HRegionServer extends HasThread implements // Ensure all user regions have been sent a close. Use this to // protect against the case where an open comes in after we start the // iterator of onlineRegions to close all user regions. - for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) { + for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { RegionInfo hri = e.getValue().getRegionInfo(); if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) && !closedRegions.contains(hri.getEncodedName())) { @@ -1672,7 +1672,7 @@ public class HRegionServer extends HasThread implements * * @throws IOException */ - RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr, + RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, RegionSpecifier.Builder regionSpecifier) throws IOException { byte[] name = r.getRegionInfo().getRegionName(); int stores = 0; @@ -1740,7 +1740,7 @@ public class HRegionServer extends HasThread implements * @return An instance of RegionLoad. 
*/ public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { - Region r = onlineRegions.get(encodedRegionName); + HRegion r = onlineRegions.get(encodedRegionName); return r != null ? createRegionLoad(r, null, null) : null; } @@ -1821,9 +1821,9 @@ public class HRegionServer extends HasThread implements @Override protected void chore() { final StringBuffer whyFlush = new StringBuffer(); - for (Region r : this.server.onlineRegions.values()) { + for (HRegion r : this.server.onlineRegions.values()) { if (r == null) continue; - if (((HRegion)r).shouldFlush(whyFlush)) { + if (r.shouldFlush(whyFlush)) { FlushRequester requester = server.getFlushRequester(); if (requester != null) { long randomDelay = RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME; @@ -2157,11 +2157,6 @@ public class HRegionServer extends HasThread implements } @Override - public void postOpenDeployTasks(final Region r) throws KeeperException, IOException { - postOpenDeployTasks(new PostOpenDeployContext(r, -1)); - } - - @Override public void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException { HRegion r = (HRegion) context.getRegion(); @@ -2198,18 +2193,6 @@ public class HRegionServer extends HasThread implements } @Override - public boolean reportRegionStateTransition(TransitionCode code, RegionInfo... hris) { - return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris); - } - - @Override - public boolean reportRegionStateTransition( - TransitionCode code, long openSeqNum, RegionInfo... 
hris) { - return reportRegionStateTransition( - new RegionStateTransitionContext(code, HConstants.NO_SEQNUM, -1, hris)); - } - - @Override public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { TransitionCode code = context.getCode(); long openSeqNum = context.getOpenSeqNum(); @@ -2664,10 +2647,10 @@ public class HRegionServer extends HasThread implements * @param abort Whether we're running an abort. */ void closeMetaTableRegions(final boolean abort) { - Region meta = null; + HRegion meta = null; this.lock.writeLock().lock(); try { - for (Map.Entry<String, Region> e: onlineRegions.entrySet()) { + for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) { RegionInfo hri = e.getValue().getRegionInfo(); if (hri.isMetaRegion()) { meta = e.getValue(); @@ -2689,8 +2672,8 @@ public class HRegionServer extends HasThread implements void closeUserRegions(final boolean abort) { this.lock.writeLock().lock(); try { - for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) { - Region r = e.getValue(); + for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) { + HRegion r = e.getValue(); if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) { // Don't update zk with this close transition; pass false. closeRegionIgnoreErrors(r.getRegionInfo(), abort); @@ -2720,7 +2703,7 @@ public class HRegionServer extends HasThread implements } @Override - public Map<String, Region> getRecoveringRegions() { + public Map<String, HRegion> getRecoveringRegions() { return this.recoveringRegions; } @@ -2751,13 +2734,13 @@ public class HRegionServer extends HasThread implements * This method will only work if HRegionServer is in the same JVM as client; * HRegion cannot be serialized to cross an rpc. 
*/ - public Collection<Region> getOnlineRegionsLocalContext() { - Collection<Region> regions = this.onlineRegions.values(); + public Collection<HRegion> getOnlineRegionsLocalContext() { + Collection<HRegion> regions = this.onlineRegions.values(); return Collections.unmodifiableCollection(regions); } @Override - public void addRegion(Region region) { + public void addRegion(HRegion region) { this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); configurationManager.registerObserver(region); } @@ -2767,9 +2750,9 @@ public class HRegionServer extends HasThread implements * biggest. If two regions are the same size, then the last one found wins; i.e. this method * may NOT return all regions. */ - SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() { + SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() { // we'll sort the regions in reverse - SortedMap<Long, Region> sortedRegions = new TreeMap<>( + SortedMap<Long, HRegion> sortedRegions = new TreeMap<>( new Comparator<Long>() { @Override public int compare(Long a, Long b) { @@ -2777,7 +2760,7 @@ public class HRegionServer extends HasThread implements } }); // Copy over all regions. Regions are sorted by size with biggest first. 
- for (Region region : this.onlineRegions.values()) { + for (HRegion region : this.onlineRegions.values()) { sortedRegions.put(region.getMemStoreSize(), region); } return sortedRegions; @@ -3003,10 +2986,10 @@ public class HRegionServer extends HasThread implements * @return Online regions from <code>tableName</code> */ @Override - public List<Region> getRegions(TableName tableName) { - List<Region> tableRegions = new ArrayList<>(); + public List<HRegion> getRegions(TableName tableName) { + List<HRegion> tableRegions = new ArrayList<>(); synchronized (this.onlineRegions) { - for (Region region: this.onlineRegions.values()) { + for (HRegion region: this.onlineRegions.values()) { RegionInfo regionInfo = region.getRegionInfo(); if(regionInfo.getTable().equals(tableName)) { tableRegions.add(region); @@ -3017,8 +3000,8 @@ public class HRegionServer extends HasThread implements } @Override - public List<Region> getRegions() { - List<Region> allRegions = new ArrayList<>(); + public List<HRegion> getRegions() { + List<HRegion> allRegions = new ArrayList<>(); synchronized (this.onlineRegions) { // Return a clone copy of the onlineRegions allRegions.addAll(onlineRegions.values()); @@ -3051,8 +3034,8 @@ public class HRegionServer extends HasThread implements "skipping."); LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); } - Collection<Region> regions = getOnlineRegionsLocalContext(); - for (Region region: regions) { + Collection<HRegion> regions = getOnlineRegionsLocalContext(); + for (HRegion region: regions) { coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); try { coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); @@ -3170,7 +3153,7 @@ public class HRegionServer extends HasThread implements protected boolean closeAndOfflineRegionForSplitOrMerge( final List<String> regionEncodedName) throws IOException { for (int i = 0; i < regionEncodedName.size(); ++i) { - Region 
regionToClose = this.getRegion(regionEncodedName.get(i)); + HRegion regionToClose = this.getRegion(regionEncodedName.get(i)); if (regionToClose != null) { Map<byte[], List<HStoreFile>> hstoreFiles = null; Exception exceptionToThrow = null; @@ -3232,14 +3215,14 @@ public class HRegionServer extends HasThread implements } @Override - public Region getRegion(final String encodedRegionName) { + public HRegion getRegion(final String encodedRegionName) { return this.onlineRegions.get(encodedRegionName); } @Override - public boolean removeRegion(final Region r, ServerName destination) { - Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); + public boolean removeRegion(final HRegion r, ServerName destination) { + HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); if (destination != null) { long closeSeqNum = r.getMaxFlushedSeqId(); if (closeSeqNum == HConstants.NO_SEQNUM) { @@ -3261,20 +3244,20 @@ public class HRegionServer extends HasThread implements * @return {@link HRegion} for <code>regionName</code> * @throws NotServingRegionException */ - protected Region getRegion(final byte[] regionName) + protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { String encodedRegionName = RegionInfo.encodeRegionName(regionName); return getRegionByEncodedName(regionName, encodedRegionName); } - public Region getRegionByEncodedName(String encodedRegionName) + public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException { return getRegionByEncodedName(null, encodedRegionName); } - protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName) + protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) throws NotServingRegionException { - Region region = this.onlineRegions.get(encodedRegionName); + HRegion region = this.onlineRegions.get(encodedRegionName); if (region == null) { MovedRegionInfo moveInfo = 
getMovedRegion(encodedRegionName); if (moveInfo != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableOnlineRegions.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableOnlineRegions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableOnlineRegions.java index 3b84665..ffd1fa7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableOnlineRegions.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableOnlineRegions.java @@ -50,11 +50,11 @@ public interface ImmutableOnlineRegions { * @return List of Region * @throws java.io.IOException */ - List<Region> getRegions(TableName tableName) throws IOException; + List<? extends Region> getRegions(TableName tableName) throws IOException; /** * Get all online regions in this RS. * @return List of online Region */ - List<Region> getRegions(); + List<? 
extends Region> getRegions(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java index 82a5b32..3164e1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/IncreasingToUpperBoundRegionSplitPolicy.java @@ -110,7 +110,7 @@ public class IncreasingToUpperBoundRegionSplitPolicy extends ConstantSizeRegionS TableName tablename = region.getTableDescriptor().getTableName(); int tableRegionsCount = 0; try { - List<Region> hri = rss.getRegions(tablename); + List<? extends Region> hri = rss.getRegions(tablename); tableRegionsCount = hri == null || hri.isEmpty() ? 
0 : hri.size(); } catch (IOException e) { LOG.debug("Failed getOnlineRegions " + tablename, e); http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 30f9a51..20b6c5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -204,7 +204,7 @@ public class LogRoller extends HasThread implements Closeable { */ private void scheduleFlush(final byte [] encodedRegionName) { boolean scheduled = false; - Region r = this.services.getRegion(Bytes.toString(encodedRegionName)); + HRegion r = (HRegion) this.services.getRegion(Bytes.toString(encodedRegionName)); FlushRequester requester = null; if (r != null) { requester = this.services.getFlushRequester(); http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index f37f8f6..82390bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import 
org.apache.hadoop.hbase.regionserver.Region.FlushResult; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -126,8 +126,8 @@ class MemStoreFlusher implements FlushRequester { * @return true if successful */ private boolean flushOneForGlobalPressure() { - SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize(); - Set<Region> excludedRegions = new HashSet<>(); + SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize(); + Set<HRegion> excludedRegions = new HashSet<>(); double secondaryMultiplier = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf); @@ -136,12 +136,12 @@ class MemStoreFlusher implements FlushRequester { while (!flushedOne) { // Find the biggest region that doesn't have too many storefiles // (might be null!) - Region bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true); + HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true); // Find the biggest region, total, even if it might have too many flushes. 
- Region bestAnyRegion = getBiggestMemStoreRegion( + HRegion bestAnyRegion = getBiggestMemStoreRegion( regionsBySize, excludedRegions, false); // Find the biggest region that is a secondary region - Region bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, + HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions); if (bestAnyRegion == null && bestRegionReplica == null) { @@ -149,7 +149,7 @@ class MemStoreFlusher implements FlushRequester { return false; } - Region regionToFlush; + HRegion regionToFlush; if (bestFlushableRegion != null && bestAnyRegion.getMemStoreSize() > 2 * bestFlushableRegion.getMemStoreSize()) { // Even if it's not supposed to be flushed, pick a region if it's more than twice @@ -283,18 +283,17 @@ class MemStoreFlusher implements FlushRequester { } } - private Region getBiggestMemStoreRegion( - SortedMap<Long, Region> regionsBySize, - Set<Region> excludedRegions, + private HRegion getBiggestMemStoreRegion( + SortedMap<Long, HRegion> regionsBySize, + Set<HRegion> excludedRegions, boolean checkStoreFileCount) { synchronized (regionsInQueue) { - for (Region region : regionsBySize.values()) { + for (HRegion region : regionsBySize.values()) { if (excludedRegions.contains(region)) { continue; } - if (((HRegion)region).writestate.flushing || - !((HRegion)region).writestate.writesEnabled) { + if (region.writestate.flushing || !region.writestate.writesEnabled) { continue; } @@ -307,10 +306,10 @@ class MemStoreFlusher implements FlushRequester { return null; } - private Region getBiggestMemStoreOfRegionReplica(SortedMap<Long, Region> regionsBySize, - Set<Region> excludedRegions) { + private HRegion getBiggestMemStoreOfRegionReplica(SortedMap<Long, HRegion> regionsBySize, + Set<HRegion> excludedRegions) { synchronized (regionsInQueue) { - for (Region region : regionsBySize.values()) { + for (HRegion region : regionsBySize.values()) { if (excludedRegions.contains(region)) { continue; } @@ -349,8 +348,8 
@@ class MemStoreFlusher implements FlushRequester { } @Override - public void requestFlush(Region r, boolean forceFlushAllStores) { - ((HRegion)r).incrementFlushesQueuedCount(); + public void requestFlush(HRegion r, boolean forceFlushAllStores) { + r.incrementFlushesQueuedCount(); synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush @@ -363,8 +362,8 @@ class MemStoreFlusher implements FlushRequester { } @Override - public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) { - ((HRegion)r).incrementFlushesQueuedCount(); + public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { + r.incrementFlushesQueuedCount(); synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay @@ -430,7 +429,7 @@ class MemStoreFlusher implements FlushRequester { * not flushed. */ private boolean flushRegion(final FlushRegionEntry fqe) { - Region region = fqe.region; + HRegion region = fqe.region; if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { if (fqe.isMaximumWait(this.blockingWaitTime)) { @@ -479,7 +478,7 @@ class MemStoreFlusher implements FlushRequester { * false, there will be accompanying log messages explaining why the region was * not flushed. 
*/ - private boolean flushRegion(final Region region, final boolean emergencyFlush, + private boolean flushRegion(final HRegion region, final boolean emergencyFlush, boolean forceFlushAllStores) { synchronized (this.regionsInQueue) { FlushRegionEntry fqe = this.regionsInQueue.remove(region); @@ -497,12 +496,11 @@ class MemStoreFlusher implements FlushRequester { FlushResult flushResult = region.flush(forceFlushAllStores); boolean shouldCompact = flushResult.isCompactionNeeded(); // We just want to check the size - boolean shouldSplit = ((HRegion)region).checkSplit() != null; + boolean shouldSplit = region.checkSplit() != null; if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { - server.compactSplitThread.requestSystemCompaction((HRegion) region, - Thread.currentThread().getName()); + server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName()); } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical @@ -728,7 +726,7 @@ class MemStoreFlusher implements FlushRequester { * a while. 
*/ static class FlushRegionEntry implements FlushQueueEntry { - private final Region region; + private final HRegion region; private final long createTime; private long whenToExpire; @@ -736,7 +734,7 @@ class MemStoreFlusher implements FlushRequester { private boolean forceFlushAllStores; - FlushRegionEntry(final Region r, boolean forceFlushAllStores) { + FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) { this.region = r; this.createTime = EnvironmentEdgeManager.currentTime(); this.whenToExpire = this.createTime; http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index a7feb94..515b1ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -214,7 +214,7 @@ class MetricsRegionServerWrapperImpl @Override public long getNumOnlineRegions() { - Collection<Region> onlineRegionsLocalContext = regionServer.getOnlineRegionsLocalContext(); + Collection<HRegion> onlineRegionsLocalContext = regionServer.getOnlineRegionsLocalContext(); if (onlineRegionsLocalContext == null) { return 0; } @@ -754,7 +754,7 @@ class MetricsRegionServerWrapperImpl long tempMobScanCellsSize = 0; long tempBlockedRequestsCount = 0; int regionCount = 0; - for (Region r : regionServer.getOnlineRegionsLocalContext()) { + for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { tempNumMutationsWithoutWAL += r.getNumMutationsWithoutWAL(); tempDataInMemoryWithoutWAL += r.getDataInMemoryWithoutWAL(); tempReadRequestsCount += 
r.getReadRequestsCount(); http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java index d389bdf..c9f1c47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java @@ -32,7 +32,7 @@ public interface OnlineRegions extends ImmutableOnlineRegions { * Add to online regions. * @param r */ - void addRegion(final Region r); + void addRegion(final HRegion r); /** * This method removes Region corresponding to hri from the Map of onlineRegions. @@ -41,5 +41,5 @@ public interface OnlineRegions extends ImmutableOnlineRegions { * @param destination Destination, if any, null otherwise. * @return True if we removed a region from online list. 
*/ - boolean removeRegion(final Region r, ServerName destination); + boolean removeRegion(final HRegion r, ServerName destination); } http://git-wip-us.apache.org/repos/asf/hbase/blob/b4ed1300/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 70b3475..440b318 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -388,13 +388,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final AtomicLong nextCallSeq = new AtomicLong(0); private final String scannerName; private final RegionScanner s; - private final Region r; + private final HRegion r; private final RpcCallback closeCallBack; private final RpcCallback shippedCallback; private byte[] rowOfLastPartialResult; private boolean needCursor; - public RegionScannerHolder(String scannerName, RegionScanner s, Region r, + public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r, RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) { this.scannerName = scannerName; this.s = s; @@ -432,7 +432,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, RegionScanner s = rsh.s; LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); - Region region = null; + HRegion region = null; try { region = regionServer.getRegion(s.getRegionInfo().getRegionName()); if (region != null && region.getCoprocessorHost() != null) { @@ -547,7 +547,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. 
* @throws IOException */ - private void mutateRows(final Region region, + private void mutateRows(final HRegion region, final List<ClientProtos.Action> actions, final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException { if (!region.getRegionInfo().isMetaTable()) { @@ -600,7 +600,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param compareOp * @param comparator @throws IOException */ - private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, + private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions, final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { @@ -656,7 +656,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * bypassed as indicated by RegionObserver, null otherwise * @throws IOException */ - private Result append(final Region region, final OperationQuota quota, + private Result append(final HRegion region, final OperationQuota quota, final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, ActivePolicyEnforcement spaceQuota) throws IOException { @@ -707,7 +707,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @return the Result * @throws IOException */ - private Result increment(final Region region, final OperationQuota quota, + private Result increment(final HRegion region, final OperationQuota quota, final MutationProto mutation, final CellScanner cells, long nonceGroup, ActivePolicyEnforcement spaceQuota) throws IOException { @@ -763,7 +763,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param context the current RpcCallContext * @return Return the <code>cellScanner</code> passed */ - private List<CellScannable> doNonAtomicRegionMutation(final Region region, + private 
List<CellScannable> doNonAtomicRegionMutation(final HRegion region, final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, @@ -926,11 +926,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return cellsToReturn; } - private void checkCellSizeLimit(final Region region, final Mutation m) throws IOException { - if (!(region instanceof HRegion)) { - return; - } - HRegion r = (HRegion)region; + private void checkCellSizeLimit(final HRegion r, final Mutation m) throws IOException { if (r.maxCellSize > 0) { CellScanner cells = m.cellScanner(); while (cells.advance()) { @@ -953,7 +949,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param region * @param mutations */ - private void doBatchOp(final RegionActionResult.Builder builder, final Region region, + private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region, final OperationQuota quota, final List<ClientProtos.Action> mutations, final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { Mutation[] mArray = new Mutation[mutations.size()]; @@ -1050,7 +1046,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * exceptionMessage if any * @throws IOException */ - private OperationStatus [] doReplayBatchOp(final Region region, + private OperationStatus [] doReplayBatchOp(final HRegion region, final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException { long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; @@ -1335,7 +1331,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return lastBlock; } - private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r, + private RegionScannerHolder addScanner(String scannerName, 
RegionScanner s, HRegion r, boolean needCursor) throws LeaseStillHeldException { Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); @@ -1363,7 +1359,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * but failed to find the region */ @VisibleForTesting - public Region getRegion( + public HRegion getRegion( final RegionSpecifier regionSpecifier) throws IOException { ByteString value = regionSpecifier.getValue(); RegionSpecifierType type = regionSpecifier.getType(); @@ -1652,9 +1648,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - Map<String, Region> onlineRegions = regionServer.onlineRegions; + Map<String, HRegion> onlineRegions = regionServer.onlineRegions; List<RegionInfo> list = new ArrayList<>(onlineRegions.size()); - for (Region region: onlineRegions.values()) { + for (HRegion region: onlineRegions.values()) { list.add(region.getRegionInfo()); } Collections.sort(list, RegionInfo.COMPARATOR); @@ -1671,7 +1667,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); RegionInfo info = region.getRegionInfo(); byte[] bestSplitRow = null; if (request.hasBestSplitRow() && request.getBestSplitRow()) { @@ -1690,7 +1686,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); builder.setRegionInfo(ProtobufUtil.toRegionInfo(info)); if (request.hasCompactionState() && request.getCompactionState()) { - builder.setCompactionState(region.getCompactionState()); + builder.setCompactionState(ProtobufUtil.createCompactionState(region.getCompactionState())); } builder.setSplittable(region.isSplittable()); builder.setMergeable(region.isMergeable()); @@ -1709,7 +1705,7 @@ public class RSRpcServices 
implements HBaseRPCErrorHandler, public GetRegionLoadResponse getRegionLoad(RpcController controller, GetRegionLoadRequest request) throws ServiceException { - List<Region> regions; + List<HRegion> regions; if (request.hasTableName()) { TableName tableName = ProtobufUtil.toTableName(request.getTableName()); regions = regionServer.getRegions(tableName); @@ -1721,7 +1717,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); try { - for (Region region : regions) { + for (HRegion region : regions) { rLoads.add(regionServer.createRegionLoad(region, regionLoadBuilder, regionSpecifier)); } } catch (IOException e) { @@ -1797,7 +1793,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final GetStoreFileRequest request) throws ServiceException { try { checkOpen(); - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); requestCount.increment(); Set<byte[]> columnFamilies; if (request.getFamilyCount() == 0) { @@ -1902,7 +1898,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { String encodedName = region.getEncodedName(); byte[] encodedNameBytes = region.getEncodedNameAsBytes(); - final Region onlineRegion = regionServer.getRegion(encodedName); + final HRegion onlineRegion = regionServer.getRegion(encodedName); if (onlineRegion != null) { // The region is already online. This should not happen any more. String error = "Received OPEN for the region:" @@ -2026,7 +2022,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); String encodedName = region.getEncodedName(); byte[] encodedNameBytes = region.getEncodedNameAsBytes(); - final Region onlineRegion = regionServer.getRegion(encodedName); + final HRegion onlineRegion = regionServer.getRegion(encodedName); if (onlineRegion != null) { LOG.info("Region already online. 
Skipping warming up " + region); @@ -2077,7 +2073,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return ReplicateWALEntryResponse.newBuilder().build(); } ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); - Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); + HRegion region = regionServer.getRegionByEncodedName(regionName.toStringUtf8()); RegionCoprocessorHost coprocessorHost = ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) ? region.getCoprocessorHost() @@ -2133,7 +2129,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } //sync wal at the end because ASYNC_WAL is used above - WAL wal = getWAL(region); + WAL wal = region.getWAL(); if (wal != null) { wal.sync(); } @@ -2155,10 +2151,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - WAL getWAL(Region region) { - return ((HRegion)region).getWAL(); - } - /** * Replicate WAL entries on the region server. * @@ -2258,7 +2250,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); boolean bypass = false; boolean loaded = false; Map<byte[], List<Path>> map = null; @@ -2328,7 +2320,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); requestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request); PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder(); @@ -2346,7 +2338,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); requestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request); 
CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build(); @@ -2362,7 +2354,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); com.google.protobuf.Message result = execServiceOnRegion(region, request.getCall()); CoprocessorServiceResponse.Builder builder = CoprocessorServiceResponse.newBuilder(); builder.setRegion(RequestConverter.buildRegionSpecifier( @@ -2377,7 +2369,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - private com.google.protobuf.Message execServiceOnRegion(Region region, + private com.google.protobuf.Message execServiceOnRegion(HRegion region, final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException { // ignore the passed in controller (from the serialized call) ServerRpcController execController = new ServerRpcController(); @@ -2401,7 +2393,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, requestCount.increment(); requestRowActionCount.increment(); rpcGetRequestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); GetResponse.Builder builder = GetResponse.newBuilder(); ClientProtos.Get get = request.getGet(); @@ -2567,7 +2559,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, for (RegionAction regionAction : request.getRegionActionList()) { this.requestRowActionCount.add(regionAction.getActionCount()); OperationQuota quota; - Region region; + HRegion region; regionActionResultBuilder.clear(); RegionSpecifier regionSpecifier = regionAction.getRegion(); try { @@ -2702,7 +2694,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, requestCount.increment(); requestRowActionCount.increment(); rpcMutateRequestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); 
MutateResponse.Builder builder = MutateResponse.newBuilder(); MutationProto mutation = request.getMutation(); if (!region.getRegionInfo().isMetaTable()) { @@ -2892,7 +2884,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder) throws IOException { - Region region = getRegion(request.getRegion()); + HRegion region = getRegion(request.getRegion()); ClientProtos.Scan protoScan = request.getScan(); boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); Scan scan = ProtobufUtil.toScan(protoScan); @@ -2992,7 +2984,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results, ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context) throws IOException { - Region region = rsh.r; + HRegion region = rsh.r; RegionScanner scanner = rsh.s; long maxResultSize; if (scanner.getMaxResultSize() > 0) { @@ -3226,7 +3218,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } throw new ServiceException(e); } - Region region = rsh.r; + HRegion region = rsh.r; String scannerName = rsh.scannerName; Leases.Lease lease; try { @@ -3407,7 +3399,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - private void closeScanner(Region region, RegionScanner scanner, String scannerName, + private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, RpcCallContext context) throws IOException { if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost().preScannerClose(scanner)) {