Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 c8bfc70d1 -> 169e3bafc


HBASE-21364 Procedure holds the lock should put to front of the queue after 
restart


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/169e3baf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/169e3baf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/169e3baf

Branch: refs/heads/branch-2.0
Commit: 169e3bafc889df2b299c9ecfdd57f8d6ff2060dd
Parents: c8bfc70
Author: Allan Yang <allan...@apache.org>
Authored: Wed Oct 24 10:46:09 2018 +0800
Committer: Allan Yang <allan...@apache.org>
Committed: Wed Oct 24 10:46:09 2018 +0800

----------------------------------------------------------------------
 .../procedure2/AbstractProcedureScheduler.java  |  10 +
 .../hadoop/hbase/procedure2/Procedure.java      |   4 +
 .../hbase/procedure2/ProcedureExecutor.java     |  11 +-
 .../hbase/procedure2/ProcedureScheduler.java    |  14 ++
 .../TestMasterProcedureSchedulerOnRestart.java  | 207 +++++++++++++++++++
 5 files changed, 245 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/169e3baf/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index 5645f89..7ab1329 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -86,6 +86,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
   }
 
   @Override
+  public void addFront(final Procedure procedure, boolean notify) {
+    push(procedure, true, notify); // addFront=true: insert at the front of the queue
+  }
+
+  @Override
   public void addFront(Iterator<Procedure> procedureIterator) {
     schedLock();
     try {
@@ -109,6 +114,11 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
     push(procedure, false, true);
   }
 
+  @Override
+  public void addBack(final Procedure procedure, boolean notify) {
+    push(procedure, false, notify); // addFront=false: insert at the end of the queue
+  }
+
   protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
     schedLock();
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/169e3baf/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index a1391a5..a271d8f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -720,6 +720,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
     this.lockedWhenLoading = true;
   }
 
+  public boolean isLockedWhenLoading() {
+    return lockedWhenLoading; // NOTE(review): presumably true if the lock was held at load time - confirm
+  }
+
   // ==============================================================================================
   //  Runtime state, updated every operation by the ProcedureExecutor
   //

http://git-wip-us.apache.org/repos/asf/hbase/blob/169e3baf/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 43663ef..a410bc9 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -652,8 +652,17 @@ public class ProcedureExecutor<TEnvironment> {
       if (!p.hasParent()) {
         sendProcedureLoadedNotification(p.getProcId());
       }
-      scheduler.addBack(p);
+      // If the procedure holds the lock, put the procedure in front
+      if (p.isLockedWhenLoading()) {
+        scheduler.addFront(p, false);
+      } else {
+        // Otherwise it does not hold the lock, so it can wait at the back of the queue.
+        scheduler.addBack(p, false);
+      }
     });
