HBASE-16533 Procedure v2 - Extract chore from the executor
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aeecd4df Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aeecd4df Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aeecd4df Branch: refs/heads/branch-1 Commit: aeecd4df838f0dae8287c79036e45f6d6b634eb3 Parents: 9907a7e Author: Matteo Bertozzi <matteo.berto...@cloudera.com> Authored: Tue Aug 30 18:40:51 2016 -0700 Committer: Matteo Bertozzi <matteo.berto...@cloudera.com> Committed: Tue Aug 30 19:02:31 2016 -0700 ---------------------------------------------------------------------- .../hbase/procedure2/ProcedureExecutor.java | 62 +++++------ .../procedure2/ProcedureInMemoryChore.java | 69 ++++++++++++ .../procedure2/util/TimeoutBlockingQueue.java | 15 +++ .../procedure2/TestProcedureInMemoryChore.java | 110 +++++++++++++++++++ .../util/TestTimeoutBlockingQueue.java | 22 ++++ 5 files changed, 242 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index c195f65..ee70dd1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -127,7 +125,8 @@ public class ProcedureExecutor<TEnvironment> { * the master (e.g. master failover) so, if we delay a bit the real deletion of * the proc result the client will be able to get the result the next try. */ - private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> { + private static class CompletedProcedureCleaner<TEnvironment> + extends ProcedureInMemoryChore<TEnvironment> { private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class); private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval"; @@ -148,14 +147,15 @@ public class ProcedureExecutor<TEnvironment> { final Map<Long, ProcedureInfo> completedMap, final Map<NonceKey, Long> nonceKeysToProcIdsMap) { // set the timeout interval that triggers the periodic-procedure - setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL)); + super(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL)); this.completed = completedMap; this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap; this.store = store; this.conf = conf; } - public void periodicExecute(final TEnvironment env) { + @Override + protected void periodicExecute(final TEnvironment env) { if (completed.isEmpty()) { if (LOG.isTraceEnabled()) { LOG.trace("No completed procedures to cleanup."); @@ -189,31 +189,6 @@ public class ProcedureExecutor<TEnvironment> { } } } - - @Override - protected Procedure[] execute(final TEnvironment env) { - throw new UnsupportedOperationException(); - } - - @Override - protected void rollback(final TEnvironment env) { - throw new UnsupportedOperationException(); - } - - @Override - protected boolean abort(final TEnvironment env) { - throw new UnsupportedOperationException(); - } - - @Override - public void serializeStateData(final OutputStream stream) { - throw new UnsupportedOperationException(); - } - - @Override - public void deserializeStateData(final InputStream stream) { - throw new UnsupportedOperationException(); - } } /** @@ -526,9 +501,8 @@ public class ProcedureExecutor<TEnvironment> { threads[i].start(); } - // Add completed cleaner - waitingTimeout.add( - new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap)); + // Add completed cleaner chore + addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap)); } public void stop() { @@ -617,6 +591,22 @@ public class ProcedureExecutor<TEnvironment> { } /** + * Add a chore procedure to the executor + * @param chore the chore to add + */ + public void addChore(final ProcedureInMemoryChore chore) { + waitingTimeout.add(chore); + } + + /** + * Remove a chore procedure from the executor + * @param chore the chore to remove + */ + public void removeChore(final ProcedureInMemoryChore chore) { + waitingTimeout.remove(chore); + } + + /** * Add a new root-procedure to the executor. * @param proc the new procedure to execute. * @return the procedure id, that can be used to monitor the operation @@ -906,12 +896,12 @@ public class ProcedureExecutor<TEnvironment> { // will have the tracker saying everything is in the last log. // ---------------------------------------------------------------------------- - // The CompletedProcedureCleaner is a special case, and it acts as a chore. + // The ProcedureInMemoryChore is a special case, and it acts as a chore. // instead of bringing the Chore class in, we reuse this timeout thread for // this special case. - if (proc instanceof CompletedProcedureCleaner) { + if (proc instanceof ProcedureInMemoryChore) { try { - ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment()); + ((ProcedureInMemoryChore)proc).periodicExecute(getEnvironment()); } catch (Throwable e) { LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java new file mode 100644 index 0000000..bdced10 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Special procedure used as a chore. + * instead of bringing the Chore class in (dependencies reason), + * we reuse the executor timeout thread for this special case. + * + * The assumption is that procedure is used as hook to dispatch other procedures + * or trigger some cleanups. It does not store state in the ProcedureStore. + * this is just for in-memory chore executions. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEnvironment> { + protected ProcedureInMemoryChore(final int timeoutMsec) { + setTimeout(timeoutMsec); + } + + protected abstract void periodicExecute(final TEnvironment env); + + @Override + protected Procedure[] execute(final TEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + protected void rollback(final TEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(final TEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + public void serializeStateData(final OutputStream stream) { + throw new UnsupportedOperationException(); + } + + @Override + public void deserializeStateData(final InputStream stream) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java index f710ef4..fceabb1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/TimeoutBlockingQueue.java @@ -92,6 +92,20 @@ public class TimeoutBlockingQueue<E> { } } + public void remove(E e) { + lock.lock(); + try { + for (int i = 0; i < objects.length; ++i) { + if (objects[i] == e) { + objects[i] = null; + return; + } + } + } finally { + lock.unlock(); + } + } + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") public E poll() { lock.lock(); @@ -210,6 +224,7 @@ public class TimeoutBlockingQueue<E> { } private long getNanosTimeout(final E obj) { + if (obj == null) return 0; TimeUnit unit = timeoutRetriever.getTimeUnit(obj); long timeout = timeoutRetriever.getTimeout(obj); return unit.toNanos(timeout); http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java new file mode 100644 index 0000000..32e3e8c --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureInMemoryChore { + private static final Log LOG = LogFactory.getLog(TestProcedureInMemoryChore.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + + private TestProcEnv procEnv; + private NoopProcedureStore procStore; + private ProcedureExecutor<TestProcEnv> procExecutor; + + private HBaseCommonTestingUtility htu; + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + + procEnv = new TestProcEnv(); + procStore = new NoopProcedureStore(); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor.testing = new ProcedureExecutor.Testing(); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + } + + @Test + public void testChoreAddAndRemove() throws Exception { + final int timeoutMSec = 50; + final int nCountDown = 5; + + // submit the chore and wait for execution + CountDownLatch latch = new CountDownLatch(nCountDown); + TestLatchChore chore = new TestLatchChore(timeoutMSec, latch); + procExecutor.addChore(chore); + latch.await(); + + // remove the chore and verify it is no longer executed + procExecutor.removeChore(chore); + latch = new CountDownLatch(nCountDown); + chore.setLatch(latch); + latch.await(timeoutMSec * nCountDown, TimeUnit.MILLISECONDS); + LOG.info("chore latch count=" + latch.getCount()); + assertTrue(latch.getCount() > 0); + } + + public static class TestLatchChore extends ProcedureInMemoryChore<TestProcEnv> { + private CountDownLatch latch; + + public TestLatchChore(final int timeoutMSec, final CountDownLatch latch) { + super(timeoutMSec); + setLatch(latch); + } + + public void setLatch(final CountDownLatch latch) { + this.latch = latch; + } + + @Override + protected void periodicExecute(final TestProcEnv env) { + latch.countDown(); + } + } + + private static class TestProcEnv { + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd4df/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java index 688e23a..5750650 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/util/TestTimeoutBlockingQueue.java @@ -133,4 +133,26 @@ public class TestTimeoutBlockingQueue { } } } + + @Test + public void testRemove() { + TimeoutBlockingQueue<TestObject> queue = + new TimeoutBlockingQueue<TestObject>(2, new TestObjectTimeoutRetriever()); + + TestObject[] objs = new TestObject[5]; + for (int i = 0; i < objs.length; ++i) { + objs[i] = new TestObject(0, i * 10); + queue.add(objs[i]); + } + queue.dump(); + + for (int i = 0; i < objs.length; i += 2) { + queue.remove(objs[i]); + } + + for (int i = 0; i < objs.length; ++i) { + TestObject x = queue.poll(); + assertEquals((i % 2) == 0 ? null : objs[i], x); + } + } }