Repository: hbase
Updated Branches:
  refs/heads/branch-2.1 d35f65f39 -> 6c9e3d067
HBASE-21364 Procedure holds the lock should put to front of the queue after restart Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6c9e3d06 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6c9e3d06 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6c9e3d06 Branch: refs/heads/branch-2.1 Commit: 6c9e3d0670bef6c159981850b9c138f60b2c8317 Parents: d35f65f Author: Allan Yang <allan...@apache.org> Authored: Wed Oct 24 10:52:52 2018 +0800 Committer: Allan Yang <allan...@apache.org> Committed: Wed Oct 24 10:52:52 2018 +0800 ---------------------------------------------------------------------- .../procedure2/AbstractProcedureScheduler.java | 10 + .../hadoop/hbase/procedure2/Procedure.java | 4 + .../hbase/procedure2/ProcedureExecutor.java | 11 +- .../hbase/procedure2/ProcedureScheduler.java | 14 ++ .../TestMasterProcedureSchedulerOnRestart.java | 207 +++++++++++++++++++ 5 files changed, 245 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6c9e3d06/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index 5645f89..7ab1329 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -86,6 +86,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } @Override + public void addFront(final Procedure procedure, boolean notify) { + push(procedure, true, notify); + } + + @Override public void 
addFront(Iterator<Procedure> procedureIterator) { schedLock(); try { @@ -109,6 +114,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { push(procedure, false, true); } + @Override + public void addBack(final Procedure procedure, boolean notify) { + push(procedure, false, notify); + } + protected void push(final Procedure procedure, final boolean addFront, final boolean notify) { schedLock(); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/6c9e3d06/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 01dc1be..472a0d1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -719,6 +719,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE this.lockedWhenLoading = true; } + public boolean isLockedWhenLoading() { + return lockedWhenLoading; + } + // ============================================================================================== // Runtime state, updated every operation by the ProcedureExecutor // http://git-wip-us.apache.org/repos/asf/hbase/blob/6c9e3d06/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 3548e6e..c6c34df 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -652,8 +652,17 @@ public class ProcedureExecutor<TEnvironment> { if (!p.hasParent()) { sendProcedureLoadedNotification(p.getProcId()); } - scheduler.addBack(p); + // If the procedure holds the lock, put the procedure in front + if (p.isLockedWhenLoading()) { + scheduler.addFront(p, false); + } else { + // if it was not, it can wait. + scheduler.addBack(p, false); + } }); + // After all procedures put into the queue, signal the worker threads. + // Otherwise, there is a race condition. See HBASE-21364. + scheduler.signalAll(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/6c9e3d06/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index e7e1cdb..9489f52 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -53,6 +53,13 @@ public interface ProcedureScheduler { void addFront(Procedure proc); /** + * Inserts the specified element at the front of this queue. + * @param proc the Procedure to add + * @param notify whether need to notify worker + */ + void addFront(Procedure proc, boolean notify); + + /** * Inserts all elements in the iterator at the front of this queue. */ void addFront(Iterator<Procedure> procedureIterator); @@ -64,6 +71,13 @@ public interface ProcedureScheduler { void addBack(Procedure proc); /** + * Inserts the specified element at the end of this queue. 
+ * @param proc the Procedure to add + * @param notify whether need to notify worker + */ + void addBack(Procedure proc, boolean notify); + + /** * The procedure can't run at the moment. * add it back to the queue, giving priority to someone else. * @param proc the Procedure to add back to the list http://git-wip-us.apache.org/repos/asf/hbase/blob/6c9e3d06/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java new file mode 100644 index 0000000..206d33d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.master.DummyRegionProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + + +@Category({MasterTests.class, SmallTests.class}) +public class TestMasterProcedureSchedulerOnRestart { + @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule + .forClass(TestMasterProcedureSchedulerOnRestart.class); + + private static final Logger LOG = LoggerFactory + .getLogger(TestMasterProcedureSchedulerOnRestart.class); + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + private static final TableName tablename = TableName.valueOf("test:TestProcedureScheduler"); + private static 
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tablename).build(); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + + private static WALProcedureStore procStore; + + private static ProcedureExecutor<MasterProcedureEnv> procExecutor; + + private static HBaseCommonTestingUtility htu; + + private static MasterProcedureEnv masterProcedureEnv; + + + private static FileSystem fs; + private static Path testDir; + private static Path logDir; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + procExecutor = UTIL.getMiniHBaseCluster().getMaster() + .getMasterProcedureExecutor(); + } + + @Test + public void testScheduler() throws Exception { + // Add a region procedure, but stuck there + long regionProc = procExecutor.submitProcedure(new DummyRegionProcedure( + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor() + .getEnvironment(), regionInfo)); + WALProcedureStore walProcedureStore = (WALProcedureStore) procExecutor.getStore(); + // Roll the wal + walProcedureStore.rollWriterForTesting(); + Thread.sleep(500); + // Submit a table procedure + procExecutor.submitProcedure(new DummyTableProcedure( + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor() + .getEnvironment(), tablename)); + // Restart the procExecutor + ProcedureTestingUtility.restart(procExecutor); + while (procExecutor.getProcedure(regionProc) == null) { + Thread.sleep(500); + } + DummyRegionProcedure dummyRegionProcedure = (DummyRegionProcedure) procExecutor + .getProcedure(regionProc); + // Resume the region procedure + dummyRegionProcedure.resume(); + // The region procedure should finish normally + UTIL.waitFor(5000, () -> dummyRegionProcedure.isFinished()); + + } + + public static class DummyTableProcedure extends + AbstractStateMachineTableProcedure<DummyRegionTableState> { + + private TableName tableName; + + public DummyTableProcedure() { + super(); + } + public DummyTableProcedure(final 
MasterProcedureEnv env, TableName tableName) { + super(null, null); + this.tableName = tableName; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.CREATE; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, + DummyRegionTableState dummyRegionTableState) + throws ProcedureSuspendedException, ProcedureYieldException, + InterruptedException { + return null; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, + DummyRegionTableState dummyRegionTableState) + throws IOException, InterruptedException { + + } + + @Override + protected DummyRegionTableState getState(int stateId) { + return DummyRegionTableState.STATE; + } + + @Override + protected int getStateId(DummyRegionTableState dummyRegionTableState) { + return 0; + } + + @Override + protected DummyRegionTableState getInitialState() { + return DummyRegionTableState.STATE; + } + + @Override + protected Procedure[] execute(final MasterProcedureEnv env) + throws ProcedureSuspendedException { + LOG.info("Finished execute"); + return null; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) + throws IOException { + super.serializeStateData(serializer); + serializer.serialize(ProtobufUtil.toProtoTableName(tableName)); + + + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) + throws IOException { + super.deserializeStateData(serializer); + tableName = ProtobufUtil + .toTableName(serializer.deserialize(HBaseProtos.TableName.class)); + + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + return super.acquireLock(env); + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + super.releaseLock(env); + } + } + + public enum DummyRegionTableState { + STATE + } + + +}