Repository: oozie Updated Branches: refs/heads/master fe2da6e57 -> 13bfd4949
OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/13bfd494 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/13bfd494 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/13bfd494 Branch: refs/heads/master Commit: 13bfd49495d095f51d22d0f51042690837b701a4 Parents: fe2da6e Author: Peter Bacsko <pbac...@cloudera.com> Authored: Thu Sep 6 14:42:59 2018 +0200 Committer: Peter Bacsko <pbac...@cloudera.com> Committed: Thu Sep 6 14:42:59 2018 +0200 ---------------------------------------------------------------------- .../oozie/service/AsyncXCommandExecutor.java | 412 +++++++++++++++++ .../oozie/service/CallableQueueService.java | 145 ++++-- .../apache/oozie/util/PriorityDelayQueue.java | 26 ++ core/src/main/resources/oozie-default.xml | 25 + .../service/TestAsyncXCommandExecutor.java | 462 +++++++++++++++++++ .../oozie/service/TestCallableQueueService.java | 292 +++++++++++- release-log.txt | 1 + 7 files changed, 1308 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java new file mode 100644 index 0000000..b18a37a --- /dev/null +++ b/core/src/main/java/org/apache/oozie/service/AsyncXCommandExecutor.java @@ -0,0 +1,412 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.service; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.oozie.service.CallableQueueService.CallableWrapper; +import org.apache.oozie.util.NamedThreadFactory; +import org.apache.oozie.util.XCallable; +import org.apache.oozie.util.XLog; +import org.eclipse.jetty.util.ConcurrentHashSet; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +@SuppressWarnings("deprecation") +public class AsyncXCommandExecutor { + public static final int MIN_PRIORITY = 0; + public static 
final long ANTI_STARVATION_INTERVAL = 500; + private static XLog log = XLog.getLog(AsyncXCommandExecutor.class); + private final ThreadPoolExecutor executor; + private final ScheduledThreadPoolExecutor scheduledExecutor; + private final boolean needConcurrencyCheck; + private final CallableQueueService callableQueueService; + private final AtomicInteger activeCommands; + private final long maxActiveCommands; // equivalent of "queueSize" in CQS + private final long maxWait; + private final long maxPriority; + + private final BlockingQueue<CallableWrapper<?>> priorityBlockingQueue; + private final BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayWorkQueue; + private final ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType; + private long lastAntiStarvationCheck = 0; + + @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressFBWarnings( value = "SIC_INNER_SHOULD_BE_STATIC_ANON", + justification = "Unnecessary to refactor innen classes defined here") + public AsyncXCommandExecutor(int threads, + int delayedCallableThreads, + boolean needConcurrencyCheck, + CallableQueueService callableAccess, + long maxActiveCommands, + long maxWait, + int priorities) { + + priorityBlockingQueue = new PriorityBlockingQueue<CallableWrapper<?>>(100, new PriorityComparator()); + + executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, + (BlockingQueue) priorityBlockingQueue, + new NamedThreadFactory("CallableQueue")) { + protected void beforeExecute(Thread t, Runnable r) { + XLog.Info.get().clear(); + } + + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + return (RunnableFuture<T>)callable; + } + }; + + this.scheduledExecutor = new ScheduledThreadPoolExecutor(delayedCallableThreads, + new NamedThreadFactory("ScheduledCallable")) { + protected <V> RunnableScheduledFuture<V> decorateTask( + Runnable runnable, RunnableScheduledFuture<V> task) { + + AccessibleRunnableScheduledFuture<V> arsf = + new 
AccessibleRunnableScheduledFuture<>(task, runnable); + + return arsf; + } + }; + + this.delayWorkQueue = (BlockingQueue) scheduledExecutor.getQueue(); + this.needConcurrencyCheck = needConcurrencyCheck; + this.callableQueueService = callableAccess; + this.maxActiveCommands = maxActiveCommands; + this.maxWait = maxWait; + this.activeCommands = new AtomicInteger(0); + this.pendingCommandsPerType = new ConcurrentHashMap<>(); + Preconditions.checkArgument(priorities > 0, "Number of priorities must be >0"); + this.maxPriority = priorities - 1; + } + + @VisibleForTesting + AsyncXCommandExecutor(boolean needConcurrencyCheck, + CallableQueueService callableAccess, + long maxActiveCommands, + ThreadPoolExecutor executor, + ScheduledThreadPoolExecutor scheduledExecutor, + PriorityBlockingQueue<CallableWrapper<?>> priorityBlockingQueue, + BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayQueue, + ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType, + AtomicInteger activeCommands, + long maxWait, + long priorities) { + + this.priorityBlockingQueue = priorityBlockingQueue; + this.delayWorkQueue = delayQueue; + this.pendingCommandsPerType = pendingCommandsPerType; + this.executor = executor; + this.scheduledExecutor = scheduledExecutor; + this.needConcurrencyCheck = needConcurrencyCheck; + this.callableQueueService = callableAccess; + this.maxActiveCommands = maxActiveCommands; + this.activeCommands = activeCommands; + this.maxWait = maxWait; + this.maxPriority = priorities - 1; + } + + public synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) { + if (!ignoreQueueSize && activeCommands.get() >= maxActiveCommands) { + log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey()); + return false; + } + + if (wrapper.filterDuplicates()) { + wrapper.addToUniqueCallables(); + + int priority = wrapper.getPriority(); + long initialDelay = wrapper.getInitialDelay(); + + try { + if (priority > 
maxPriority || priority < MIN_PRIORITY) { + throw new IllegalArgumentException("priority out of range: " + priority); + } + + if (initialDelay == 0) { + executor.execute(wrapper); + } else { + ScheduledXCallable scheduledXCallable = new ScheduledXCallable(wrapper); + long schedDelay = wrapper.getDelay(TimeUnit.MILLISECONDS); + scheduledExecutor.schedule(scheduledXCallable, + schedDelay, TimeUnit.MILLISECONDS); + } + + activeCommands.incrementAndGet(); + } catch (Throwable ree) { + wrapper.removeFromUniqueCallables(); + throw new RuntimeException(ree); + } + } + + return true; + } + + public void handleConcurrencyExceeded(CallableWrapper<?> command) { + String type = command.getElement().getType(); + + Set<CallableWrapper<?>> commandsForType = pendingCommandsPerType.get(type); + if (commandsForType == null) { + commandsForType = new ConcurrentHashSet<>(); + Set<CallableWrapper<?>> oldCommandForType; + oldCommandForType = pendingCommandsPerType.putIfAbsent(type, commandsForType); + + if (oldCommandForType != null) { + // a different thread was faster + commandsForType = oldCommandForType; + } + } + + commandsForType.add(command); + } + + public void checkMaxConcurrency(String type) { + Set<CallableWrapper<?>> commandsForType = pendingCommandsPerType.get(type); + + if (commandsForType != null) { + // Only a single thread should be doing stuff here! Reason: concurrent executions might + // submit an eligible XCallable multiple times, which must be avoided. 
+ synchronized (commandsForType) { + boolean doAntiStarvation = false; + int priorityModified = 0; + long now = System.currentTimeMillis(); + if (now - lastAntiStarvationCheck > ANTI_STARVATION_INTERVAL) { + doAntiStarvation = true; + } + + for (Iterator<CallableWrapper<?>> itr = commandsForType.iterator(); itr.hasNext();) { + CallableWrapper<?> command = itr.next(); + + // Anti-starvation logic: try to promote callables that have been waiting for too long + int currentPrio = command.getPriority(); + if (doAntiStarvation + && command.getDelay(TimeUnit.MILLISECONDS) < -maxWait + && currentPrio < maxPriority) { + command.setDelay(0, TimeUnit.MILLISECONDS); + command.setPriority(++currentPrio); + priorityModified++; + } + + if (callableQueueService.canSubmitCallable(command.getElement())) { + if (activeCommands.get() >= maxActiveCommands) { + log.warn("queue full, ignoring queuing for [{0}]", command.getElement().getKey()); + activeCommands.decrementAndGet(); + } else { + executor.execute(command); + } + + itr.remove(); + } + } + + if (doAntiStarvation) { + lastAntiStarvationCheck = System.currentTimeMillis(); + } + + if (priorityModified > 0) { + log.debug("Anti-starvation: handled [{0}] elements", priorityModified); + } + } + } + } + + public void commandFinished() { + // Note: this is to track the number of elements. Otherwise we'd have to combine the size of + // two queues + a list. 
+ activeCommands.decrementAndGet(); + } + + public ThreadPoolExecutor getExecutorService() { + return executor; + } + + public void shutdown() { + try { + shutdownExecutor(executor, "executor"); + shutdownExecutor(scheduledExecutor, "scheduled executor"); + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for executor shutdown"); + } + } + + public List<String> getQueueDump() { + List<CallableWrapper<?>> copyOfPending = new ArrayList<>(100); + List<String> queueDump = new ArrayList<>(100); + + // Safe to iterate + for (Map.Entry<String, Set<CallableWrapper<?>>> entry : pendingCommandsPerType.entrySet()) { + Set<CallableWrapper<?>> pendingCommandsPerType = entry.getValue(); + copyOfPending.addAll(pendingCommandsPerType); + } + + // Safe to iterate + for (final CallableWrapper<?> wrapper : priorityBlockingQueue) { + queueDump.add(wrapper.toString()); + } + + // Safe to iterate + for (final AccessibleRunnableScheduledFuture<ScheduledXCallable> future : delayWorkQueue) { + ScheduledXCallable delayedXCallable = (ScheduledXCallable) future.getTask(); + queueDump.add(delayedXCallable.getCallableWrapper().toString()); + } + + for (final CallableWrapper<?> wrapper : copyOfPending) { + queueDump.add(wrapper.toString()); + } + + return queueDump; + } + + public int getSize() { + return activeCommands.get(); + } + + public class ScheduledXCallable implements Runnable { + private CallableWrapper<?> target; + + public ScheduledXCallable(CallableWrapper<?> target) { + this.target = target; + } + + @Override + public void run() { + if (needConcurrencyCheck && !callableQueueService.canSubmitCallable(target.getElement())) { + XCallable<?> callable = target.getElement(); + handleConcurrencyExceeded(target); + + // need this to deal with a special race condition: we detect that concurrency + // exceeded, but an XCommand (or more!) with the same type just happens to finish. 
If that + // happens, this callable might never get scheduled again (or much later), so we have to guard + // against this condition. + checkMaxConcurrency(callable.getType()); + } else { + executor.execute(target); + } + } + + public CallableWrapper<?> getCallableWrapper() { + return target; + } + } + + @SuppressFBWarnings(value = "SE_COMPARATOR_SHOULD_BE_SERIALIZABLE", + justification = "PriorityBlockingQueue which uses this comparator will never be serialized") + public static class PriorityComparator implements Comparator<CallableWrapper<?>> { + @Override + public int compare(CallableWrapper<?> o1, CallableWrapper<?> o2) { + return Integer.compare(o2.getPriority(), o1.getPriority()); + } + } + + // We have to use this so that scheduled elements in the DelayWorkQueue are accessible + @SuppressFBWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS", + justification = "This class has a natural ordering (expiration) which is inconsistent with equals") + public static class AccessibleRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> { + private final Runnable task; + private RunnableScheduledFuture<V> originalFuture; + + public AccessibleRunnableScheduledFuture(RunnableScheduledFuture<V> originalFuture, + Runnable task) { + this.task = task; + this.originalFuture = originalFuture; + } + + @Override + public void run() { + originalFuture.run(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return originalFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return originalFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return originalFuture.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return originalFuture.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return originalFuture.get(timeout, unit); + } + + @Override + public long 
getDelay(TimeUnit unit) { + return originalFuture.getDelay(unit); + } + + @Override + public int compareTo(Delayed o) { + return originalFuture.compareTo(o); + } + + @Override + public boolean isPeriodic() { + return originalFuture.isPeriodic(); + } + + public Runnable getTask() { + return task; + } + } + + private void shutdownExecutor(ExecutorService executor, String name) throws InterruptedException { + long limit = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30); + executor.shutdown(); + while (!executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { + log.info("Waiting for [{0}] to shutdown", name); + if (System.currentTimeMillis() > limit) { + log.warn("Gave up, continuing without waiting for executor to shutdown"); + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/service/CallableQueueService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java index ef8d58d..dc9a099 100644 --- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java +++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java @@ -72,6 +72,7 @@ import com.google.common.collect.ImmutableSet; * execution of Commands via a ThreadPool. Sets up a Delayed Queue to handle actions which will be ready for execution * sometime in the future. 
*/ +@SuppressWarnings("deprecation") public class CallableQueueService implements Service, Instrumentable { private static final String INSTRUMENTATION_GROUP = "callablequeue"; private static final String INSTR_IN_QUEUE_TIME_TIMER = "time.in.queue"; @@ -85,14 +86,17 @@ public class CallableQueueService implements Service, Instrumentable { public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size"; public static final String CONF_THREADS = CONF_PREFIX + "threads"; + public static final String CONF_OLDIMPL = CONF_PREFIX + "queue.oldImpl"; + public static final String CONF_DELAYED_CALLABLE_THREADS = CONF_PREFIX + "delayedcallable.threads"; public static final String CONF_CALLABLE_CONCURRENCY = CONF_PREFIX + "callable.concurrency"; public static final String CONF_CALLABLE_NEXT_ELIGIBLE = CONF_PREFIX + "callable.next.eligible"; public static final String CONF_CALLABLE_INTERRUPT_TYPES = CONF_PREFIX + "InterruptTypes"; public static final String CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE = CONF_PREFIX + "InterruptMapMaxSize"; public static final int CONCURRENCY_DELAY = 500; - public static final int SAFE_MODE_DELAY = 60000; + public static final int MAX_CALLABLE_WAITTIME_MS = 30_000; + public static final int PRIORITIES = 3; private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>(); @@ -131,6 +135,11 @@ public class CallableQueueService implements Service, Instrumentable { counter.decrementAndGet(); } } + + if (!oldImpl) { + asyncXCommandExecutor.commandFinished(); + asyncXCommandExecutor.checkMaxConcurrency(callable.getType()); + } } private boolean callableReachMaxConcurrency(XCallable<?> callable) { @@ -146,6 +155,21 @@ public class CallableQueueService implements Service, Instrumentable { } } + public boolean canSubmitCallable(XCallable<?> callable) { + synchronized (activeCallables) { + AtomicInteger counter = activeCallables.get(callable.getType()); + if (counter == null) { + counter = new AtomicInteger(1); + 
activeCallables.put(callable.getType(), counter); + return true; + } + else { + int i = counter.get(); + return i <= maxCallableConcurrency; + } + } + } + // Callables are wrapped with the this wrapper for execution, for logging // and instrumentation. // The wrapper implements Runnable and Comparable to be able to work with an @@ -414,9 +438,11 @@ public class CallableQueueService implements Service, Instrumentable { private XLog log = XLog.getLog(getClass()); private int queueSize; - private PriorityDelayQueue<CallableWrapper> queue; + private PriorityDelayQueue<CallableWrapper<?>> queue; private ThreadPoolExecutor executor; private Instrumentation instrumentation; + private boolean oldImpl = false; + private AsyncXCommandExecutor asyncXCommandExecutor; /** * Convenience method for instrumentation counters. @@ -458,7 +484,10 @@ public class CallableQueueService implements Service, Instrumentable { interruptTypes = ImmutableSet.copyOf(interruptTypes); if (!callableNextEligible) { - queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { + queue = new PriorityDelayQueue<CallableWrapper<?>>(PRIORITIES, + MAX_CALLABLE_WAITTIME_MS, + TimeUnit.MILLISECONDS, + queueSize) { @Override protected void debug(String msgTemplate, Object... msgArgs) { log.trace(msgTemplate, msgArgs); @@ -471,7 +500,10 @@ public class CallableQueueService implements Service, Instrumentable { // which has not yet reach max concurrency.Overrided method // 'eligibleToPoll' to check if the // element of this queue has reached the maximum concurrency. - queue = new PollablePriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { + queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES, + MAX_CALLABLE_WAITTIME_MS, + TimeUnit.MILLISECONDS, + queueSize) { @Override protected void debug(String msgTemplate, Object... 
msgArgs) { log.trace(msgTemplate, msgArgs); @@ -493,28 +525,45 @@ public class CallableQueueService implements Service, Instrumentable { interruptMapMaxSize = ConfigurationService.getInt(conf, CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE); + oldImpl = ConfigurationService.getBoolean(CONF_OLDIMPL, false); + log.info("Using old queue implementation: [{0}]", oldImpl); + + if (oldImpl) { + executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue, + new NamedThreadFactory("CallableQueue")) { + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t,r); + XLog.Info.get().clear(); + } + protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { + return (RunnableFuture<T>)callable; + } + }; + } else { + int delayedCallableThreads = ConfigurationService.getInt(CONF_DELAYED_CALLABLE_THREADS, 1); + + asyncXCommandExecutor = new AsyncXCommandExecutor(threads, + delayedCallableThreads, + callableNextEligible, + this, + queueSize, + MAX_CALLABLE_WAITTIME_MS, + PRIORITIES); + + executor = asyncXCommandExecutor.getExecutorService(); + } + // IMPORTANT: The ThreadPoolExecutor does not always the execute // commands out of the queue, there are // certain conditions where commands are pushed directly to a thread. // As we are using a queue with DELAYED semantics (i.e. execute the // command in 5 mins) we need to make // sure that the commands are always pushed to the queue. - // To achieve this (by looking a the ThreadPoolExecutor.execute() + // To achieve this (by looking at the ThreadPoolExecutor.execute() // implementation, we are making the pool // minimum size equals to the maximum size (thus threads are keep always // running) and we are warming up // all those threads (the for loop that runs dummy runnables). 
- executor = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.SECONDS, (BlockingQueue) queue, - new NamedThreadFactory("CallableQueue")) { - protected void beforeExecute(Thread t, Runnable r) { - super.beforeExecute(t,r); - XLog.Info.get().clear(); - } - protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { - return (RunnableFuture<T>)callable; - } - }; - for (int i = 0; i < threads; i++) { executor.execute(new Runnable() { public void run() { @@ -547,6 +596,10 @@ public class CallableQueueService implements Service, Instrumentable { break; } } + + if (!oldImpl) { + asyncXCommandExecutor.shutdown(); + } } catch (InterruptedException ex) { log.warn(ex); @@ -567,29 +620,34 @@ public class CallableQueueService implements Service, Instrumentable { * @return int size of queue */ public synchronized int queueSize() { - return queue.size(); + return oldImpl ? queue.size() : asyncXCommandExecutor.getSize(); } - private synchronized boolean queue(CallableWrapper wrapper, boolean ignoreQueueSize) { - if (!ignoreQueueSize && queue.size() >= queueSize) { - log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey()); - return false; - } - if (!executor.isShutdown()) { - if (wrapper.filterDuplicates()) { - wrapper.addToUniqueCallables(); - try { - executor.execute(wrapper); - } - catch (Throwable ree) { - wrapper.removeFromUniqueCallables(); - throw new RuntimeException(ree); + private synchronized boolean queue(CallableWrapper<?> wrapper, boolean ignoreQueueSize) { + if (oldImpl) { + if (!ignoreQueueSize && queue.size() >= queueSize) { + log.warn("queue full, ignoring queuing for [{0}]", wrapper.getElement().getKey()); + return false; + } + if (!executor.isShutdown()) { + if (wrapper.filterDuplicates()) { + wrapper.addToUniqueCallables(); + try { + executor.execute(wrapper); + } + catch (Throwable ree) { + wrapper.removeFromUniqueCallables(); + throw new RuntimeException(ree); + } } } + else { + log.warn("Executor shutting down, ignoring 
queueing of [{0}]", wrapper.getElement().getKey()); + } + } else { + asyncXCommandExecutor.queue(wrapper, ignoreQueueSize); } - else { - log.warn("Executor shutting down, ignoring queueing of [{0}]", wrapper.getElement().getKey()); - } + return true; } @@ -637,7 +695,7 @@ public class CallableQueueService implements Service, Instrumentable { } else { checkInterruptTypes(callable); - queued = queue(new CallableWrapper(callable, delay), false); + queued = queue(new CallableWrapper<>(callable, delay), false); if (queued) { incrCounter(INSTR_QUEUED_COUNTER, 1); } @@ -762,14 +820,18 @@ public class CallableQueueService implements Service, Instrumentable { * @return the list of string that representing each CallableWrapper */ public List<String> getQueueDump() { - List<String> list = new ArrayList<String>(); - for (QueueElement<CallableWrapper> qe : queue) { - if (qe.toString() == null) { - continue; + if (oldImpl) { + List<String> list = new ArrayList<String>(); + for (QueueElement<CallableWrapper<?>> qe : queue) { + if (qe.toString() == null) { + continue; + } + list.add(qe.toString()); } - list.add(qe.toString()); + return list; + } else { + return asyncXCommandExecutor.getQueueDump(); } - return list; } /** @@ -794,5 +856,4 @@ public class CallableQueueService implements Service, Instrumentable { public Set<String> getInterruptTypes() { return interruptTypes; } - } http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java index 75c2069..365f918 100644 --- a/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java +++ b/core/src/main/java/org/apache/oozie/util/PriorityDelayQueue.java @@ -51,7 +51,10 @@ import java.util.concurrent.locks.ReentrantLock; * <p> * This class does not use a 
separate thread for anti-starvation check, instead, the check is performed on polling and * seeking operations. This check is performed, the most every 1/2 second. + * + * @deprecated this implementation will be removed in the future and AsyncXCommandExecutor will be used. */ +@Deprecated public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.QueueElement<E>> implements BlockingQueue<PriorityDelayQueue.QueueElement<E>> { @@ -65,6 +68,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu private int priority; private long baseTime; boolean inQueue; + private long initialDelay; /** * Create an Element wrapper. @@ -88,6 +92,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu this.element = element; this.priority = priority; setDelay(delay, unit); + this.initialDelay = delay; } /** @@ -100,6 +105,15 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu } /** + * Sets the priority of the element. + * + * @param priority the priority of the element + */ + public void setPriority(int priority) { + this.priority = priority; + } + + /** * Return the priority of the element. * * @return the priority of the element. @@ -116,6 +130,7 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu */ public void setDelay(long delay, TimeUnit unit) { baseTime = System.currentTimeMillis() + unit.toMillis(delay); + initialDelay = delay; } /** @@ -130,6 +145,17 @@ public class PriorityDelayQueue<E> extends AbstractQueue<PriorityDelayQueue.Queu } /** + * Returns the original delay of the element. As time goes on, this value remains static, + * as opposed to getDelay(), where the delay depends on how much time has passed since the + * creation. + * + * @return the initial delay of this element, expressed in the time unit it was given in (the value is stored without conversion to milliseconds). + */ + public long getInitialDelay() { + return initialDelay; + } + + /** * Compare the age of this wrapper element with another.
The priority is not used for the comparision. * * @param o the other wrapper element to compare with. http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index c354f02..c3bcdfc 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -504,6 +504,31 @@ </property> <property> + <name>oozie.service.CallableQueueService.delayedcallable.threads</name> + <value>1</value> + <description> + The number of threads where delayed tasks are executed. Upon expiration, the tasks are immediately + inserted into the main queue to properly handle priorities. This means that no actual business logic + is executed in this thread pool, so under normal circumstances, this value can be set to a low number. + + Note that this property is completely unrelated to oozie.service.SchedulerService.threads which + tells how many scheduled background tasks can run in parallel at the same time (like PurgeService, + StatusTransitService, etc). + </description> + </property> + + <property> + <name>oozie.service.CallableQueueService.queue.oldImpl</name> + <value>false</value> + <description> + If set to false, then CallableQueueService will use a more performant, less CPU-intensive + queuing mechanism to execute asynchronous tasks internally. The old implementation generates + noticeable CPU load even if Oozie is completely idle, especially when oozie.service.CallableQueueService.threads + is set to a large number. The previous queuing mechanism is kept as a fallback option.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.service.CallableQueueService.callable.concurrency</name>
         <value>3</value>
         <description>
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
new file mode 100644
index 0000000..f9ec4d6
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.oozie.service.AsyncXCommandExecutor.AccessibleRunnableScheduledFuture;
+import org.apache.oozie.service.AsyncXCommandExecutor.ScheduledXCallable;
+import org.apache.oozie.service.CallableQueueService.CallableWrapper;
+import org.apache.oozie.util.XCallable;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Sets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.times;
+import static org.mockito.Matchers.same;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+
+/**
+ * Unit tests for {@link AsyncXCommandExecutor}. The thread pools and the {@link CallableQueueService} are Mockito
+ * mocks, so the tests only verify how commands are handed over to the executors and how the queues and counters
+ * passed to the executor change.
+ */
+@RunWith(MockitoJUnitRunner.class)
+@SuppressWarnings("deprecation")
+public class TestAsyncXCommandExecutor {
+    private static final String DEFAULT_TYPE = "test";
+    private static final int DEFAULT_MAX_ACTIVE_COMMANDS = 5;
+    private static final boolean DEFAULT_ENABLE_CONCURRENCY_CHECK = true;
+    private static final long DEFAULT_MAXWAIT = 30_000;
+    private static final int TEST_PRIORITIES = 5;
+    private static final int MAX_PRIORITY = TEST_PRIORITIES - 1;
+
+    @Mock
+    private ThreadPoolExecutor executor;
+
+    @Mock
+    private ScheduledThreadPoolExecutor scheduledExecutor;
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private CallableWrapper<?> callableWrapper;
+
+    @Mock
+    private CallableQueueService callableQueueService;
+
+    private PriorityBlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
+    private BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayQueue;
+    private ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType;
+    private AtomicInteger activeCommands;
+    private AsyncXCommandExecutor asyncExecutor;
+
+    @Before
+    public void setup() {
+        activeCommands = new AtomicInteger(0);
+        priorityBlockingQueue = new PriorityBlockingQueue<>();
+        pendingCommandsPerType = new ConcurrentHashMap<>();
+        delayQueue = new LinkedBlockingQueue<>(); // in reality it's not LBQ, but it's fine here
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT);
+        when(callableWrapper.filterDuplicates()).thenReturn(true);
+        when(callableWrapper.getElement().getKey()).thenReturn("key");
+        when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE);
+    }
+
+    @Test
+    public void testSubmitCallableWithNoDelay() {
+        boolean result = asyncExecutor.queue(callableWrapper, false);
+
+        verify(executor).execute(same(callableWrapper));
+        verifyZeroInteractions(scheduledExecutor);
+        assertEquals("Active commands", 1, asyncExecutor.getSize());
+        assertTrue("Queuing result", result);
+    }
+
+    @Test
+    public void testSubmitCallableWithDelay() {
+        when(callableWrapper.getInitialDelay()).thenReturn(111L);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(222L);
+
+        boolean result = asyncExecutor.queue(callableWrapper, false);
+
+        verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(222L), eq(TimeUnit.MILLISECONDS));
+        verifyZeroInteractions(executor);
+        assertEquals("Active commands", 1, asyncExecutor.getSize());
+        assertTrue("Queuing result", result);
+    }
+
+    @Test
+    public void testSubmissionSuccessfulAfterDelay() {
+        when(callableWrapper.getInitialDelay()).thenReturn(100L);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+        when(callableQueueService.canSubmitCallable(any(XCallable.class))).thenReturn(true);
+        configureMockScheduler();
+
+        asyncExecutor.queue(callableWrapper, false);
+
+        verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+                eq(TimeUnit.MILLISECONDS));
+        verify(executor).execute(callableWrapper);
+    }
+
+    @Test
+    public void testSubmissionFailsAfterDelay() {
+        when(callableWrapper.getInitialDelay()).thenReturn(100L);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+        configureMockScheduler();
+
+        asyncExecutor.queue(callableWrapper, false);
+
+        verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+                eq(TimeUnit.MILLISECONDS));
+        verifyZeroInteractions(executor);
+    }
+
+    @Test
+    public void testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() {
+        asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT);
+        when(callableWrapper.getInitialDelay()).thenReturn(100L);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
+        XCallable<?> wrappedCommand = mock(XCallable.class);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(wrappedCommand);
+        configureMockScheduler();
+
+        asyncExecutor.queue(callableWrapper, false);
+
+        verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
+                eq(TimeUnit.MILLISECONDS));
+        verify(executor).execute(eq(callableWrapper));
+    }
+
+    @Test
+    public void testCannotSubmitDueToFiltering() {
+        when(callableWrapper.filterDuplicates()).thenReturn(false);
+
+        boolean result = asyncExecutor.queue(callableWrapper, false);
+
+        verifyZeroInteractions(scheduledExecutor);
+        verifyZeroInteractions(executor);
+        assertEquals("Active commands", 0, asyncExecutor.getSize());
+        assertTrue("Queuing result", result);
+    }
+
+    @Test
+    public void testExceptionThrownDuringSubmission() {
+        doThrow(new RuntimeException()).when(executor).execute(any(Runnable.class));
+
+        boolean exceptionThrown = false;
+        try {
+            asyncExecutor.queue(callableWrapper, false);
+        } catch (RuntimeException e) {
+            exceptionThrown = true;
+        }
+
+        assertTrue("Exception was not thrown", exceptionThrown);
+        verify(callableWrapper).removeFromUniqueCallables();
+        verifyZeroInteractions(scheduledExecutor);
+    }
+
+    @Test
+    public void testSubmitWithNegativePriority() {
+        testIllegalPriority(-1);
+    }
+
+    @Test
+    public void testSubmitWithTooHighPriority() {
+        testIllegalPriority(MAX_PRIORITY + 1);
+    }
+
+    @Test
+    public void testQueueSizeWhenCommandIsFinished() {
+        CallableWrapper<?> delayedCommand = mock(CallableWrapper.class);
+        when(delayedCommand.getInitialDelay()).thenReturn(100L);
+        when(delayedCommand.filterDuplicates()).thenReturn(true);
+
+        asyncExecutor.queue(callableWrapper, false);
+        asyncExecutor.queue(delayedCommand, false);
+        int sizeAfterQueue = asyncExecutor.getSize();
+        asyncExecutor.commandFinished();
+        asyncExecutor.commandFinished();
+
+        assertEquals("Size after queue", 2, sizeAfterQueue);
+        assertEquals("Active commands", 0, asyncExecutor.getSize());
+    }
+
+    @Test
+    public void testQueueSizeWhenQueueIsFullDuringMaxConcurrencyCheck() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+        when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        activeCommands.set(20);
+
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        assertEquals("Active commands", 19, activeCommands.get());
+    }
+
+    @Test
+    public void testSubmissionWhenQueueIsFull() {
+        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT);
+        callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
+        when(callableWrapper.filterDuplicates()).thenReturn(true);
+        when(callableWrapper.getElement().getKey()).thenReturn("key");
+
+        asyncExecutor.queue(callableWrapper, false);
+        asyncExecutor.queue(callableWrapper, false);
+        boolean finalResult = asyncExecutor.queue(callableWrapper, false);
+
+        assertFalse("Last submission shouldn't have succeeded", finalResult);
+        verify(executor, times(2)).execute(same(callableWrapper));
+    }
+
+    @Test
+    public void testSubmissionWhenQueueSizeIsIgnored() {
+        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT);
+        callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
+        when(callableWrapper.filterDuplicates()).thenReturn(true);
+        when(callableWrapper.getElement().getKey()).thenReturn("key");
+
+        asyncExecutor.queue(callableWrapper, false);
+        asyncExecutor.queue(callableWrapper, false);
+        boolean finalResult = asyncExecutor.queue(callableWrapper, true);
+
+        assertTrue("Last submission should have succeeded", finalResult);
+        verify(executor, times(3)).execute(same(callableWrapper));
+    }
+
+    @Test
+    public void testPendingCommandSubmission() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+        when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verify(executor).execute(eq(callableWrapper));
+        assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+        Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+        assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+        assertEquals("List of pending commands should be empty", 0, pendingCommandsList.size());
+    }
+
+    @Test
+    public void testPendingCommandsWithSameType() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+
+        XCallable<?> secondCallable = mock(XCallable.class);
+        when(secondCallable.getType()).thenReturn(DEFAULT_TYPE);
+        CallableWrapper<?> secondWrapper = mock(CallableWrapper.class);
+        Mockito.<XCallable<?>>when(secondWrapper.getElement()).thenReturn(secondCallable);
+
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        asyncExecutor.handleConcurrencyExceeded(secondWrapper);
+
+        assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+        Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+        assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+        assertEquals("List of pending commands", 2, pendingCommandsList.size());
+    }
+
+    @Test
+    public void testPendingCommandSubmissionWhenQueueIsFull() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+
+        when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
+
+        activeCommands.set(10);
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verifyZeroInteractions(executor);
+        assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+        Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+        assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+        assertEquals("List of pending commands should be empty", 0, pendingCommandsList.size());
+    }
+
+    @Test
+    public void testPendingCommandSubmissionWhenMaxConcurrencyReached() {
+        XCallable<?> callable = mock(XCallable.class);
+        when(callable.getType()).thenReturn(DEFAULT_TYPE);
+        Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
+        when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(false);
+
+        asyncExecutor.handleConcurrencyExceeded(callableWrapper);
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verifyZeroInteractions(executor);
+        assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
+        Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
+        assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
+        assertEquals("List of pending commands list should not be empty", 1, pendingCommandsList.size());
+    }
+
+    @Test
+    public void testQueueDump() {
+        CallableWrapper<?> pendingCallable = mock(CallableWrapper.class);
+        CallableWrapper<?> waitingCallable = mock(CallableWrapper.class);
+        ScheduledXCallable delayedXCallable = mock(ScheduledXCallable.class);
+        @SuppressWarnings("unchecked")
+        AccessibleRunnableScheduledFuture<ScheduledXCallable> asrf = mock(AccessibleRunnableScheduledFuture.class);
+        Mockito.<CallableWrapper<?>>when(delayedXCallable.getCallableWrapper()).thenReturn(waitingCallable);
+        when(asrf.getTask()).thenReturn(delayedXCallable);
+        when(pendingCallable.toString()).thenReturn("pendingCallable");
+        when(waitingCallable.toString()).thenReturn("waitingCallable");
+        when(callableWrapper.toString()).thenReturn("callableWrapper");
+
+        priorityBlockingQueue.add(callableWrapper);
+        delayQueue.add(asrf);
+        pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(pendingCallable));
+
+        List<String> queueDump = asyncExecutor.getQueueDump();
+        assertEquals("Size", 3, queueDump.size());
+        assertTrue("PendingCallable not found", queueDump.contains("pendingCallable"));
+        assertTrue("WaitingCallable not found", queueDump.contains("waitingCallable"));
+        assertTrue("CallableWrapper not found", queueDump.contains("callableWrapper"));
+    }
+
+    @Test
+    public void testAntiStarvationWhenDelayIsAboveMaxWait() {
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L);
+        when(callableWrapper.getPriority()).thenReturn(0);
+        pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verify(callableWrapper).setPriority(1);
+        verify(callableWrapper).setDelay(eq(0L), eq(TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testAntiStarvationWhenDelayIsBelowMaxWait() {
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-200L);
+        when(callableWrapper.getPriority()).thenReturn(0);
+        pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verify(callableWrapper, never()).setPriority(anyInt());
+        verify(callableWrapper, never()).setDelay(anyLong(), any(TimeUnit.class));
+    }
+
+    @Test
+    public void testAntiStarvationWhenPriorityIsHighest() {
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500);
+        when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L);
+        when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY);
+        pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
+
+        asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
+
+        verify(callableWrapper, never()).setPriority(anyInt());
+        verify(callableWrapper, never()).setDelay(anyLong(), any(TimeUnit.class));
+    }
+
+    @Test
+    public void testShutDown() throws InterruptedException {
+        when(executor.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
+        when(scheduledExecutor.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
+        asyncExecutor.shutdown();
+
+        verify(executor).shutdown();
+        verify(executor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS));
+        verify(scheduledExecutor).shutdown();
+        verify(scheduledExecutor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Queues the shared wrapper with the given priority and expects a {@link RuntimeException} whose cause is an
+     * {@link IllegalArgumentException}, without any interaction with the two executors.
+     */
+    private void testIllegalPriority(int prio) {
+        when(callableWrapper.getPriority()).thenReturn(prio);
+
+        boolean exceptionThrown = false;
+        Throwable cause = null;
+        try {
+            asyncExecutor.queue(callableWrapper, false);
+        } catch (RuntimeException e) {
+            exceptionThrown = true;
+            cause = e.getCause();
+        }
+
+        assertTrue("Exception was not thrown", exceptionThrown);
+        verifyZeroInteractions(scheduledExecutor);
+        verifyZeroInteractions(executor);
+        assertTrue("Illegal exception", cause instanceof IllegalArgumentException);
+        verify(callableWrapper).removeFromUniqueCallables();
+    }
+
+    /**
+     * Makes the mocked scheduler run every scheduled {@link ScheduledXCallable} immediately on the calling thread.
+     */
+    private void configureMockScheduler() {
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                ScheduledXCallable target = (ScheduledXCallable) invocation.getArguments()[0];
+                target.run();
+                return null;
+            }
+        }).when(scheduledExecutor).schedule(any(ScheduledXCallable.class), any(Long.class),
+                any(TimeUnit.class));
+    }
+
+    /**
+     * Creates the executor under test, wired to the mocks and to the queues and counters owned by this test class.
+     *
+     * @param needMaxConcurrencyCheck forwarded as the first constructor argument
+     * @param maxActiveCallables forwarded to the constructor; the tests use it as the queue size limit
+     * @param maxWait forwarded to the constructor; the anti-starvation tests pass 500 to override DEFAULT_MAXWAIT
+     */
+    private AsyncXCommandExecutor createExecutor(boolean needMaxConcurrencyCheck, int maxActiveCallables,
+            long maxWait) {
+        return new AsyncXCommandExecutor(needMaxConcurrencyCheck,
+                callableQueueService,
+                maxActiveCallables,
+                executor,
+                scheduledExecutor,
+                priorityBlockingQueue,
+                delayQueue,
+                pendingCommandsPerType,
+                activeCommands,
+                maxWait,
+                TEST_PRIORITIES);
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
index 9c2a11d..5d546ff 100644
--- a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
@@ -25,16 +25,30 @@ import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.XCommand;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.XCallable;
+import org.apache.oozie.util.XLog;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class TestCallableQueueService extends XTestCase {
     static AtomicLong EXEC_ORDER = new AtomicLong();
+    private static XLog log = XLog.getLog(TestCallableQueueService.class);
+    private AtomicLong counter = new AtomicLong();
+    private CountDownLatch finished = new CountDownLatch(1);
 
     public static class MyCallable implements XCallable<Void> {
         String type;
@@ -93,6 +107,8 @@ public class TestCallableQueueService extends XTestCase {
             StringBuilder sb = new StringBuilder();
             sb.append("Type:").append(getType());
             sb.append(",Priority:").append(getPriority());
+            sb.append(",Key:").append(getKey());
+            sb.append(",Wait:").append(wait);
             return sb.toString();
         }
 
@@ -409,20 +425,19 @@ public class TestCallableQueueService extends XTestCase {
             queueservice.queue(c, 10);
         }
 
-        float originalRatio = XTestCase.WAITFOR_RATIO;
-        try{
-            XTestCase.WAITFOR_RATIO = 1;
-            waitFor(2000, new Predicate() {
-                public boolean evaluate() throws Exception {
-                    return queueservice.queueSize() == 0;
+        waitFor(3000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                boolean completed = true;
+
+                for (MyCallable callable : callables) {
+                    completed &= (callable.executed != 0);
                 }
-            });
-        }
-        finally {
-            XTestCase.WAITFOR_RATIO = originalRatio;
-        }
 
-        System.out.println("Callable Queue Size :" + queueservice.queueSize());
+                completed &= (callableOther.executed != 0);
+
+                return completed;
+            }
+        });
 
         long last = Long.MIN_VALUE;
         for (MyCallable c : callables) {
@@ -887,7 +902,7 @@ public class TestCallableQueueService extends XTestCase {
     }
 
     public void testRemoveUniqueCallables() throws Exception {
-        XCommand command = new XCommand("Test", "type", 100) {
+        XCommand<?> command = new XCommand<Object>("Test", "type", 100) {
             @Override
             protected boolean isLockRequired() {
                 return false;
@@ -928,4 +943,271 @@ public class TestCallableQueueService extends XTestCase {
         uniquesAfter.removeAll(uniquesBefore);
         assertTrue(uniquesAfter.toString(), uniquesAfter.isEmpty());
     }
+
+    public void testPriorityExecutionOrder() throws InterruptedException, ServiceException {
+        Services.get().destroy();
+        setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+        setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "1000000");
+        new Services().init();
+
+        final int taskCount = 999_999;
+        Multimap<Integer, Long> executions = Multimaps.synchronizedMultimap(ArrayListMultimap.create());
+        List<BookingCallable> callables = new ArrayList<>(taskCount);
+
+        for (int i = 2; i >= 0; i--) {
+            String type = String.valueOf(i);
+            for (int j = 0; j < taskCount / 3; j++) {
+                String key = type + "_" + UUID.randomUUID().toString();
+                BookingCallable dc = new BookingCallable(executions, taskCount, key, type, i, 0);
+                callables.add(dc);
+            }
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (int i = 0; i < taskCount; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        try {
+            finished.await(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+        // It's necessary because after finished.await() returns, the last XCallable
+        // could still be running
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return queueservice.queueSize() == 0;
+            }
+        });
+
+        Map<Integer, Long> minTime = new HashMap<>();
+        Map<Integer, Long> maxTime = new HashMap<>();
+
+        for (Map.Entry<Integer, Collection<Long>> entry : executions.asMap().entrySet()) {
+            int prio = entry.getKey();
+            Collection<Long> values = entry.getValue();
+            minTime.put(prio, Collections.min(values));
+            maxTime.put(prio, Collections.max(values));
+        }
+
+        // Expected timeline of execution times:
+        // --> [min] Prio #2 [max] --> [min] Prio #1 [max] --> [min] Prio #0 [max]
+
+        assertTrue("Failed: maxTime prio #2: " + maxTime.get(2) + " / minTime prio #1: " + minTime.get(1),
+                maxTime.get(2) <= minTime.get(1));
+        assertTrue("Failed: maxTime prio #1: " + maxTime.get(1) + " / minTime prio #0: " + minTime.get(0),
+                maxTime.get(1) <= minTime.get(0));
+    }
+
+    public void testMaxConcurrencyReached() throws Exception {
+        Services.get().destroy();
+        setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "100000");
+        new Services().init();
+
+        int partitions = 10;
+        int taskPerPartition = 10000;
+
+        final int taskCount = partitions * taskPerPartition;
+
+        List<DummyCallable> callables = new ArrayList<>(taskCount);
+
+        for (int i = 0; i < partitions; i++) {
+            String type = String.valueOf(i);
+            for (int j = 0; j < taskPerPartition; j++) {
+                String key = type + "_" + UUID.randomUUID().toString();
+                DummyCallable dc = new DummyCallable(taskCount, key, type, 0, 0);
+                callables.add(dc);
+            }
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (int i = 0; i < taskCount; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        try {
+            finished.await(100, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+
+        assertEquals("Not all callables have been executed", taskCount, counter.get());
+    }
+
+    public void testQueueSizeWithDelayedElements() throws InterruptedException {
+        final int taskCount = 10_000;
+
+        List<DummyCallable> callables = new ArrayList<>(taskCount);
+        for (int i = 0; i < taskCount; i++) {
+            String keyAndType = String.valueOf(i);
+            DummyCallable dc = new DummyCallable(taskCount, keyAndType, keyAndType, 0, 0);
+            callables.add(dc);
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (int i = 0; i < taskCount; i++) {
+            queueservice.queue(callables.get(i), 2000);
+        }
+
+        int queueSizeAfterSubmission = queueservice.queueSize();
+
+        try {
+            finished.await(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+        // It's necessary because after finished.await() returns, the last XCallable
+        // could still be running
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return queueservice.queueSize() == 0;
+            }
+        });
+
+        assertEquals("Queue size after submission", taskCount, queueSizeAfterSubmission);
+        assertEquals("Queue size after execution", 0, queueservice.queueSize());
+    }
+
+    public void testQueueSizeAfterNormalSubmission() throws InterruptedException {
+        final int taskCount = 10_000;
+
+        List<DummyCallable> callables = new ArrayList<>(taskCount);
+        for (int i = 0; i < taskCount; i++) {
+            String keyAndType = String.valueOf(i);
+            DummyCallable dc = new DummyCallable(taskCount, keyAndType, keyAndType, 0, 0);
+            callables.add(dc);
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (int i = 0; i < taskCount; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        // Not an exact number - it's close to 10,000 but keeps fluctuating
+        // We can still verify that it's larger than a certain number though
+        int queueSizeAfterSubmission = queueservice.queueSize();
+        try {
+            finished.await(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+        // It's necessary because after finished.await() returns, the last XCallable
+        // could still be running
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return queueservice.queueSize() == 0;
+            }
+        });
+
+        assertTrue("Too few elements in the queue: " + queueSizeAfterSubmission + ", should be >9000",
+                queueSizeAfterSubmission > 9000);
+        assertEquals("Queue size after execution", 0, queueservice.queueSize());
+    }
+
+    public void testQueueSizeWhenMaxConcurrencyIsReached() throws InterruptedException {
+        int partitions = 10;
+        int taskPerPartition = 1000;
+
+        final int taskCount = partitions * taskPerPartition;
+
+        List<DummyCallable> callables = new ArrayList<>(taskCount);
+
+        for (int i = 0; i < partitions; i++) {
+            String type = String.valueOf(i);
+            for (int j = 0; j < taskPerPartition; j++) {
+                String key = type + "_" + UUID.randomUUID().toString();
+                DummyCallable dc = new DummyCallable(taskCount, key, type, 0, 0);
+                callables.add(dc);
+            }
+        }
+
+        CallableQueueService queueservice = Services.get().get(CallableQueueService.class);
+
+        for (int i = 0; i < taskCount; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        // Not an exact number - it's close to 10,000 but keeps fluctuating
+        // We can still verify that it's larger than a certain number though
+        int queueSizeAfterSubmission = queueservice.queueSize();
+        try {
+            finished.await(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("Error", e);
+            fail("Exception during test: " + e.getMessage());
+        }
+        // It's necessary because after finished.await() returns, the last XCallable
+        // could still be running
+        waitFor(1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return queueservice.queueSize() == 0;
+            }
+        });
+
+        assertTrue("Too few elements in the queue: " + queueSizeAfterSubmission + ", should be >9000",
+                queueSizeAfterSubmission > 9000);
+        assertEquals("Queue size after execution", 0, queueservice.queueSize());
+    }
+
+    /**
+     * Callable that only counts its execution in {@code counter} and releases {@code finished} when the
+     * {@code taskCount}-th callable has run.
+     */
+    private class DummyCallable extends MyCallable {
+        private final int taskCount;
+
+        public DummyCallable(int taskCount, String key, String type, int priority, int wait) {
+            super(key, type, priority, wait);
+            this.taskCount = taskCount;
+        }
+
+        public Void call() throws Exception {
+            if (counter.incrementAndGet() == taskCount) {
+                finished.countDown();
+            }
+
+            return null;
+        }
+    }
+
+    /**
+     * Same counting as {@link DummyCallable}; additionally stores {@code System.currentTimeMillis()} under the
+     * callable's priority in the shared multimap.
+     */
+    private class BookingCallable extends MyCallable {
+        private final int taskCount;
+        private final Multimap<Integer, Long> executions;
+
+        public BookingCallable(Multimap<Integer, Long> executions,
+                int taskCount,
+                String key,
+                String type,
+                int priority,
+                int wait) {
+            super(key, type, priority, wait);
+            this.taskCount = taskCount;
+            this.executions = executions;
+        }
+
+        public Void call() throws Exception {
+            executions.put(getPriority(), System.currentTimeMillis());
+            if (counter.incrementAndGet() == taskCount) {
+                finished.countDown();
+            }
+            return null;
+        }
+    }
 }
http://git-wip-us.apache.org/repos/asf/oozie/blob/13bfd494/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 78f25e7..a16b3f9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko)
 OOZIE-2877 Git action (clayb, andras.piros via pbacsko, gezapeti)
 OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros)
 OOZIE-3155 [ui] Job DAG is not refreshed when a job is finished (asalamon74 via andras.piros)