+    // After all procedures have been put into the queue, signal the worker threads.
+    // Signalling earlier would cause a race condition. See HBASE-21364.
+    scheduler.signalAll();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/169e3baf/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index e7e1cdb..9489f52 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -53,6 +53,13 @@ public interface ProcedureScheduler {
   void addFront(Procedure proc);
 
   /**
+   * Inserts the specified procedure at the front of this queue.
+   * @param proc the Procedure to add
+   * @param notify whether to notify the worker threads about the newly added procedure
+   */
+  void addFront(Procedure proc, boolean notify);
+
+  /**
    * Inserts all elements in the iterator at the front of this queue.
    */
   void addFront(Iterator<Procedure> procedureIterator);
@@ -64,6 +71,13 @@ public interface ProcedureScheduler {
   void addBack(Procedure proc);
 
   /**
+   * Inserts the specified procedure at the end of this queue.
+   * @param proc the Procedure to add
+   * @param notify whether to notify the worker threads about the newly added procedure
+   */
+  void addBack(Procedure proc, boolean notify);
+
+  /**
    * The procedure can't run at the moment.
    * add it back to the queue, giving priority to someone else.
    * @param proc the Procedure to add back to the list

http://git-wip-us.apache.org/repos/asf/hbase/blob/169e3baf/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java
new file mode 100644
index 0000000..206d33d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerOnRestart.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.DummyRegionProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestMasterProcedureSchedulerOnRestart {
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE = 
HBaseClassTestRule
+      .forClass(TestMasterProcedureSchedulerOnRestart.class);
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestMasterProcedureSchedulerOnRestart.class);
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1; // NOTE(review): not referenced in this class
+  private static final TableName tablename = 
TableName.valueOf("test:TestProcedureScheduler");
+  private static RegionInfo regionInfo = 
RegionInfoBuilder.newBuilder(tablename).build();
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  // NOTE(review): of the static fields below, only procExecutor is used by this class.
+  private static WALProcedureStore procStore;
+
+  private static ProcedureExecutor<MasterProcedureEnv> procExecutor;
+
+  private static HBaseCommonTestingUtility htu;
+
+  private static MasterProcedureEnv masterProcedureEnv;
+
+
+  private static FileSystem fs;
+  private static Path testDir;
+  private static Path logDir;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1); // NOTE(review): no @AfterClass in this class shuts the cluster down
+    procExecutor = UTIL.getMiniHBaseCluster().getMaster()
+        .getMasterProcedureExecutor();
+  }
+  // HBASE-21364 scenario: a stuck region procedure must still finish after an executor restart.
+  @Test
+  public void testScheduler() throws Exception {
+    // Submit a region procedure that is expected to stay stuck until it is resumed below
+    long regionProc = procExecutor.submitProcedure(new DummyRegionProcedure(
+        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()
+            .getEnvironment(), regionInfo));
+    WALProcedureStore walProcedureStore = (WALProcedureStore) 
procExecutor.getStore();
+    // Roll the procedure WAL writer
+    walProcedureStore.rollWriterForTesting();
+    Thread.sleep(500);
+    // Submit a table procedure for the same table name
+    procExecutor.submitProcedure(new DummyTableProcedure(
+        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()
+            .getEnvironment(), tablename));
+    // Restart the procExecutor
+    ProcedureTestingUtility.restart(procExecutor);
+    while (procExecutor.getProcedure(regionProc) == null) { // poll until the executor returns it
+      Thread.sleep(500);
+    }
+    DummyRegionProcedure dummyRegionProcedure = (DummyRegionProcedure) 
procExecutor
+        .getProcedure(regionProc);
+    // Resume the region procedure
+    dummyRegionProcedure.resume();
+    // The region procedure should finish normally (waits up to 5000 ms)
+    UTIL.waitFor(5000, () -> dummyRegionProcedure.isFinished());
+
+  }
+  // Minimal table-level procedure for the test: execute() only logs and returns null.
+  public static class DummyTableProcedure extends
+      AbstractStateMachineTableProcedure<DummyRegionTableState> {
+
+    private TableName tableName;
+
+    public DummyTableProcedure() {
+      super();
+    }
+    public DummyTableProcedure(final MasterProcedureEnv env, TableName 
tableName) {
+      super(null, null); // NOTE(review): the env argument is ignored; nulls are passed to super
+      this.tableName = tableName;
+    }
+
+    @Override
+    public TableName getTableName() {
+      return tableName;
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return TableOperationType.CREATE;
+    }
+
+    @Override
+    protected Flow executeFromState(MasterProcedureEnv env,
+        DummyRegionTableState dummyRegionTableState)
+        throws ProcedureSuspendedException, ProcedureYieldException,
+        InterruptedException {
+      return null;
+    }
+
+    @Override
+    protected void rollbackState(MasterProcedureEnv env,
+        DummyRegionTableState dummyRegionTableState)
+        throws IOException, InterruptedException {
+      // No-op.
+    }
+
+    @Override
+    protected DummyRegionTableState getState(int stateId) {
+      return DummyRegionTableState.STATE;
+    }
+
+    @Override
+    protected int getStateId(DummyRegionTableState dummyRegionTableState) {
+      return 0;
+    }
+
+    @Override
+    protected DummyRegionTableState getInitialState() {
+      return DummyRegionTableState.STATE;
+    }
+
+    @Override
+    protected Procedure[] execute(final MasterProcedureEnv env)
+        throws ProcedureSuspendedException {
+      LOG.info("Finished execute");
+      return null;
+    }
+
+    @Override
+    protected void serializeStateData(ProcedureStateSerializer serializer)
+        throws IOException {
+      super.serializeStateData(serializer);
+      serializer.serialize(ProtobufUtil.toProtoTableName(tableName));
+
+
+    }
+
+    @Override
+    protected void deserializeStateData(ProcedureStateSerializer serializer)
+        throws IOException {
+      super.deserializeStateData(serializer);
+      tableName = ProtobufUtil
+          .toTableName(serializer.deserialize(HBaseProtos.TableName.class));
+
+    }
+
+    @Override
+    protected LockState acquireLock(MasterProcedureEnv env) {
+      return super.acquireLock(env);
+    }
+
+    @Override
+    protected void releaseLock(MasterProcedureEnv env) {
+      super.releaseLock(env);
+    }
+  }
+  // Single-valued state enum used as the state type of DummyTableProcedure.
+  public enum DummyRegionTableState {
+    STATE
+  }
+
+
+}

Reply via email to