http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java new file mode 100644 index 0000000..d558aaf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.SortedSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.HConnectionTestingUtility; +import org.apache.hadoop.hbase.master.LoadBalancer; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.MasterWalManager; +import org.apache.hadoop.hbase.master.MockNoopMasterServices; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; +import org.apache.hadoop.hbase.util.FSUtils; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * A mocked master services. + * Tries to fake it. May not always work. + */ +public class MockMasterServices extends MockNoopMasterServices { + private final MasterFileSystem fileSystemManager; + private final MasterWalManager walManager; + private final AssignmentManager assignmentManager; + + private MasterProcedureEnv procedureEnv; + private ProcedureExecutor<MasterProcedureEnv> procedureExecutor; + private ProcedureStore procedureStore; + private final ClusterConnection connection; + private final LoadBalancer balancer; + private final ServerManager serverManager; + // Set of regions on a 'server'. Populated externally. Used in below faking 'cluster'. + private final NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers; + + private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); + public static final String DEFAULT_COLUMN_FAMILY_NAME = "cf"; + public static final ServerName MOCK_MASTER_SERVERNAME = + ServerName.valueOf("mockmaster.example.org", 1234, -1L); + + public MockMasterServices(Configuration conf, + NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers) + throws IOException { + super(conf); + this.regionsToRegionServers = regionsToRegionServers; + Superusers.initialize(conf); + this.fileSystemManager = new MasterFileSystem(this); + this.walManager = new MasterWalManager(this); + // Mock an AM. + this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) { + public boolean isTableEnabled(final TableName tableName) { + return true; + } + + public boolean isTableDisabled(final TableName tableName) { + return false; + } + + @Override + protected boolean waitServerReportEvent(ServerName serverName, Procedure proc) { + // Make a report with current state of the server 'serverName' before we call wait.. + SortedSet<byte []> regions = regionsToRegionServers.get(serverName); + getAssignmentManager().reportOnlineRegions(serverName, 0, + regions == null? new HashSet<byte []>(): regions); + return super.waitServerReportEvent(serverName, proc); + } + }; + this.balancer = LoadBalancerFactory.getLoadBalancer(conf); + this.serverManager = new ServerManager(this); + + // Mock up a Client Interface + ClientProtos.ClientService.BlockingInterface ri = + Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); + MutateResponse.Builder builder = MutateResponse.newBuilder(); + builder.setProcessed(true); + try { + Mockito.when(ri.mutate((RpcController)Mockito.any(), (MutateRequest)Mockito.any())). + thenReturn(builder.build()); + } catch (ServiceException se) { + throw ProtobufUtil.handleRemoteException(se); + } + try { + Mockito.when(ri.multi((RpcController)Mockito.any(), (MultiRequest)Mockito.any())). + thenAnswer(new Answer<MultiResponse>() { + @Override + public MultiResponse answer(InvocationOnMock invocation) throws Throwable { + return buildMultiResponse( (MultiRequest)invocation.getArguments()[1]); + } + }); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + // Mock n ClusterConnection and an AdminProtocol implementation. Have the + // ClusterConnection return the HRI. Have the HRI return a few mocked up responses + // to make our test work. + this.connection = + HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(), + Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME, + HRegionInfo.FIRST_META_REGIONINFO); + // Set hbase.rootdir into test dir. + Path rootdir = FSUtils.getRootDir(getConfiguration()); + FSUtils.setRootDir(getConfiguration(), rootdir); + Mockito.mock(AdminProtos.AdminService.BlockingInterface.class); + } + + public void start(final int numServes, final RSProcedureDispatcher remoteDispatcher) + throws IOException { + startProcedureExecutor(remoteDispatcher); + this.assignmentManager.start(); + for (int i = 0; i < numServes; ++i) { + serverManager.regionServerReport( + ServerName.valueOf("localhost", 100 + i, 1), ServerLoad.EMPTY_SERVERLOAD); + } + this.procedureExecutor.getEnvironment().setEventReady(initialized, true); + } + + @Override + public void stop(String why) { + stopProcedureExecutor(); + this.assignmentManager.stop(); + } + + private void startProcedureExecutor(final RSProcedureDispatcher remoteDispatcher) + throws IOException { + final Configuration conf = getConfiguration(); + final Path logDir = new Path(fileSystemManager.getRootDir(), + MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); + + //procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir, + // new MasterProcedureEnv.WALStoreLeaseRecovery(this)); + this.procedureStore = new NoopProcedureStore(); + this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); + + this.procedureEnv = new MasterProcedureEnv(this, + remoteDispatcher != null ? remoteDispatcher : new RSProcedureDispatcher(this)); + + this.procedureExecutor = new ProcedureExecutor(conf, procedureEnv, procedureStore, + procedureEnv.getProcedureScheduler()); + + final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, + Math.max(Runtime.getRuntime().availableProcessors(), + MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); + final boolean abortOnCorruption = conf.getBoolean( + MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, + MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); + this.procedureStore.start(numThreads); + this.procedureExecutor.start(numThreads, abortOnCorruption); + this.procedureEnv.getRemoteDispatcher().start(); + } + + private void stopProcedureExecutor() { + if (this.procedureEnv != null) { + this.procedureEnv.getRemoteDispatcher().stop(); + } + + if (this.procedureExecutor != null) { + this.procedureExecutor.stop(); + } + + if (this.procedureStore != null) { + this.procedureStore.stop(isAborted()); + } + } + + @Override + public boolean isInitialized() { + return true; + } + + @Override + public ProcedureEvent getInitializedEvent() { + return this.initialized; + } + + @Override + public MasterFileSystem getMasterFileSystem() { + return fileSystemManager; + } + + @Override + public MasterWalManager getMasterWalManager() { + return walManager; + } + + @Override + public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { + return procedureExecutor; + } + + @Override + public LoadBalancer getLoadBalancer() { + return balancer; + } + + @Override + public ServerManager getServerManager() { + return serverManager; + } + + @Override + public AssignmentManager getAssignmentManager() { + return assignmentManager; + } + + @Override + public ClusterConnection getConnection() { + return this.connection; + } + + @Override + public ServerName getServerName() { + return MOCK_MASTER_SERVERNAME; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return super.getCoordinatedStateManager(); + } + + private static class MockRegionStateStore extends RegionStateStore { + public MockRegionStateStore(final MasterServices master) { + super(master); + } + + @Override + public void start() throws IOException { + } + + @Override + public void stop() { + } + + @Override + public void updateRegionLocation(HRegionInfo regionInfo, State state, ServerName regionLocation, + ServerName lastHost, long openSeqNum, long pid) throws IOException { + } + } + + @Override + public TableDescriptors getTableDescriptors() { + return new TableDescriptors() { + @Override + public HTableDescriptor remove(TableName tablename) throws IOException { + // noop + return null; + } + + @Override + public Map<String, HTableDescriptor> getAll() throws IOException { + // noop + return null; + } + + @Override public Map<String, HTableDescriptor> getAllDescriptors() throws IOException { + // noop + return null; + } + + @Override + public HTableDescriptor get(TableName tablename) throws IOException { + HTableDescriptor htd = new HTableDescriptor(tablename); + htd.addFamily(new HColumnDescriptor(DEFAULT_COLUMN_FAMILY_NAME)); + return htd; + } + + @Override + public Map<String, HTableDescriptor> getByNamespace(String name) throws IOException { + return null; + } + + @Override + public void add(HTableDescriptor htd) throws IOException { + // noop + } + + @Override + public void setCacheOn() throws IOException { + } + + @Override + public void setCacheOff() throws IOException { + } + }; + } + + private static MultiResponse buildMultiResponse(MultiRequest req) { + MultiResponse.Builder builder = MultiResponse.newBuilder(); + RegionActionResult.Builder regionActionResultBuilder = + RegionActionResult.newBuilder(); + ResultOrException.Builder roeBuilder = ResultOrException.newBuilder(); + for (RegionAction regionAction: req.getRegionActionList()) { + regionActionResultBuilder.clear(); + for (ClientProtos.Action action: regionAction.getActionList()) { + roeBuilder.clear(); + roeBuilder.setResult(ClientProtos.Result.getDefaultInstance()); + roeBuilder.setIndex(action.getIndex()); + regionActionResultBuilder.addResultOrException(roeBuilder.build()); + } + builder.addRegionActionResult(regionActionResultBuilder.build()); + } + return builder.build(); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java new file mode 100644 index 0000000..dda41e0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ -0,0 +1,750 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.assignment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.NavigableMap; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + +@Category({MasterTests.class, MediumTests.class}) +public class TestAssignmentManager { + private static final Log LOG = LogFactory.getLog(TestAssignmentManager.class); + static { + Logger.getLogger(MasterProcedureScheduler.class).setLevel(Level.TRACE); + } + @Rule public TestName name = new TestName(); + @Rule public final TestRule timeout = + CategoryBasedTimeout.builder().withTimeout(this.getClass()). + withLookingForStuckThread(true).build(); + + private static final int PROC_NTHREADS = 64; + private static final int NREGIONS = 1 * 1000; + private static final int NSERVERS = Math.max(1, NREGIONS / 100); + + private HBaseTestingUtility UTIL; + private MockRSProcedureDispatcher rsDispatcher; + private MockMasterServices master; + private AssignmentManager am; + private NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers = + new ConcurrentSkipListMap<ServerName, SortedSet<byte []>>(); + // Simple executor to run some simple tasks. + private ScheduledExecutorService executor; + + private void setupConfiguration(Configuration conf) throws Exception { + FSUtils.setRootDir(conf, UTIL.getDataTestDir()); + conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false); + conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10); + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); + conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); + conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually. + } + + @Before + public void setUp() throws Exception { + UTIL = new HBaseTestingUtility(); + this.executor = Executors.newSingleThreadScheduledExecutor(); + setupConfiguration(UTIL.getConfiguration()); + master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers); + rsDispatcher = new MockRSProcedureDispatcher(master); + master.start(NSERVERS, rsDispatcher); + am = master.getAssignmentManager(); + setUpMeta(); + } + + private void setUpMeta() throws Exception { + rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); + am.assign(HRegionInfo.FIRST_META_REGIONINFO); + am.wakeMetaLoadedEvent(); + am.setFailoverCleanupDone(true); + } + + @After + public void tearDown() throws Exception { + master.stop("tearDown"); + this.executor.shutdownNow(); + } + + @Test (expected=NullPointerException.class) + public void testWaitServerReportEventWithNullServer() throws UnexpectedStateException { + // Test what happens if we pass in null server. I'd expect it throws NPE. + if (this.am.waitServerReportEvent(null, null)) throw new UnexpectedStateException(); + } + + @Ignore @Test // TODO + public void testGoodSplit() throws Exception { + TableName tableName = TableName.valueOf(this.name.getMethodName()); + HRegionInfo hri = new HRegionInfo(tableName, Bytes.toBytes(0), Bytes.toBytes(2), false, 0); + SplitTableRegionProcedure split = + new SplitTableRegionProcedure(this.master.getMasterProcedureExecutor().getEnvironment(), + hri, Bytes.toBytes(1)); + rsDispatcher.setMockRsExecutor(new GoodSplitExecutor()); + long st = System.currentTimeMillis(); + Thread t = new Thread() { + public void run() { + try { + waitOnFuture(submitProcedure(split)); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + t.start(); + t.join(); + long et = System.currentTimeMillis(); + float sec = ((et - st) / 1000.0f); + LOG.info(String.format("[T] Splitting in %s", StringUtils.humanTimeDiff(et - st))); + } + + @Test + public void testAssignWithGoodExec() throws Exception { + testAssign(new GoodRsExecutor()); + } + + @Test + public void testAssignAndCrashBeforeResponse() throws Exception { + final TableName tableName = TableName.valueOf("testAssignAndCrashBeforeResponse"); + final HRegionInfo hri = createRegionInfo(tableName, 1); + rsDispatcher.setMockRsExecutor(new HangThenRSCrashExecutor()); + AssignProcedure proc = am.createAssignProcedure(hri, false); + waitOnFuture(submitProcedure(proc)); + } + + @Test + public void testUnassignAndCrashBeforeResponse() throws Exception { + final TableName tableName = TableName.valueOf("testAssignAndCrashBeforeResponse"); + final HRegionInfo hri = createRegionInfo(tableName, 1); + rsDispatcher.setMockRsExecutor(new HangOnCloseThenRSCrashExecutor()); + for (int i = 0; i < HangOnCloseThenRSCrashExecutor.TYPES_OF_FAILURE; i++) { + AssignProcedure assign = am.createAssignProcedure(hri, false); + waitOnFuture(submitProcedure(assign)); + UnassignProcedure unassign = am.createUnassignProcedure(hri, + am.getRegionStates().getRegionServerOfRegion(hri), false); + waitOnFuture(submitProcedure(unassign)); + } + } + + @Test + public void testAssignWithRandExec() throws Exception { + final TableName tableName = TableName.valueOf("testAssignWithRandExec"); + final HRegionInfo hri = createRegionInfo(tableName, 1); + + rsDispatcher.setMockRsExecutor(new RandRsExecutor()); + // Loop a bunch of times so we hit various combos of exceptions. + for (int i = 0; i < 10; i++) { + LOG.info("" + i); + AssignProcedure proc = am.createAssignProcedure(hri, false); + waitOnFuture(submitProcedure(proc)); + } + } + + @Test + public void testSocketTimeout() throws Exception { + final TableName tableName = TableName.valueOf(this.name.getMethodName()); + final HRegionInfo hri = createRegionInfo(tableName, 1); + + rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 3)); + waitOnFuture(submitProcedure(am.createAssignProcedure(hri, false))); + + rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 3)); + waitOnFuture(submitProcedure(am.createUnassignProcedure(hri, null, false))); + } + + @Test + public void testServerNotYetRunning() throws Exception { + testRetriesExhaustedFailure(TableName.valueOf(this.name.getMethodName()), + new ServerNotYetRunningRsExecutor()); + } + + private void testRetriesExhaustedFailure(final TableName tableName, + final MockRSExecutor executor) throws Exception { + final HRegionInfo hri = createRegionInfo(tableName, 1); + + // Test Assign operation failure + rsDispatcher.setMockRsExecutor(executor); + try { + waitOnFuture(submitProcedure(am.createAssignProcedure(hri, false))); + fail("unexpected assign completion"); + } catch (RetriesExhaustedException e) { + // expected exception + LOG.info("expected exception from assign operation: " + e.getMessage(), e); + } + + // Assign the region (without problems) + rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); + waitOnFuture(submitProcedure(am.createAssignProcedure(hri, false))); + + // TODO: Currently unassign just keeps trying until it sees a server crash. + // There is no count on unassign. + /* + // Test Unassign operation failure + rsDispatcher.setMockRsExecutor(executor); + waitOnFuture(submitProcedure(am.createUnassignProcedure(hri, null, false))); + */ + } + + + @Test + public void testIOExceptionOnAssignment() throws Exception { + testFailedOpen(TableName.valueOf("testExceptionOnAssignment"), + new FaultyRsExecutor(new IOException("test fault"))); + } + + @Test + public void testDoNotRetryExceptionOnAssignment() throws Exception { + testFailedOpen(TableName.valueOf("testDoNotRetryExceptionOnAssignment"), + new FaultyRsExecutor(new DoNotRetryIOException("test do not retry fault"))); + } + + private void testFailedOpen(final TableName tableName, + final MockRSExecutor executor) throws Exception { + final HRegionInfo hri = createRegionInfo(tableName, 1); + + // Test Assign operation failure + rsDispatcher.setMockRsExecutor(executor); + try { + waitOnFuture(submitProcedure(am.createAssignProcedure(hri, false))); + fail("unexpected assign completion"); + } catch (RetriesExhaustedException e) { + // expected exception + LOG.info("REGION STATE " + am.getRegionStates().getRegionNode(hri)); + LOG.info("expected exception from assign operation: " + e.getMessage(), e); + assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen()); + } + } + + private void testAssign(final MockRSExecutor executor) throws Exception { + testAssign(executor, NREGIONS); + } + + private void testAssign(final MockRSExecutor executor, final int nregions) throws Exception { + rsDispatcher.setMockRsExecutor(executor); + + AssignProcedure[] assignments = new AssignProcedure[nregions]; + + long st = System.currentTimeMillis(); + bulkSubmit(assignments); + + for (int i = 0; i < assignments.length; ++i) { + ProcedureTestingUtility.waitProcedure( + master.getMasterProcedureExecutor(), assignments[i]); + assertTrue(assignments[i].toString(), assignments[i].isSuccess()); + } + long et = System.currentTimeMillis(); + float sec = ((et - st) / 1000.0f); + LOG.info(String.format("[T] Assigning %dprocs in %s (%.2fproc/sec)", + assignments.length, StringUtils.humanTimeDiff(et - st), assignments.length / sec)); + } + + @Test + public void testAssignAnAssignedRegion() throws Exception { + final TableName tableName = TableName.valueOf("testAssignAnAssignedRegion"); + final HRegionInfo hri = createRegionInfo(tableName, 1); + + rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); + + final Future<byte[]> futureA = submitProcedure(am.createAssignProcedure(hri, false)); + + // wait first assign + waitOnFuture(futureA); + am.getRegionStates().isRegionInState(hri, State.OPEN); + // Second should be a noop. We should recognize region is already OPEN internally + // and skip out doing nothing. + // wait second assign + final Future<byte[]> futureB = submitProcedure(am.createAssignProcedure(hri, false)); + waitOnFuture(futureB); + am.getRegionStates().isRegionInState(hri, State.OPEN); + // TODO: What else can we do to ensure just a noop. + } + + @Test + public void testUnassignAnUnassignedRegion() throws Exception { + final TableName tableName = TableName.valueOf("testUnassignAnUnassignedRegion"); + final HRegionInfo hri = createRegionInfo(tableName, 1); + + rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); + + // assign the region first + waitOnFuture(submitProcedure(am.createAssignProcedure(hri, false))); + + final Future<byte[]> futureA = submitProcedure(am.createUnassignProcedure(hri, null, false)); + + // Wait first unassign. + waitOnFuture(futureA); + am.getRegionStates().isRegionInState(hri, State.CLOSED); + // Second should be a noop. We should recognize region is already CLOSED internally + // and skip out doing nothing. + final Future<byte[]> futureB = + submitProcedure(am.createUnassignProcedure(hri, + ServerName.valueOf("example.org,1234,1"), false)); + waitOnFuture(futureB); + // Ensure we are still CLOSED. + am.getRegionStates().isRegionInState(hri, State.CLOSED); + // TODO: What else can we do to ensure just a noop. + } + + private Future<byte[]> submitProcedure(final Procedure proc) { + return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); + } + + private byte[] waitOnFuture(final Future<byte[]> future) throws Exception { + try { + return future.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + LOG.info("ExecutionException", e); + throw (Exception)e.getCause(); + } + } + + // ============================================================================================ + // Helpers + // ============================================================================================ + private void bulkSubmit(final AssignProcedure[] procs) throws Exception { + final Thread[] threads = new Thread[PROC_NTHREADS]; + for (int i = 0; i < threads.length; ++i) { + final int threadId = i; + threads[i] = new Thread() { + @Override + public void run() { + TableName tableName = TableName.valueOf("table-" + threadId); + int n = (procs.length / threads.length); + int start = threadId * n; + int stop = start + n; + for (int j = start; j < stop; ++j) { + procs[j] = createAndSubmitAssign(tableName, j); + } + } + }; + threads[i].start(); + } + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) { + procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i); + } + } + + private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) { + HRegionInfo hri = createRegionInfo(tableName, regionId); + AssignProcedure proc = am.createAssignProcedure(hri, false); + master.getMasterProcedureExecutor().submitProcedure(proc); + return proc; + } + + private UnassignProcedure createAndSubmitUnassign(TableName tableName, int regionId) { + HRegionInfo hri = createRegionInfo(tableName, regionId); + UnassignProcedure proc = am.createUnassignProcedure(hri, null, false); + master.getMasterProcedureExecutor().submitProcedure(proc); + return proc; + } + + private HRegionInfo createRegionInfo(final TableName tableName, final long regionId) { + return new HRegionInfo(tableName, + Bytes.toBytes(regionId), Bytes.toBytes(regionId + 1), false, 0); + } + + private void sendTransitionReport(final ServerName serverName, + final RegionInfo regionInfo, final TransitionCode state) throws IOException { + ReportRegionStateTransitionRequest.Builder req = + ReportRegionStateTransitionRequest.newBuilder(); + req.setServer(ProtobufUtil.toServerName(serverName)); + req.addTransition(RegionStateTransition.newBuilder() + .addRegionInfo(regionInfo) + .setTransitionCode(state) + .setOpenSeqNum(1) + .build()); + am.reportRegionStateTransition(req.build()); + } + + private void doCrash(final ServerName serverName) { + this.am.submitServerCrash(serverName, false/*No WALs here*/); + } + + private class NoopRsExecutor implements MockRSExecutor { + public ExecuteProceduresResponse sendRequest(ServerName server, + ExecuteProceduresRequest request) throws IOException { + ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); + if (request.getOpenRegionCount() > 0) { + for (OpenRegionRequest req: request.getOpenRegionList()) { + OpenRegionResponse.Builder resp = OpenRegionResponse.newBuilder(); + for (RegionOpenInfo openReq: req.getOpenInfoList()) { + RegionOpeningState state = execOpenRegion(server, openReq); + if (state != null) { + resp.addOpeningState(state); + } + } + builder.addOpenRegion(resp.build()); + } + } + if (request.getCloseRegionCount() > 0) { + for (CloseRegionRequest req: request.getCloseRegionList()) { + CloseRegionResponse resp = execCloseRegion(server, + req.getRegion().getValue().toByteArray()); + if (resp != null) { + builder.addCloseRegion(resp); + } + } + } + return ExecuteProceduresResponse.newBuilder().build(); + } + + protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo) + throws IOException { + return null; + } + + protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + return null; + } + } + + private class GoodRsExecutor extends NoopRsExecutor { + @Override + protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) + throws IOException { + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); + // Concurrency? + // Now update the state of our cluster in regionsToRegionServers. + SortedSet<byte []> regions = regionsToRegionServers.get(server); + if (regions == null) { + regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR); + regionsToRegionServers.put(server, regions); + } + HRegionInfo hri = HRegionInfo.convert(openReq.getRegion()); + if (regions.contains(hri.getRegionName())) { + throw new UnsupportedOperationException(hri.getRegionNameAsString()); + } + regions.add(hri.getRegionName()); + return RegionOpeningState.OPENED; + } + + @Override + protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + HRegionInfo hri = am.getRegionInfo(regionName); + sendTransitionReport(server, HRegionInfo.convert(hri), TransitionCode.CLOSED); + return CloseRegionResponse.newBuilder().setClosed(true).build(); + } + } + + private static class ServerNotYetRunningRsExecutor implements MockRSExecutor { + public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException { + throw new ServerNotRunningYetException("wait on server startup"); + } + } + + private static class FaultyRsExecutor implements MockRSExecutor { + private final IOException exception; + + public FaultyRsExecutor(final IOException exception) { + this.exception = exception; + } + + public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException { + throw exception; + } + } + + private class SocketTimeoutRsExecutor extends GoodRsExecutor { + private final int maxSocketTimeoutRetries; + private final int maxServerRetries; + + private ServerName lastServer; + private int sockTimeoutRetries; + private int serverRetries; + + public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) { + this.maxServerRetries = maxServerRetries; + this.maxSocketTimeoutRetries = maxSocketTimeoutRetries; + } + + public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException { + // SocketTimeoutException should be a temporary problem + // unless the server will be declared dead. + if (sockTimeoutRetries++ < maxSocketTimeoutRetries) { + if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server); + lastServer = server; + LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries); + throw new SocketTimeoutException("simulate socket timeout"); + } else if (serverRetries++ < maxServerRetries) { + LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries); + master.getServerManager().moveFromOnlineToDeadServers(server); + sockTimeoutRetries = 0; + throw new SocketTimeoutException("simulate socket timeout"); + } else { + return super.sendRequest(server, req); + } + } + } + + /** + * Takes open request and then returns nothing so acts like a RS that went zombie. + * No response (so proc is stuck/suspended on the Master and won't wake up.). We + * then send in a crash for this server after a few seconds; crash is supposed to + * take care of the suspended procedures. + */ + private class HangThenRSCrashExecutor extends GoodRsExecutor { + private int invocations; + + @Override + protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) + throws IOException { + if (this.invocations++ > 0) { + // Return w/o problem the second time through here. + return super.execOpenRegion(server, openReq); + } + // The procedure on master will just hang forever because nothing comes back + // from the RS in this case. + LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Sending in CRASH of " + server); + doCrash(server); + } + }, 1, TimeUnit.SECONDS); + return null; + } + } + + private class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor { + public static final int TYPES_OF_FAILURE = 6; + private int invocations; + + @Override + protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + switch (this.invocations++) { + case 0: throw new NotServingRegionException("Fake"); + case 1: throw new RegionServerAbortedException("Fake!"); + case 2: throw new RegionServerStoppedException("Fake!"); + case 3: throw new ServerNotRunningYetException("Fake!"); + case 4: + LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Sending in CRASH of " + server); + doCrash(server); + } + }, 1, TimeUnit.SECONDS); + return null; + default: + return super.execCloseRegion(server, regionName); + } + } + } + + private class RandRsExecutor extends NoopRsExecutor { + private final Random rand = new Random(); + + public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException { + switch (rand.nextInt(5)) { + case 0: throw new ServerNotRunningYetException("wait on server startup"); + case 1: throw new SocketTimeoutException("simulate socket timeout"); + case 2: throw new RemoteException("java.io.IOException", "unexpected exception"); + } + return super.sendRequest(server, req); + } + + @Override + protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) + throws IOException { + switch (rand.nextInt(6)) { + case 0: + LOG.info("Return OPENED response"); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); + return OpenRegionResponse.RegionOpeningState.OPENED; + case 1: + LOG.info("Return transition report that OPENED/ALREADY_OPENED response"); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); + return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED; + case 2: + LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response"); + sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN); + return OpenRegionResponse.RegionOpeningState.FAILED_OPENING; + } + // The procedure on master will just hang forever because nothing comes back + // from the RS in this case. + LOG.info("Return null as response; means proc stuck so we send in a crash report after a few seconds..."); + executor.schedule(new Runnable() { + @Override + public void run() { + LOG.info("Delayed CRASHING of " + server); + doCrash(server); + } + }, 5, TimeUnit.SECONDS); + return null; + } + + @Override + protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder(); + boolean closed = rand.nextBoolean(); + if (closed) { + HRegionInfo hri = am.getRegionInfo(regionName); + sendTransitionReport(server, HRegionInfo.convert(hri), TransitionCode.CLOSED); + } + resp.setClosed(closed); + return resp.build(); + } + } + + private interface MockRSExecutor { + ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) + throws IOException; + } + + private class MockRSProcedureDispatcher extends RSProcedureDispatcher { + private MockRSExecutor mockRsExec; + + public MockRSProcedureDispatcher(final MasterServices master) { + super(master); + } + + public void setMockRsExecutor(final MockRSExecutor mockRsExec) { + this.mockRsExec = mockRsExec; + } + + @Override + protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> operations) { + submitTask(new MockRemoteCall(serverName, operations)); + } + + private class MockRemoteCall extends ExecuteProceduresRemoteCall { + public MockRemoteCall(final ServerName serverName, + final Set<RemoteProcedure> operations) { + super(serverName, operations); + } + + @Override + protected ExecuteProceduresResponse sendRequest(final ServerName serverName, + final ExecuteProceduresRequest request) throws IOException { + return mockRsExec.sendRequest(serverName, request); + } + } + } + + private class GoodSplitExecutor extends NoopRsExecutor { + + /* + @Override + protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) + throws IOException { + sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED); + // Concurrency? + // Now update the state of our cluster in regionsToRegionServers. + SortedSet<byte []> regions = regionsToRegionServers.get(server); + if (regions == null) { + regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR); + regionsToRegionServers.put(server, regions); + } + HRegionInfo hri = HRegionInfo.convert(openReq.getRegion()); + if (regions.contains(hri.getRegionName())) { + throw new UnsupportedOperationException(hri.getRegionNameAsString()); + } + regions.add(hri.getRegionName()); + return RegionOpeningState.OPENED; + } + + @Override + protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) + throws IOException { + HRegionInfo hri = am.getRegionInfo(regionName); + sendTransitionReport(server, HRegionInfo.convert(hri), TransitionCode.CLOSED); + return CloseRegionResponse.newBuilder().setClosed(true).build(); + }*/ + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentOnRSCrash.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentOnRSCrash.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentOnRSCrash.java new file mode 100644 index 0000000..e4cec45 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentOnRSCrash.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.util.Bytes; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, LargeTests.class}) +public class TestAssignmentOnRSCrash { + private static final Log LOG = LogFactory.getLog(TestAssignmentOnRSCrash.class); + + private static final TableName TEST_TABLE = TableName.valueOf("testb"); + private static final String FAMILY_STR = "f"; + private static final byte[] FAMILY = Bytes.toBytes(FAMILY_STR); + private static final int NUM_RS = 3; + + private HBaseTestingUtility UTIL; + + private static void setupConf(Configuration conf) { + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + conf.set("hbase.balancer.tablesOnMaster", "none"); + } + + @Before + public void setup() throws Exception { + UTIL = new HBaseTestingUtility(); + + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(NUM_RS); + + UTIL.createTable(TEST_TABLE, new byte[][] { FAMILY }, new byte[][] { + Bytes.toBytes("B"), Bytes.toBytes("D"), Bytes.toBytes("F"), Bytes.toBytes("L") + }); + } + + @After + public void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test(timeout=30000) + public void testKillRsWithUserRegionWithData() throws Exception { + testCrashRsWithUserRegion(true, true); + } + + @Test(timeout=30000) + public void testKillRsWithUserRegionWithoutData() throws Exception { + testCrashRsWithUserRegion(true, false); + } + + @Test(timeout=30000) + public void testStopRsWithUserRegionWithData() throws Exception { + testCrashRsWithUserRegion(false, true); + } + + @Test(timeout=30000) + public void testStopRsWithUserRegionWithoutData() throws Exception { + testCrashRsWithUserRegion(false, false); + } + + private void testCrashRsWithUserRegion(final boolean kill, final boolean withData) + throws Exception { + final int NROWS = 100; + int nkilled = 0; + for (HRegionInfo hri: UTIL.getHBaseAdmin().getTableRegions(TEST_TABLE)) { + ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(UTIL, hri); + if (AssignmentTestingUtil.isServerHoldingMeta(UTIL, serverName)) continue; + + if (withData) { + testInsert(hri, NROWS); + } + + // wait for regions to enter in transition and then to get out of transition + AssignmentTestingUtil.crashRs(UTIL, serverName, kill); + AssignmentTestingUtil.waitForRegionToBeInTransition(UTIL, hri); + UTIL.waitUntilNoRegionsInTransition(); + + if (withData) { + assertEquals(NROWS, testGet(hri, NROWS)); + } + + // region should be moved to another RS + assertNotEquals(serverName, AssignmentTestingUtil.getServerHoldingRegion(UTIL, hri)); + + if (++nkilled == (NUM_RS - 1)) { + break; + } + } + assertTrue("expected RSs to be killed", nkilled > 0); + } + + @Test(timeout=60000) + public void testKillRsWithMetaRegion() throws Exception { + testCrashRsWithMetaRegion(true); + } + + @Test(timeout=60000) + public void testStopRsWithMetaRegion() throws Exception { + testCrashRsWithMetaRegion(false); + } + + private void testCrashRsWithMetaRegion(final boolean kill) throws Exception { + int nkilled = 0; + for (HRegionInfo hri: AssignmentTestingUtil.getMetaRegions(UTIL)) { + ServerName serverName = AssignmentTestingUtil.crashRsWithRegion(UTIL, hri, kill); + + // wait for region to enter in transition and then to get out of transition + AssignmentTestingUtil.waitForRegionToBeInTransition(UTIL, hri); + UTIL.waitUntilNoRegionsInTransition(); + testGet(hri, 10); + + // region should be moved to another RS + assertNotEquals(serverName, AssignmentTestingUtil.getServerHoldingRegion(UTIL, hri)); + + if (++nkilled == (NUM_RS - 1)) { + break; + } + } + assertTrue("expected RSs to be killed", nkilled > 0); + } + + private void testInsert(final HRegionInfo hri, final int nrows) throws IOException { + final Table table = UTIL.getConnection().getTable(hri.getTable()); + for (int i = 0; i < nrows; ++i) { + final byte[] row = Bytes.add(hri.getStartKey(), Bytes.toBytes(i)); + final Put put = new Put(row); + put.addColumn(FAMILY, null, row); + table.put(put); + } + } + + public int testGet(final HRegionInfo hri, final int nrows) throws IOException { + int nresults = 0; + final Table table = UTIL.getConnection().getTable(hri.getTable()); + for (int i = 0; i < nrows; ++i) { + final byte[] row = Bytes.add(hri.getStartKey(), Bytes.toBytes(i)); + final Result result = table.get(new Get(row)); + if (result != null && !result.isEmpty() && + Bytes.equals(row, result.getValue(FAMILY, null))) { + nresults++; + } + } + return nresults; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestMergeTableRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestMergeTableRegionsProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestMergeTableRegionsProcedure.java new file mode 100644 index 0000000..44fd575 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestMergeTableRegionsProcedure.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.assignment; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + +@Category({MasterTests.class, MediumTests.class}) +public class TestMergeTableRegionsProcedure { + private static final Log LOG = LogFactory.getLog(TestMergeTableRegionsProcedure.class); + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). + withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + @Rule public final TestName name = new TestName(); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static long nonceGroup = HConstants.NO_NONCE; + private static long nonce = HConstants.NO_NONCE; + + private static final int initialRegionCount = 4; + private final static byte[] FAMILY = Bytes.toBytes("FAMILY"); + final static Configuration conf = UTIL.getConfiguration(); + private static Admin admin; + + private static void setupConf(Configuration conf) { + // Reduce the maximum attempts to speed up the test + conf.setInt("hbase.assignment.maximum.attempts", 3); + conf.setInt("hbase.master.maximum.ping.server.attempts", 3); + conf.setInt("hbase.master.ping.server.retry.sleep.interval", 1); + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + } + + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(conf); + UTIL.startMiniCluster(1); + admin = UTIL.getHBaseAdmin(); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } + } + + @Before + public void setup() throws Exception { + resetProcExecutorTestingKillFlag(); + nonceGroup = + MasterProcedureTestingUtility.generateNonceGroup(UTIL.getHBaseCluster().getMaster()); + nonce = MasterProcedureTestingUtility.generateNonce(UTIL.getHBaseCluster().getMaster()); + // Turn off balancer so it doesn't cut in and mess up our placements. + UTIL.getHBaseAdmin().setBalancerRunning(false, true); + // Turn off the meta scanner so it don't remove parent on us. + UTIL.getHBaseCluster().getMaster().setCatalogJanitorEnabled(false); + resetProcExecutorTestingKillFlag(); + } + + @After + public void tearDown() throws Exception { + resetProcExecutorTestingKillFlag(); + for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) { + LOG.info("Tear down, remove table=" + htd.getTableName()); + UTIL.deleteTable(htd.getTableName()); + } + } + + private void resetProcExecutorTestingKillFlag() { + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); + assertTrue("expected executor to be running", procExec.isRunning()); + } + + /** + * This tests two region merges + */ + @Test + public void testMergeTwoRegions() throws Exception { + final TableName tableName = TableName.valueOf(this.name.getMethodName()); + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + + List<HRegionInfo> tableRegions = createTable(tableName); + + HRegionInfo[] regionsToMerge = new HRegionInfo[2]; + regionsToMerge[0] = tableRegions.get(0); + regionsToMerge[1] = tableRegions.get(1); + MergeTableRegionsProcedure proc = + new MergeTableRegionsProcedure(procExec.getEnvironment(), regionsToMerge, true); + long procId = procExec.submitProcedure(proc); + ProcedureTestingUtility.waitProcedure(procExec, procId); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId); + assertRegionCount(tableName, initialRegionCount - 1); + Pair<HRegionInfo, HRegionInfo> pair = + MetaTableAccessor.getRegionsFromMergeQualifier(UTIL.getConnection(), + proc.getMergedRegion().getRegionName()); + assertTrue(pair.getFirst() != null && pair.getSecond() != null); + + // Can I purge the merged regions from hbase:meta? Check that all went + // well by looking at the merged row up in hbase:meta. It should have no + // more mention of the merged regions; they are purged as last step in + // the merged regions cleanup. + UTIL.getHBaseCluster().getMaster().setCatalogJanitorEnabled(true); + UTIL.getHBaseCluster().getMaster().getCatalogJanitor().triggerNow(); + while (pair != null && pair.getFirst() != null && pair.getSecond() != null) { + pair = MetaTableAccessor.getRegionsFromMergeQualifier(UTIL.getConnection(), + proc.getMergedRegion().getRegionName()); + } + } + + /** + * This tests two concurrent region merges + */ + @Test + public void testMergeRegionsConcurrently() throws Exception { + final TableName tableName = TableName.valueOf("testMergeRegionsConcurrently"); + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + + List<HRegionInfo> tableRegions = createTable(tableName); + + HRegionInfo[] regionsToMerge1 = new HRegionInfo[2]; + HRegionInfo[] regionsToMerge2 = new HRegionInfo[2]; + regionsToMerge1[0] = tableRegions.get(0); + regionsToMerge1[1] = tableRegions.get(1); + regionsToMerge2[0] = tableRegions.get(2); + regionsToMerge2[1] = tableRegions.get(3); + + long procId1 = procExec.submitProcedure(new MergeTableRegionsProcedure( + procExec.getEnvironment(), regionsToMerge1, true)); + long procId2 = procExec.submitProcedure(new MergeTableRegionsProcedure( + procExec.getEnvironment(), regionsToMerge2, true)); + ProcedureTestingUtility.waitProcedure(procExec, procId1); + ProcedureTestingUtility.waitProcedure(procExec, procId2); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId2); + assertRegionCount(tableName, initialRegionCount - 2); + } + + @Test + public void testRecoveryAndDoubleExecution() throws Exception { + final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution"); + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + + List<HRegionInfo> tableRegions = createTable(tableName); + + ProcedureTestingUtility.waitNoProcedureRunning(procExec); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); + + HRegionInfo[] regionsToMerge = new HRegionInfo[2]; + regionsToMerge[0] = tableRegions.get(0); + regionsToMerge[1] = tableRegions.get(1); + + long procId = procExec.submitProcedure( + new MergeTableRegionsProcedure(procExec.getEnvironment(), regionsToMerge, true)); + + // Restart the executor and execute the step twice + MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId); + + assertRegionCount(tableName, initialRegionCount - 1); + } + + @Test + public void testRollbackAndDoubleExecution() throws Exception { + final TableName tableName = TableName.valueOf("testRollbackAndDoubleExecution"); + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + + List<HRegionInfo> tableRegions = createTable(tableName); + + ProcedureTestingUtility.waitNoProcedureRunning(procExec); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); + + HRegionInfo[] regionsToMerge = new HRegionInfo[2]; + regionsToMerge[0] = tableRegions.get(0); + regionsToMerge[1] = tableRegions.get(1); + + long procId = procExec.submitProcedure( + new MergeTableRegionsProcedure(procExec.getEnvironment(), regionsToMerge, true)); + + // Failing before MERGE_TABLE_REGIONS_UPDATE_META we should trigger the rollback + // NOTE: the 5 (number before MERGE_TABLE_REGIONS_UPDATE_META step) is + // hardcoded, so you have to look at this test at least once when you add a new step. + int numberOfSteps = 5; + MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, numberOfSteps); + } + + private List<HRegionInfo> createTable(final TableName tableName) + throws Exception { + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(FAMILY)); + byte[][] splitRows = new byte[initialRegionCount - 1][]; + for (int i = 0; i < splitRows.length; ++i) { + splitRows[i] = Bytes.toBytes(String.format("%d", i)); + } + admin.createTable(desc, splitRows); + return assertRegionCount(tableName, initialRegionCount); + } + + public List<HRegionInfo> assertRegionCount(final TableName tableName, final int nregions) + throws Exception { + UTIL.waitUntilNoRegionsInTransition(); + List<HRegionInfo> tableRegions = admin.getTableRegions(tableName); + assertEquals(nregions, tableRegions.size()); + return tableRegions; + } + + private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { + return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/657a5d46/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStates.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStates.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStates.java new file mode 100644 index 0000000..003dfdd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStates.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.assignment; + +import static org.junit.Assert.assertEquals; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MasterTests.class, MediumTests.class}) +public class TestRegionStates { + private static final Log LOG = LogFactory.getLog(TestRegionStates.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static ThreadPoolExecutor threadPool; + private static ExecutorCompletionService executorService; + + @BeforeClass + public static void setUp() throws Exception { + threadPool = Threads.getBoundedCachedThreadPool(32, 60L, TimeUnit.SECONDS, + Threads.newDaemonThreadFactory("ProcedureDispatcher", + new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Failed thread " + t.getName(), e); + } + })); + executorService = new ExecutorCompletionService(threadPool); + } + + @AfterClass + public static void tearDown() throws Exception { + threadPool.shutdown(); + } + + @Before + public void testSetup() { + } + + @After + public void testTearDown() throws Exception { + while (true) { + Future<Object> f = executorService.poll(); + if (f == null) break; + f.get(); + } + } + + private static void waitExecutorService(final int count) throws Exception { + for (int i = 0; i < count; ++i) { + executorService.take().get(); + } + } + + // ========================================================================== + // Regions related + // ========================================================================== + + @Test + public void testRegionDoubleCreation() throws Exception { + // NOTE: HRegionInfo sort by table first, so we are relying on that + final TableName TABLE_NAME_A = TableName.valueOf("testOrderedByTableA"); + final TableName TABLE_NAME_B = TableName.valueOf("testOrderedByTableB"); + final TableName TABLE_NAME_C = TableName.valueOf("testOrderedByTableC"); + final RegionStates stateMap = new RegionStates(); + final int NRUNS = 1000; + final int NSMALL_RUNS = 3; + + // add some regions for table B + for (int i = 0; i < NRUNS; ++i) { + addRegionNode(stateMap, TABLE_NAME_B, i); + } + // re-add the regions for table B + for (int i = 0; i < NRUNS; ++i) { + addRegionNode(stateMap, TABLE_NAME_B, i); + } + waitExecutorService(NRUNS * 2); + + // add two other tables A and C that will be placed before and after table B (sort order) + for (int i = 0; i < NSMALL_RUNS; ++i) { + addRegionNode(stateMap, TABLE_NAME_A, i); + addRegionNode(stateMap, TABLE_NAME_C, i); + } + waitExecutorService(NSMALL_RUNS * 2); + // check for the list of regions of the 3 tables + checkTableRegions(stateMap, TABLE_NAME_A, NSMALL_RUNS); + checkTableRegions(stateMap, TABLE_NAME_B, NRUNS); + checkTableRegions(stateMap, TABLE_NAME_C, NSMALL_RUNS); + } + + private void checkTableRegions(final RegionStates stateMap, + final TableName tableName, final int nregions) { + List<HRegionInfo> hris = stateMap.getRegionsOfTable(tableName, true); + assertEquals(nregions, hris.size()); + for (int i = 1; i < hris.size(); ++i) { + long a = Bytes.toLong(hris.get(i - 1).getStartKey()); + long b = Bytes.toLong(hris.get(i + 0).getStartKey()); + assertEquals(b, a + 1); + } + } + + private void addRegionNode(final RegionStates stateMap, + final TableName tableName, final long regionId) { + executorService.submit(new Callable<Object>() { + @Override + public Object call() { + HRegionInfo hri = new HRegionInfo(tableName, + Bytes.toBytes(regionId), Bytes.toBytes(regionId + 1), false, 0); + return stateMap.getOrCreateRegionNode(hri); + } + }); + } + + private Object createRegionNode(final RegionStates stateMap, + final TableName tableName, final long regionId) { + return stateMap.getOrCreateRegionNode(createRegionInfo(tableName, regionId)); + } + + private HRegionInfo createRegionInfo(final TableName tableName, final long regionId) { + return new HRegionInfo(tableName, + Bytes.toBytes(regionId), Bytes.toBytes(regionId + 1), false, 0); + } + + @Test + public void testPerf() throws Exception { + final TableName TABLE_NAME = TableName.valueOf("testPerf"); + final int NRUNS = 1000000; // 1M + final RegionStates stateMap = new RegionStates(); + + long st = System.currentTimeMillis(); + for (int i = 0; i < NRUNS; ++i) { + final int regionId = i; + executorService.submit(new Callable<Object>() { + @Override + public Object call() { + HRegionInfo hri = createRegionInfo(TABLE_NAME, regionId); + return stateMap.getOrCreateRegionNode(hri); + } + }); + } + waitExecutorService(NRUNS); + long et = System.currentTimeMillis(); + LOG.info(String.format("PERF STATEMAP INSERT: %s %s/sec", + StringUtils.humanTimeDiff(et - st), + StringUtils.humanSize(NRUNS / ((et - st) / 1000.0f)))); + + st = System.currentTimeMillis(); + for (int i = 0; i < NRUNS; ++i) { + final int regionId = i; + executorService.submit(new Callable<Object>() { + @Override + public Object call() { + HRegionInfo hri = createRegionInfo(TABLE_NAME, regionId); + return stateMap.getRegionState(hri); + } + }); + } + + waitExecutorService(NRUNS); + et = System.currentTimeMillis(); + LOG.info(String.format("PERF STATEMAP GET: %s %s/sec", + StringUtils.humanTimeDiff(et - st), + StringUtils.humanSize(NRUNS / ((et - st) / 1000.0f)))); + } + + @Test + public void testPerfSingleThread() { + final TableName TABLE_NAME = TableName.valueOf("testPerf"); + final int NRUNS = 1 * 1000000; // 1M + + final RegionStates stateMap = new RegionStates(); + long st = System.currentTimeMillis(); + for (int i = 0; i < NRUNS; ++i) { + stateMap.createRegionNode(createRegionInfo(TABLE_NAME, i)); + } + long et = System.currentTimeMillis(); + LOG.info(String.format("PERF SingleThread: %s %s/sec", + StringUtils.humanTimeDiff(et - st), + StringUtils.humanSize(NRUNS / ((et - st) / 1000.0f)))); + } + + // ========================================================================== + // Server related + // ========================================================================== +}