http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/BasicTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/BasicTask.java b/core/src/main/java/brooklyn/util/task/BasicTask.java deleted file mode 100644 index 57b2bb2..0000000 --- a/core/src/main/java/brooklyn/util/task/BasicTask.java +++ /dev/null @@ -1,892 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import static brooklyn.util.JavaGroovyEquivalents.asString; -import static brooklyn.util.JavaGroovyEquivalents.elvisString; -import groovy.lang.Closure; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.lang.management.LockInfo; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.util.Collections; -import java.util.ConcurrentModificationException; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.brooklyn.api.management.HasTaskChildren; -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.util.GroovyJavaMethods; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.guava.Maybe; -import brooklyn.util.text.Identifiers; -import brooklyn.util.text.Strings; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.annotations.Beta; -import com.google.common.base.Function; -import com.google.common.base.Objects; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Callables; -import com.google.common.util.concurrent.ExecutionList; -import com.google.common.util.concurrent.ListenableFuture; - -/** - * The basic concrete implementation of a {@link Task} to be executed. - * - * A {@link Task} is a wrapper for an executable unit, such as a {@link Closure} or a {@link Runnable} or - * {@link Callable} and will run in its own {@link Thread}. - * <p> - * The task can be given an optional displayName and description in its constructor (as named - * arguments in the first {@link Map} parameter). It is guaranteed to have {@link Object#notify()} called - * once whenever the task starts running and once again when the task is about to complete. Due to - * the way executors work it is ugly to guarantee notification <em>after</em> completion, so instead we - * notify just before then expect the user to call {@link #get()} - which will throw errors if the underlying job - * did so - or {@link #blockUntilEnded()} which will not throw errors. - */ -public class BasicTask<T> implements TaskInternal<T> { - private static final Logger log = LoggerFactory.getLogger(BasicTask.class); - - private String id = Identifiers.makeRandomId(8); - protected Callable<T> job; - public final String displayName; - public final String description; - - protected final Set<Object> tags = Sets.newConcurrentHashSet(); - // for debugging, to record where tasks were created -// { tags.add(new Throwable("Creation stack trace")); } - - protected Task<?> proxyTargetTask = null; - - protected String blockingDetails = null; - protected Task<?> blockingTask = null; - Object extraStatusText = null; - - /** listeners attached at task level; these are stored here, but run on the underlying ListenableFuture */ - protected final ExecutionList listeners = new ExecutionList(); - - /** - * Constructor needed to prevent confusion in groovy stubs when looking for default constructor, - * - * The generics on {@link Closure} break it if that is first constructor. - */ - protected BasicTask() { this(Collections.emptyMap()); } - protected BasicTask(Map<?,?> flags) { this(flags, (Callable<T>) null); } - - public BasicTask(Callable<T> job) { this(Collections.emptyMap(), job); } - - public BasicTask(Map<?,?> flags, Callable<T> job) { - this.job = job; - - if (flags.containsKey("tag")) tags.add(flags.remove("tag")); - Object ftags = flags.remove("tags"); - if (ftags!=null) { - if (ftags instanceof Iterable) Iterables.addAll(tags, (Iterable<?>)ftags); - else { - log.info("deprecated use of non-collection argument for 'tags' ("+ftags+") in "+this, new Throwable("trace of discouraged use of non-colleciton tags argument")); - tags.add(ftags); - } - } - - description = elvisString(flags.remove("description"), ""); - String d = asString(flags.remove("displayName")); - displayName = (d==null ? "" : d); - } - - public BasicTask(Runnable job) { this(GroovyJavaMethods.<T>callableFromRunnable(job)); } - public BasicTask(Map<?,?> flags, Runnable job) { this(flags, GroovyJavaMethods.<T>callableFromRunnable(job)); } - public BasicTask(Closure<T> job) { this(GroovyJavaMethods.callableFromClosure(job)); } - public BasicTask(Map<?,?> flags, Closure<T> job) { this(flags, GroovyJavaMethods.callableFromClosure(job)); } - - @Override - public String getId() { - return id; - } - - @Override - public int hashCode() { - return Objects.hashCode(id); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof Task) - return ((Task<?>)obj).getId().equals(getId()); - return false; - } - - @Override - public String toString() { - // give display name plus id, or job and tags plus id; some jobs have been extended to include nice tostrings - return "Task["+ - (Strings.isNonEmpty(displayName) ? - displayName : - (job + (tags!=null && !tags.isEmpty() ? ";"+tags : "")) ) + - ":"+getId()+"]"; - } - - @Override - public Task<T> asTask() { - return this; - } - - // housekeeping -------------------- - - /* - * These flags are set by BasicExecutionManager.submit. - * - * Order is guaranteed to be as shown below, in order of #. Within each # line it is currently in the order specified by commas but this is not guaranteed. - * (The spaces between the # section indicate longer delays / logical separation ... it should be clear!) - * - * # submitter, submit time set, tags and other submit-time fields set - * - * # thread set, ThreadLocal getCurrentTask set - * # start time set, isBegun is true - * # task end callback run, if supplied - * - * # task runs - * - * # task end callback run, if supplied - * # end time set - * # thread cleared, ThreadLocal getCurrentTask set - * # Task.notifyAll() - * # Task.get() (result.get()) available, Task.isDone is true - * - * Few _consumers_ should care, but internally we rely on this so that, for example, status is displayed correctly. - * Tests should catch most things, but be careful if you change any of the above semantics. - */ - - protected long queuedTimeUtc = -1; - protected long submitTimeUtc = -1; - protected long startTimeUtc = -1; - protected long endTimeUtc = -1; - protected Maybe<Task<?>> submittedByTask; - - protected volatile Thread thread = null; - private volatile boolean cancelled = false; - /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */ - protected volatile Future<T> internalFuture = null; - - @Override - public synchronized void initInternalFuture(ListenableFuture<T> result) { - if (this.internalFuture != null) - throw new IllegalStateException("task "+this+" is being given a result twice"); - this.internalFuture = result; - notifyAll(); - } - - // metadata accessors ------------ - - @Override - public Set<Object> getTags() { return Collections.unmodifiableSet(new LinkedHashSet<Object>(tags)); } - - /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here; - * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */ - @Override - public long getQueuedTimeUtc() { return queuedTimeUtc; } - - @Override - public long getSubmitTimeUtc() { return submitTimeUtc; } - - @Override - public long getStartTimeUtc() { return startTimeUtc; } - - @Override - public long getEndTimeUtc() { return endTimeUtc; } - - @Override - public Future<T> getInternalFuture() { return internalFuture; } - - @Override - public Task<?> getSubmittedByTask() { - if (submittedByTask==null) return null; - return submittedByTask.orNull(); - } - - /** the thread where the task is running, if it is running */ - @Override - public Thread getThread() { return thread; } - - // basic fields -------------------- - - @Override - public boolean isQueued() { - return (queuedTimeUtc >= 0); - } - - @Override - public boolean isQueuedOrSubmitted() { - return isQueued() || isSubmitted(); - } - - @Override - public boolean isQueuedAndNotSubmitted() { - return isQueued() && (!isSubmitted()); - } - - @Override - public boolean isSubmitted() { - return submitTimeUtc >= 0; - } - - @Override - public boolean isBegun() { - return startTimeUtc >= 0; - } - - /** marks the task as queued for execution */ - @Override - public void markQueued() { - if (queuedTimeUtc<0) - queuedTimeUtc = System.currentTimeMillis(); - } - - @Override - public final synchronized boolean cancel() { return cancel(true); } - - /** doesn't resume it, just means if something was cancelled but not submitted it could now be submitted; - * probably going to be removed and perhaps some mechanism for running again made available - * @since 0.7.0 */ - @Beta - public synchronized boolean uncancel() { - boolean wasCancelled = cancelled; - cancelled = false; - return wasCancelled; - } - - @Override - public synchronized boolean cancel(boolean mayInterruptIfRunning) { - if (isDone()) return false; - boolean cancel = true; - cancelled = true; - if (internalFuture!=null) { - cancel = internalFuture.cancel(mayInterruptIfRunning); - } - notifyAll(); - return cancel; - } - - @Override - public boolean isCancelled() { - return cancelled || (internalFuture!=null && internalFuture.isCancelled()); - } - - @Override - public boolean isDone() { - // if endTime is set, result might not be completed yet, but it will be set very soon - // (the two values are set close in time, result right after the endTime; - // but callback hooks might not see the result yet) - return cancelled || (internalFuture!=null && internalFuture.isDone()) || endTimeUtc>0; - } - - /** - * Returns true if the task has had an error. - * - * Only true if calling {@link #get()} will throw an exception when it completes (including cancel). - * Implementations may set this true before completion if they have that insight, or - * (the default) they may compute it lazily after completion (returning false before completion). - */ - @Override - public boolean isError() { - if (!isDone()) return false; - if (isCancelled()) return true; - try { - get(); - return false; - } catch (Throwable t) { - return true; - } - } - - // future value -------------------- - - @Override - public T get() throws InterruptedException, ExecutionException { - try { - if (!isDone()) - Tasks.setBlockingTask(this); - blockUntilStarted(); - return internalFuture.get(); - } finally { - Tasks.resetBlockingTask(); - } - } - - @Override - public T getUnchecked() { - try { - return get(); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - } - - @Override - public synchronized void blockUntilStarted() { - blockUntilStarted(null); - } - - @Override - public synchronized boolean blockUntilStarted(Duration timeout) { - Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp(); - while (true) { - if (cancelled) throw new CancellationException(); - if (internalFuture==null) - try { - if (timeout==null) { - wait(); - } else { - long remaining = endTime - System.currentTimeMillis(); - if (remaining>0) - wait(remaining); - else - return false; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - Throwables.propagate(e); - } - if (internalFuture!=null) return true; - } - } - - @Override - public void blockUntilEnded() { - blockUntilEnded(null); - } - - @Override - public boolean blockUntilEnded(Duration timeout) { - Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp(); - try { - boolean started = blockUntilStarted(timeout); - if (!started) return false; - if (timeout==null) { - internalFuture.get(); - } else { - long remaining = endTime - System.currentTimeMillis(); - if (remaining>0) - internalFuture.get(remaining, TimeUnit.MILLISECONDS); - } - return isDone(); - } catch (Throwable t) { - Exceptions.propagateIfFatal(t); - if (!(t instanceof TimeoutException) && log.isDebugEnabled()) - log.debug("call from "+Thread.currentThread()+", blocking until '"+this+"' finishes, ended with error: "+t); - return isDone(); - } - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return get(new Duration(timeout, unit)); - } - - @Override - public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException { - long start = System.currentTimeMillis(); - Long end = duration==null ? null : start + duration.toMillisecondsRoundingUp(); - while (end==null || end > System.currentTimeMillis()) { - if (cancelled) throw new CancellationException(); - if (internalFuture == null) { - synchronized (this) { - long remaining = end - System.currentTimeMillis(); - if (internalFuture==null && remaining>0) - wait(remaining); - } - } - if (internalFuture != null) break; - } - Long remaining = end==null ? null : end - System.currentTimeMillis(); - if (isDone()) { - return internalFuture.get(1, TimeUnit.MILLISECONDS); - } else if (remaining == null) { - return internalFuture.get(); - } else if (remaining > 0) { - return internalFuture.get(remaining, TimeUnit.MILLISECONDS); - } else { - throw new TimeoutException(); - } - } - - @Override - public T getUnchecked(Duration duration) { - try { - return get(duration); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - } - - // ------------------ status --------------------------- - - /** - * Returns a brief status string - * - * Plain-text format. Reported status if there is one, otherwise state which will be one of: - * <ul> - * <li>Not submitted - * <li>Submitted for execution - * <li>Ended by error - * <li>Ended by cancellation - * <li>Ended normally - * <li>Running - * <li>Waiting - * </ul> - */ - @Override - public String getStatusSummary() { - return getStatusString(0); - } - - /** - * Returns detailed status, suitable for a hover - * - * Plain-text format, with new-lines (and sometimes extra info) if multiline enabled. - */ - @Override - public String getStatusDetail(boolean multiline) { - return getStatusString(multiline?2:1); - } - - /** - * This method is useful for callers to see the status of a task. - * - * Also for developers to see best practices for examining status fields etc - * - * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail - */ - protected String getStatusString(int verbosity) { -// Thread t = getThread(); - String rv; - if (submitTimeUtc <= 0) rv = "Not submitted"; - else if (!isCancelled() && startTimeUtc <= 0) { - rv = "Submitted for execution"; - if (verbosity>0) { - long elapsed = System.currentTimeMillis() - submitTimeUtc; - rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago"; - } - if (verbosity >= 2 && getExtraStatusText()!=null) { - rv += "\n\n"+getExtraStatusText(); - } - } else if (isDone()) { - long elapsed = endTimeUtc - submitTimeUtc; - String duration = Time.makeTimeStringRounded(elapsed); - if (isCancelled()) { - rv = "Cancelled"; - if (verbosity >= 1) rv+=" after "+duration; - - if (verbosity >= 2 && getExtraStatusText()!=null) { - rv += "\n\n"+getExtraStatusText(); - } - } else if (isError()) { - rv = "Failed"; - if (verbosity >= 1) { - rv += " after "+duration; - Throwable error = Tasks.getError(this); - - if (verbosity >= 2 && getExtraStatusText()!=null) { - rv += "\n\n"+getExtraStatusText(); - } - - //remove outer ExecException which is reported by the get(), we want the exception the task threw - while (error instanceof ExecutionException) error = error.getCause(); - String errorMessage = Exceptions.collapseText(error); - - if (verbosity == 1) rv += ": "+abbreviate(errorMessage); - if (verbosity >= 2) { - rv += ": "+errorMessage; - StringWriter sw = new StringWriter(); - ((Throwable)error).printStackTrace(new PrintWriter(sw)); - rv += "\n\n"+sw.getBuffer(); - } - } - } else { - rv = "Completed"; - if (verbosity>=1) { - if (verbosity==1) { - try { - Object v = get(); - rv += ", " +(v==null ? "no return value (null)" : "result: "+abbreviate(v.toString())); - } catch (Exception e) { - rv += ", but error accessing result ["+e+"]"; //shouldn't happen - } - } else { - rv += " after "+duration; - try { - Object v = get(); - rv += "\n\n" + (v==null ? "No return value (null)" : "Result: "+v); - } catch (Exception e) { - rv += " at first\n" + - "Error accessing result ["+e+"]"; //shouldn't happen - } - if (verbosity >= 2 && getExtraStatusText()!=null) { - rv += "\n\n"+getExtraStatusText(); - } - } - } - } - } else { - rv = getActiveTaskStatusString(verbosity); - } - return rv; - } - - private static String abbreviate(String s) { - s = Strings.getFirstLine(s); - if (s.length()>255) s = s.substring(0, 252)+ "..."; - return s; - } - - protected String getActiveTaskStatusString(int verbosity) { - String rv = ""; - Thread t = getThread(); - - // Normally, it's not possible for thread==null as we were started and not ended - - // However, there is a race where the task starts sand completes between the calls to getThread() - // at the start of the method and this call to getThread(), so both return null even though - // the intermediate checks returned started==true isDone()==false. - if (t == null) { - if (isDone()) { - return getStatusString(verbosity); - } else { - //should only happen for repeating task which is not active - return "Sleeping"; - } - } - - ThreadInfo ti = ManagementFactory.getThreadMXBean().getThreadInfo(t.getId(), (verbosity<=0 ? 0 : verbosity==1 ? 1 : Integer.MAX_VALUE)); - if (getThread()==null) - //thread might have moved on to a new task; if so, recompute (it should now say "done") - return getStatusString(verbosity); - - if (verbosity >= 1 && Strings.isNonBlank(blockingDetails)) { - if (verbosity==1) - // short status string will just show blocking details - return blockingDetails; - //otherwise show the blocking details, then a new line, then additional information - rv = blockingDetails + "\n\n"; - } - - if (verbosity >= 1 && blockingTask!=null) { - if (verbosity==1) - // short status string will just show blocking details - return "Waiting on "+blockingTask; - //otherwise show the blocking details, then a new line, then additional information - rv = "Waiting on "+blockingTask + "\n\n"; - } - - if (verbosity>=2) { - if (getExtraStatusText()!=null) { - rv += getExtraStatusText()+"\n\n"; - } - - rv += ""+toString()+"\n"; - if (submittedByTask!=null) { - rv += "Submitted by "+submittedByTask+"\n"; - } - - if (this instanceof HasTaskChildren) { - // list children tasks for compound tasks - try { - Iterable<Task<?>> childrenTasks = ((HasTaskChildren)this).getChildren(); - if (childrenTasks.iterator().hasNext()) { - rv += "Children:\n"; - for (Task<?> child: childrenTasks) { - rv += " "+child+": "+child.getStatusDetail(false)+"\n"; - } - } - } catch (ConcurrentModificationException exc) { - rv += " (children not available - currently being modified)\n"; - } - } - rv += "\n"; - } - - LockInfo lock = ti.getLockInfo(); - rv += "In progress"; - if (verbosity>=1) { - if (lock==null && ti.getThreadState()==Thread.State.RUNNABLE) { - //not blocked - if (ti.isSuspended()) { - // when does this happen? - rv += ", thread suspended"; - } else { - if (verbosity >= 2) rv += " ("+ti.getThreadState()+")"; - } - } else { - rv +=", thread waiting "; - if (ti.getThreadState() == Thread.State.BLOCKED) { - rv += "(mutex) on "+lookup(lock); - //TODO could say who holds it - } else if (ti.getThreadState() == Thread.State.WAITING) { - rv += "(notify) on "+lookup(lock); - } else if (ti.getThreadState() == Thread.State.TIMED_WAITING) { - rv += "(timed) on "+lookup(lock); - } else { - rv = "("+ti.getThreadState()+") on "+lookup(lock); - } - } - } - if (verbosity>=2) { - StackTraceElement[] st = ti.getStackTrace(); - st = brooklyn.util.javalang.StackTraceSimplifier.cleanStackTrace(st); - if (st!=null && st.length>0) - rv += "\n" +"At: "+st[0]; - for (int ii=1; ii<st.length; ii++) { - rv += "\n" +" "+st[ii]; - } - } - return rv; - } - - protected String lookup(LockInfo info) { - return info!=null ? ""+info : "unknown (sleep)"; - } - - @Override - public String getDisplayName() { - return displayName; - } - - @Override - public String getDescription() { - return description; - } - - - /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait, - * and typically cleared immediately afterwards; referenced by management api to inspect a task - * which is blocking - */ - @Override - public String setBlockingDetails(String blockingDetails) { - String old = this.blockingDetails; - this.blockingDetails = blockingDetails; - return old; - } - - @Override - public Task<?> setBlockingTask(Task<?> blockingTask) { - Task<?> old = this.blockingTask; - this.blockingTask = blockingTask; - return old; - } - - @Override - public void resetBlockingDetails() { - this.blockingDetails = null; - } - - @Override - public void resetBlockingTask() { - this.blockingTask = null; - } - - /** returns a textual message giving details while the task is blocked */ - @Override - public String getBlockingDetails() { - return blockingDetails; - } - - /** returns a task that this task is blocked on */ - @Override - public Task<?> getBlockingTask() { - return blockingTask; - } - - @Override - public void setExtraStatusText(Object extraStatus) { - this.extraStatusText = extraStatus; - } - - @Override - public Object getExtraStatusText() { - return extraStatusText; - } - - // ---- add a way to warn if task is not run - - public interface TaskFinalizer { - public void onTaskFinalization(Task<?> t); - } - - public static final TaskFinalizer WARN_IF_NOT_RUN = new TaskFinalizer() { - @Override - public void onTaskFinalization(Task<?> t) { - if (!Tasks.isAncestorCancelled(t) && !t.isSubmitted()) { - log.warn(t+" was never submitted; did the code create it and forget to run it? ('cancel' the task to suppress this message)"); - log.debug("Detail of unsubmitted task "+t+":\n"+t.getStatusDetail(true)); - return; - } - if (!t.isDone()) { - // shouldn't happen - // TODO But does happen if management context was terminated (e.g. running test suite). - // Should check if Execution Manager is running, and only log if it was not terminated? - log.warn("Task "+t+" is being finalized before completion"); - return; - } - } - }; - - public static final TaskFinalizer NO_OP = new TaskFinalizer() { - @Override - public void onTaskFinalization(Task<?> t) { - } - }; - - public void ignoreIfNotRun() { - setFinalizer(NO_OP); - } - - public void setFinalizer(TaskFinalizer f) { - TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false); - if (finalizer!=null && finalizer!=f) - throw new IllegalStateException("Cannot apply multiple finalizers"); - if (isDone()) - throw new IllegalStateException("Finalizer cannot be set on task "+this+" after it is finished"); - tags.add(f); - } - - @Override - protected void finalize() throws Throwable { - TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false); - if (finalizer==null) finalizer = WARN_IF_NOT_RUN; - finalizer.onTaskFinalization(this); - } - - public static class SubmissionErrorCatchingExecutor implements Executor { - final Executor target; - public SubmissionErrorCatchingExecutor(Executor target) { - this.target = target; - } - @Override - public void execute(Runnable command) { - if (isShutdown()) { - log.debug("Skipping execution of task callback hook "+command+" because executor is shutdown."); - return; - } - try { - target.execute(command); - } catch (Exception e) { - if (isShutdown()) { - log.debug("Ignoring failed execution of task callback hook "+command+" because executor is shutdown."); - } else { - log.warn("Execution of task callback hook "+command+" failed: "+e, e); - } - } - } - protected boolean isShutdown() { - return target instanceof ExecutorService && ((ExecutorService)target).isShutdown(); - } - } - - @Override - public void addListener(Runnable listener, Executor executor) { - listeners.add(listener, new SubmissionErrorCatchingExecutor(executor)); - } - - @Override - public void runListeners() { - listeners.execute(); - } - - @Override - public void setEndTimeUtc(long val) { - endTimeUtc = val; - } - - @Override - public void setThread(Thread thread) { - this.thread = thread; - } - - @Override - public Callable<T> getJob() { - return job; - } - - @Override - public void setJob(Callable<T> job) { - this.job = job; - } - - @Override - public ExecutionList getListeners() { - return listeners; - } - - @Override - public void setSubmitTimeUtc(long val) { - submitTimeUtc = val; - } - - private static <T> Task<T> newGoneTaskFor(Task<?> task) { - Task<T> t = Tasks.<T>builder().dynamic(false).name(task.getDisplayName()) - .description("Details of the original task "+task+" have been forgotten.") - .body(Callables.returning((T)null)).build(); - ((BasicTask<T>)t).ignoreIfNotRun(); - return t; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public void setSubmittedByTask(Task<?> task) { - submittedByTask = (Maybe)Maybe.softThen((Task)task, (Maybe)Maybe.of(BasicTask.newGoneTaskFor(task))); - } - - @Override - public Set<Object> getMutableTags() { - return tags; - } - - @Override - public void setStartTimeUtc(long val) { - startTimeUtc = val; - } - - @Override - public void applyTagModifier(Function<Set<Object>,Void> modifier) { - modifier.apply(tags); - } - - @Override - public Task<?> getProxyTarget() { - return proxyTargetTask; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/CanSetName.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/CanSetName.java b/core/src/main/java/brooklyn/util/task/CanSetName.java deleted file mode 100644 index 760c99e..0000000 --- a/core/src/main/java/brooklyn/util/task/CanSetName.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -public interface CanSetName { - - void setName(String name); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/CompoundTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/CompoundTask.java b/core/src/main/java/brooklyn/util/task/CompoundTask.java deleted file mode 100644 index e33120c..0000000 --- a/core/src/main/java/brooklyn/util/task/CompoundTask.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import groovy.lang.Closure; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; - -import org.apache.brooklyn.api.management.HasTaskChildren; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; -import org.apache.brooklyn.core.management.internal.ManagementContextInternal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.util.collections.MutableMap; - - -/** - * A {@link Task} that is comprised of other units of work: possibly a heterogeneous mix of {@link Task}, - * {@link Runnable}, {@link Callable} and {@link Closure} instances. - * - * This class holds the collection of child tasks, but subclasses have the responsibility of executing them in a - * sensible manner by implementing the abstract {@link #runJobs} method. - */ -public abstract class CompoundTask<T> extends BasicTask<List<T>> implements HasTaskChildren { - - @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(CompoundTask.class); - - protected final List<Task<? extends T>> children; - protected final List<Object> result; - - /** - * Constructs a new compound task containing the specified units of work. - * - * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided. - * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types - */ - public CompoundTask(Object... jobs) { - this( Arrays.asList(jobs) ); - } - - /** - * Constructs a new compound task containing the specified units of work. - * - * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided. - * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types - */ - public CompoundTask(Collection<?> jobs) { - this(MutableMap.of("tag", "compound"), jobs); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public CompoundTask(Map<String,?> flags, Collection<?> jobs) { - super(flags); - super.job = new Callable<List<T>>() { - @Override public List<T> call() throws Exception { - return runJobs(); - } - }; - - this.result = new ArrayList<Object>(jobs.size()); - this.children = new ArrayList<Task<? extends T>>(jobs.size()); - for (Object job : jobs) { - Task subtask; - if (job instanceof TaskAdaptable) { subtask = ((TaskAdaptable)job).asTask(); } - else if (job instanceof Closure) { subtask = new BasicTask<T>((Closure) job); } - else if (job instanceof Callable) { subtask = new BasicTask<T>((Callable) job); } - else if (job instanceof Runnable) { subtask = new BasicTask<T>((Runnable) job); } - - else throw new IllegalArgumentException("Invalid child "+(job == null ? null : job.getClass() + " ("+job+")")+ - " passed to compound task; must be Runnable, Callable, Closure or Task"); - - BrooklynTaskTags.addTagDynamically(subtask, ManagementContextInternal.SUB_TASK_TAG); - children.add(subtask); - } - - for (Task<?> t: getChildren()) { - ((TaskInternal<?>)t).markQueued(); - } - } - - /** return value needs to be specified by subclass; subclass should also setBlockingDetails - * @throws ExecutionException - * @throws InterruptedException */ - protected abstract List<T> runJobs() throws InterruptedException, ExecutionException; - - protected void submitIfNecessary(TaskAdaptable<?> task) { - if (!task.asTask().isSubmitted()) { - if (BasicExecutionContext.getCurrentExecutionContext() == null) { - throw new IllegalStateException("Compound task ("+task+") launched from "+this+" missing required execution context"); - } else { - BasicExecutionContext.getCurrentExecutionContext().submit(task); - } - } - } - - public List<Task<? extends T>> getChildrenTyped() { - return children; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public List<Task<?>> getChildren() { - return (List) getChildrenTyped(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/DeferredSupplier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/DeferredSupplier.java b/core/src/main/java/brooklyn/util/task/DeferredSupplier.java deleted file mode 100644 index d82b3fb..0000000 --- a/core/src/main/java/brooklyn/util/task/DeferredSupplier.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import com.google.common.base.Supplier; - -/** - * A class that supplies objects of a single type. When used as a ConfigKey value, - * the evaluation is deferred until getConfig() is called. The returned value will then - * be coerced to the correct type. - * - * Subsequent calls to getConfig will result in further calls to deferredProvider.get(), - * rather than reusing the result. If you want to reuse the result, consider instead - * using a Future. - * - * Note that this functionality replaces the ues of Closure in brooklyn 0.4.0, which - * served the same purpose. - */ -public interface DeferredSupplier<T> extends Supplier<T> { - @Override - T get(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java b/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java deleted file mode 100644 index 455a889..0000000 --- a/core/src/main/java/brooklyn/util/task/DynamicSequentialTask.java +++ /dev/null @@ -1,480 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import groovy.lang.Closure; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.brooklyn.api.management.HasTaskChildren; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskQueueingContext; -import org.apache.brooklyn.core.management.internal.ManagementContextInternal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.time.CountdownTimer; -import brooklyn.util.time.Duration; - -import com.google.common.annotations.Beta; -import com.google.common.collect.ImmutableList; - -/** Represents a task whose run() method can create other tasks - * which are run sequentially, but that sequence runs in parallel to this task - * <p> - * There is an optional primary job run with this task, along with multiple secondary children. - * If any secondary task fails (assuming it isn't {@link Tasks#markInessential()} then by default - * subsequent tasks are not submitted and the primary task fails (but no tasks are cancelled or interrupted). - * You can change the behavior of this task with fields in {@link FailureHandlingConfig}, - * or the convenience {@link TaskQueueingContext#swallowChildrenFailures()} - * (and {@link DynamicTasks#swallowChildrenFailures()} if you are inside the task). - * <p> - * This synchronizes on secondary tasks when submitting them, in case they may be manually submitted - * and the submitter wishes to ensure it is only submitted once. - * <p> - * Improvements which would be nice to have: - * <li> unqueued tasks not visible in api; would like that - * <li> uses an extra thread (submitted as background task) to monitor the secondary jobs; would be nice to remove this, - * and rely on {@link BasicExecutionManager} to run the jobs sequentially (combined with fix to item above) - * <li> would be nice to have cancel, resume, and possibly skipQueue available as operations (ideally in the REST API and GUI) - **/ -public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChildren, TaskQueueingContext { - - private static final Logger log = LoggerFactory.getLogger(CompoundTask.class); - - protected final Queue<Task<?>> secondaryJobsAll = new ConcurrentLinkedQueue<Task<?>>(); - protected final Queue<Task<?>> secondaryJobsRemaining = new ConcurrentLinkedQueue<Task<?>>(); - protected final Object jobTransitionLock = new Object(); - protected volatile boolean primaryStarted = false; - protected volatile boolean primaryFinished = false; - protected volatile boolean secondaryQueueAborted = false; - protected Thread primaryThread; - protected DstJob dstJob; - protected FailureHandlingConfig failureHandlingConfig = FailureHandlingConfig.DEFAULT; - - // default values for how to handle the various failures - @Beta - public static class FailureHandlingConfig { - /** secondary queue runs independently of primary task (submitting and blocking on each secondary task in order), - * but can set it up not to submit any more tasks if the primary fails */ - public final boolean abortSecondaryQueueOnPrimaryFailure; - /** as {@link #abortSecondaryQueueOnPrimaryFailure} but controls cancelling of secondary queue*/ - public final boolean cancelSecondariesOnPrimaryFailure; - /** secondary queue can continue submitting+blocking tasks even if a secondary task fails (unusual; - * typically handled by {@link TaskTags#markInessential(Task)} on the secondary tasks, in which case - * the secondary queue is never aborted */ - public final boolean abortSecondaryQueueOnSecondaryFailure; - /** unsubmitted secondary tasks (ie those further in the queue) can be cancelled if a secondary task fails */ - public final boolean cancelSecondariesOnSecondaryFailure; - /** whether to issue cancel against primary task if a secondary task fails */ - public final boolean cancelPrimaryOnSecondaryFailure; - /** whether to fail this task if a secondary task fails */ - public final boolean failParentOnSecondaryFailure; - - @Beta - public FailureHandlingConfig( - boolean abortSecondaryQueueOnPrimaryFailure, boolean cancelSecondariesOnPrimaryFailure, - boolean abortSecondaryQueueOnSecondaryFailure, boolean cancelSecondariesOnSecondaryFailure, - boolean cancelPrimaryOnSecondaryFailure, boolean failParentOnSecondaryFailure) { - this.abortSecondaryQueueOnPrimaryFailure = abortSecondaryQueueOnPrimaryFailure; - this.cancelSecondariesOnPrimaryFailure = cancelSecondariesOnPrimaryFailure; - this.abortSecondaryQueueOnSecondaryFailure = abortSecondaryQueueOnSecondaryFailure; - this.cancelSecondariesOnSecondaryFailure = cancelSecondariesOnSecondaryFailure; - this.cancelPrimaryOnSecondaryFailure = cancelPrimaryOnSecondaryFailure; - this.failParentOnSecondaryFailure = failParentOnSecondaryFailure; - } - - public static final FailureHandlingConfig DEFAULT = new FailureHandlingConfig(false, false, true, false, false, true); - public static final FailureHandlingConfig SWALLOWING_CHILDREN_FAILURES = new FailureHandlingConfig(false, false, false, false, false, false); - } - - public static class QueueAbortedException extends IllegalStateException { - private static final long serialVersionUID = -7569362887826818524L; - - public QueueAbortedException(String msg) { - super(msg); - } - public QueueAbortedException(String msg, Throwable cause) { - super(msg, cause); - } - } - - /** - * Constructs a new compound task containing the specified units of work. - * - * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided. - * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types - */ - public DynamicSequentialTask() { - this(null); - } - - public DynamicSequentialTask(Callable<T> mainJob) { - this(MutableMap.of("tag", "compound"), mainJob); - } - - public DynamicSequentialTask(Map<?,?> flags, Callable<T> mainJob) { - super(flags); - this.job = dstJob = new DstJob(mainJob); - } - - @Override - public void queue(Task<?> t) { - synchronized (jobTransitionLock) { - if (primaryFinished) - throw new IllegalStateException("Cannot add a task to "+this+" which is already finished (trying to add "+t+")"); - if (secondaryQueueAborted) - throw new QueueAbortedException("Cannot add a task to "+this+" whose queue has been aborted (trying to add "+t+")"); - secondaryJobsAll.add(t); - secondaryJobsRemaining.add(t); - BrooklynTaskTags.addTagsDynamically(t, ManagementContextInternal.SUB_TASK_TAG); - ((TaskInternal<?>)t).markQueued(); - jobTransitionLock.notifyAll(); - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true); - } - public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) { - if (isDone()) return false; - if (log.isTraceEnabled()) log.trace("cancelling {}", this); - boolean cancel = super.cancel(mayInterruptTask); - if (alsoCancelChildren) { - for (Task<?> t: secondaryJobsAll) - cancel |= t.cancel(mayInterruptTask); - } - synchronized (jobTransitionLock) { - if (primaryThread!=null) { - if (interruptPrimaryThread) { - if (log.isTraceEnabled()) log.trace("cancelling {} - interrupting", this); - primaryThread.interrupt(); - } - cancel = true; - } - } - return cancel; - } - - @Override - public synchronized boolean uncancel() { - secondaryQueueAborted = false; - return super.uncancel(); - } - - @Override - public Iterable<Task<?>> getChildren() { - return Collections.unmodifiableCollection(secondaryJobsAll); - } - - /** submits the indicated task for execution in the current execution context, and returns immediately */ - protected void submitBackgroundInheritingContext(Task<?> task) { - BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); - if (log.isTraceEnabled()) { - log.trace("task {} - submitting background task {} ({})", new Object[] { Tasks.current(), task, ec }); - } - if (ec==null) { - String message = Tasks.current()!=null ? - // user forgot ExecContext: - "Task "+this+" submitting background task requires an ExecutionContext (an ExecutionManager is not enough): submitting "+task+" in "+Tasks.current() - : // should not happen: - "Cannot submit tasks inside DST when not in a task : submitting "+task+" in "+this; - log.warn(message+" (rethrowing)"); - throw new IllegalStateException(message); - } - synchronized (task) { - if (task.isSubmitted()) { - if (log.isTraceEnabled()) { - log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted"); - } - } else { - try { - ec.submit(task); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - // Give some context when the submit fails (happens when the target is already unmanaged) - throw new IllegalStateException("Failure submitting task "+task+" in "+this+": "+e.getMessage(), e); - } - } - } - } - - public void setFailureHandlingConfig(FailureHandlingConfig failureHandlingConfig) { - this.failureHandlingConfig = failureHandlingConfig; - } - @Override - public void swallowChildrenFailures() { - setFailureHandlingConfig(FailureHandlingConfig.SWALLOWING_CHILDREN_FAILURES); - } - - protected class DstJob implements Callable<T> { - protected Callable<T> primaryJob; - /** currently executing (or just completed) secondary task, or null if none; - * with jobTransitionLock notified on change and completion */ - protected volatile Task<?> currentSecondary = null; - protected volatile boolean finishedSecondaries = false; - - public DstJob(Callable<T> mainJob) { - this.primaryJob = mainJob; - } - - @SuppressWarnings("unchecked") - @Override - public T call() throws Exception { - - synchronized (jobTransitionLock) { - primaryStarted = true; - primaryThread = Thread.currentThread(); - for (Task<?> t: secondaryJobsAll) - ((TaskInternal<?>)t).markQueued(); - } - // TODO overkill having a thread/task for this, but it works - // optimisation would either use newTaskEndCallback property on task to submit - // or use some kind of single threaded executor for the queued tasks - Task<List<Object>> secondaryJobMaster = Tasks.<List<Object>>builder().dynamic(false) - .name("DST manager (internal)") - // TODO marking it transient helps it be GC'd sooner, - // but ideally we wouldn't have this, - // or else it would be a child - .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) - .body(new Callable<List<Object>>() { - - @Override - public List<Object> call() throws Exception { - List<Object> result = new ArrayList<Object>(); - try { - while (!secondaryQueueAborted && (!primaryFinished || !secondaryJobsRemaining.isEmpty())) { - synchronized (jobTransitionLock) { - if (!primaryFinished && secondaryJobsRemaining.isEmpty()) { - currentSecondary = null; - jobTransitionLock.wait(1000); - } - } - @SuppressWarnings("rawtypes") - Task secondaryJob = secondaryJobsRemaining.poll(); - if (secondaryJob != null) { - synchronized (jobTransitionLock) { - currentSecondary = secondaryJob; - submitBackgroundInheritingContext(secondaryJob); - jobTransitionLock.notifyAll(); - } - try { - result.add(secondaryJob.get()); - } catch (Exception e) { - if (TaskTags.isInessential(secondaryJob)) { - result.add(Tasks.getError(secondaryJob)); - if (log.isDebugEnabled()) - log.debug("Secondary job queue for "+DynamicSequentialTask.this+" ignoring error in inessential task "+secondaryJob+": "+e); - } else { - if (failureHandlingConfig.cancelSecondariesOnSecondaryFailure) { - if (log.isDebugEnabled()) - log.debug("Secondary job queue for "+DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in task "+secondaryJob+": "+e); - synchronized (jobTransitionLock) { - for (Task<?> t: secondaryJobsRemaining) - t.cancel(true); - jobTransitionLock.notifyAll(); - } - } - - if (failureHandlingConfig.abortSecondaryQueueOnSecondaryFailure) { - if (log.isDebugEnabled()) - log.debug("Aborting secondary job queue for "+DynamicSequentialTask.this+" due to error in child task "+secondaryJob+" ("+e+", being rethrown)"); - secondaryQueueAborted = true; - throw e; - } - - if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) { - cancel(true, false, false); - } - - result.add(Tasks.getError(secondaryJob)); - if (log.isDebugEnabled()) - log.debug("Secondary job queue for "+DynamicSequentialTask.this+" continuing in presence of error in child task "+secondaryJob+" ("+e+", being remembered)"); - } - } - } - } - } finally { - synchronized (jobTransitionLock) { - currentSecondary = null; - finishedSecondaries = true; - jobTransitionLock.notifyAll(); - } - } - return result; - } - }).build(); - ((BasicTask<?>)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this; - - submitBackgroundInheritingContext(secondaryJobMaster); - - T result = null; - Throwable error = null; - Throwable uninterestingSelfError = null; - boolean errorIsFromChild = false; - try { - if (log.isTraceEnabled()) log.trace("calling primary job for {}", this); - if (primaryJob!=null) result = primaryJob.call(); - } catch (Throwable selfException) { - Exceptions.propagateIfFatal(selfException); - if (Exceptions.getFirstThrowableOfType(selfException, QueueAbortedException.class) != null) { - // Error was caused by the task already having failed, and this thread calling queue() to try - // to queue more work. The underlying cause will be much more interesting. - // Without this special catch, we record error = "Cannot add a task to ... whose queue has been aborted", - // which gets propagated instead of the more interesting child exception. - uninterestingSelfError = selfException; - } else { - error = selfException; - errorIsFromChild = false; - } - if (failureHandlingConfig.abortSecondaryQueueOnPrimaryFailure) { - if (log.isDebugEnabled()) - log.debug("Secondary job queue for "+DynamicSequentialTask.this+" aborting with "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException); - secondaryQueueAborted = true; - } - if (failureHandlingConfig.cancelSecondariesOnPrimaryFailure) { - if (log.isDebugEnabled()) - log.debug(DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException); - synchronized (jobTransitionLock) { - for (Task<?> t: secondaryJobsRemaining) - t.cancel(true); - // do this early to prevent additions; and note we notify very soon below, so not notify is help off until below - primaryThread = null; - primaryFinished = true; - } - } - } finally { - try { - if (log.isTraceEnabled()) log.trace("cleaning up for {}", this); - synchronized (jobTransitionLock) { - // semaphore might be nicer here (aled notes as it is this is a little hard to read) - primaryThread = null; - primaryFinished = true; - jobTransitionLock.notifyAll(); - } - if (!isCancelled() && !Thread.currentThread().isInterrupted()) { - if (log.isTraceEnabled()) log.trace("waiting for secondaries for {}", this); - // wait on tasks sequentially so that blocking information is more interesting - DynamicTasks.waitForLast(); - List<Object> result2 = secondaryJobMaster.get(); - try { - if (primaryJob==null) result = (T)result2; - } catch (ClassCastException e) { /* ignore class cast exception; leave the result as null */ } - } - } catch (Throwable childException) { - Exceptions.propagateIfFatal(childException); - if (error==null) { - error = childException; - errorIsFromChild = true; - } else { - if (log.isDebugEnabled()) log.debug("Parent task "+this+" ignoring child error ("+childException+") in presence of our own error ("+error+")"); - } - } - } - if (error!=null) { - handleException(error, errorIsFromChild); - } - if (uninterestingSelfError != null) { - handleException(uninterestingSelfError, false); - } - return result; - } - - @Override - public String toString() { - return "DstJob:"+DynamicSequentialTask.this.getId(); - } - - /** waits for this job to complete, or the given time to elapse */ - public void join(boolean includePrimary, Duration optionalTimeout) throws InterruptedException { - CountdownTimer timeLeft = optionalTimeout!=null ? CountdownTimer.newInstanceStarted(optionalTimeout) : null; - while (true) { - Task<?> cs; - Duration remaining; - synchronized (jobTransitionLock) { - cs = currentSecondary; - if (finishedSecondaries) return; - remaining = timeLeft==null ? Duration.ONE_SECOND : timeLeft.getDurationRemaining(); - if (!remaining.isPositive()) return; - if (cs==null) { - if (!includePrimary && secondaryJobsRemaining.isEmpty()) return; - // parent still running, no children though - Tasks.setBlockingTask(DynamicSequentialTask.this); - jobTransitionLock.wait(remaining.toMilliseconds()); - Tasks.resetBlockingDetails(); - } - } - if (cs!=null) { - Tasks.setBlockingTask(cs); - cs.blockUntilEnded(remaining); - Tasks.resetBlockingDetails(); - } - } - } - } - - @Override - public List<Task<?>> getQueue() { - return ImmutableList.copyOf(secondaryJobsAll); - } - - public void handleException(Throwable throwable, boolean fromChild) throws Exception { - Exceptions.propagateIfFatal(throwable); - if (fromChild && !failureHandlingConfig.failParentOnSecondaryFailure) { - log.debug("Parent task "+this+" swallowing child error: "+throwable); - return; - } - handleException(throwable); - } - public void handleException(Throwable throwable) throws Exception { - Exceptions.propagateIfFatal(throwable); - if (throwable instanceof Exception) { - // allow checked exceptions to be passed through - throw (Exception)throwable; - } - throw Exceptions.propagate(throwable); - } - - @Override - public void drain(Duration optionalTimeout, boolean includePrimary, boolean throwFirstError) { - try { - dstJob.join(includePrimary, optionalTimeout); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - if (throwFirstError) { - if (isError()) - getUnchecked(); - for (Task<?> t: getQueue()) - if (t.isError() && !TaskTags.isInessential(t)) - t.getUnchecked(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/DynamicTasks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/DynamicTasks.java b/core/src/main/java/brooklyn/util/task/DynamicTasks.java deleted file mode 100644 index 9d552c6..0000000 --- a/core/src/main/java/brooklyn/util/task/DynamicTasks.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.List; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.management.ExecutionContext; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; -import org.apache.brooklyn.api.management.TaskFactory; -import org.apache.brooklyn.api.management.TaskQueueingContext; -import org.apache.brooklyn.api.management.TaskWrapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.EntityInternal; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.time.Duration; - -import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; - -/** - * Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks. - * - * @since 0.6.0 - */ -@Beta -public class DynamicTasks { - - private static final Logger log = LoggerFactory.getLogger(DynamicTasks.class); - private static final ThreadLocal<TaskQueueingContext> taskQueueingContext = new ThreadLocal<TaskQueueingContext>(); - - public static void setTaskQueueingContext(TaskQueueingContext newTaskQC) { - taskQueueingContext.set(newTaskQC); - } - - public static TaskQueueingContext getThreadTaskQueuingContext() { - return taskQueueingContext.get(); - } - - public static TaskQueueingContext getTaskQueuingContext() { - TaskQueueingContext adder = getThreadTaskQueuingContext(); - if (adder!=null) return adder; - Task<?> t = Tasks.current(); - if (t instanceof TaskQueueingContext) return (TaskQueueingContext) t; - return null; - } - - - public static void removeTaskQueueingContext() { - taskQueueingContext.remove(); - } - - public static class TaskQueueingResult<T> implements TaskWrapper<T> { - private final Task<T> task; - private final boolean wasQueued; - private ExecutionContext execContext = null; - - private TaskQueueingResult(TaskAdaptable<T> task, boolean wasQueued) { - this.task = task.asTask(); - this.wasQueued = wasQueued; - } - @Override - public Task<T> asTask() { - return task; - } - @Override - public Task<T> getTask() { - return task; - } - /** returns true if the task was queued */ - public boolean wasQueued() { - return wasQueued; - } - /** returns true if the task either is currently queued or has been submitted */ - public boolean isQueuedOrSubmitted() { - return wasQueued || Tasks.isQueuedOrSubmitted(task); - } - /** specifies an execContext to use if the task has to be explicitly submitted; - * if omitted it will attempt to find one based on the current thread's context */ - public TaskQueueingResult<T> executionContext(ExecutionContext execContext) { - this.execContext = execContext; - return this; - } - /** as {@link #executionContext(ExecutionContext)} but inferring from the entity */ - public TaskQueueingResult<T> executionContext(Entity entity) { - this.execContext = ((EntityInternal)entity).getManagementSupport().getExecutionContext(); - return this; - } - private boolean orSubmitInternal() { - if (!wasQueued()) { - if (isQueuedOrSubmitted()) { - log.warn("Redundant call to execute "+getTask()+"; skipping"); - return false; - } else { - ExecutionContext ec = execContext; - if (ec==null) - ec = BasicExecutionContext.getCurrentExecutionContext(); - if (ec==null) - throw new IllegalStateException("Cannot execute "+getTask()+" without an execution context; ensure caller is in an ExecutionContext"); - ec.submit(getTask()); - return true; - } - } else { - return false; - } - } - /** causes the task to be submitted (asynchronously) if it hasn't already been, - * requiring an entity execution context (will try to find a default if not set) */ - public TaskQueueingResult<T> orSubmitAsync() { - orSubmitInternal(); - return this; - } - /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */ - public TaskQueueingResult<T> orSubmitAsync(Entity entity) { - executionContext(entity); - return orSubmitAsync(); - } - /** causes the task to be submitted *synchronously* if it hasn't already been submitted; - * useful in contexts such as libraries where callers may be either on a legacy call path - * (which assumes all commands complete immediately); - * requiring an entity execution context (will try to find a default if not set) */ - public TaskQueueingResult<T> orSubmitAndBlock() { - if (orSubmitInternal()) task.getUnchecked(); - return this; - } - /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */ - public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) { - executionContext(entity); - return orSubmitAndBlock(); - } - /** blocks for the task to be completed - * <p> - * needed in any context where subsequent commands assume the task has completed. - * not needed in a context where the task is simply being built up and queued. - * <p> - * throws if there are any errors - */ - public T andWaitForSuccess() { - return task.getUnchecked(); - } - public void orCancel() { - if (!wasQueued()) { - task.cancel(false); - } - } - } - - /** - * Tries to add the task to the current addition context if there is one, otherwise does nothing. - * <p/> - * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned - * {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a - * {@link BasicExecutionContext}. - */ - public static <T> TaskQueueingResult<T> queueIfPossible(TaskAdaptable<T> task) { - TaskQueueingContext adder = getTaskQueuingContext(); - boolean result = false; - if (adder!=null) - result = Tasks.tryQueueing(adder, task); - return new TaskQueueingResult<T>(task, result); - } - - /** @see #queueIfPossible(TaskAdaptable) */ - public static <T> TaskQueueingResult<T> queueIfPossible(TaskFactory<? extends TaskAdaptable<T>> task) { - return queueIfPossible(task.newTask()); - } - - /** adds the given task to the nearest task addition context, - * either set as a thread-local, or in the current task, or the submitter of the task, etc - * <p> - * throws if it cannot add */ - public static <T> Task<T> queueInTaskHierarchy(Task<T> task) { - Preconditions.checkNotNull(task, "Task to queue cannot be null"); - Preconditions.checkState(!Tasks.isQueuedOrSubmitted(task), "Task to queue must not yet be submitted: {}", task); - - TaskQueueingContext adder = getTaskQueuingContext(); - if (adder!=null) { - if (Tasks.tryQueueing(adder, task)) { - log.debug("Queued task {} at context {} (no hierarchy)", task, adder); - return task; - } - } - - Task<?> t = Tasks.current(); - Preconditions.checkState(t!=null || adder!=null, "No task addition context available for queueing task "+task); - - while (t!=null) { - if (t instanceof TaskQueueingContext) { - if (Tasks.tryQueueing((TaskQueueingContext)t, task)) { - log.debug("Queued task {} at hierarchical context {}", task, t); - return task; - } - } - t = t.getSubmittedByTask(); - } - - throw new IllegalStateException("No task addition context available in current task hierarchy for adding task "+task); - } - - /** - * Queues the given task. - * <p/> - * This method is only valid within a dynamic task. Use {@link #queueIfPossible(TaskAdaptable)} - * and {@link TaskQueueingResult#orSubmitAsync()} if the calling context is a basic task. - * - * @param task The task to queue - * @throws IllegalStateException if no task queueing context is available - * @return The queued task - */ - public static <V extends TaskAdaptable<?>> V queue(V task) { - try { - Preconditions.checkNotNull(task, "Task to queue cannot be null"); - Preconditions.checkState(!Tasks.isQueued(task), "Task to queue must not yet be queued: %s", task); - TaskQueueingContext adder = getTaskQueuingContext(); - if (adder==null) { - throw new IllegalStateException("Task "+task+" cannot be queued here; no queueing context available"); - } - adder.queue(task.asTask()); - return task; - } catch (Throwable e) { - log.warn("Error queueing "+task+" (rethrowing): "+e); - throw Exceptions.propagate(e); - } - } - - /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ - public static void queue(TaskAdaptable<?> task1, TaskAdaptable<?> task2, TaskAdaptable<?> ...tasks) { - queue(task1); - queue(task2); - for (TaskAdaptable<?> task: tasks) queue(task); - } - - /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ - public static <T extends TaskAdaptable<?>> T queue(TaskFactory<T> taskFactory) { - return queue(taskFactory.newTask()); - } - - /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ - public static void queue(TaskFactory<?> task1, TaskFactory<?> task2, TaskFactory<?> ...tasks) { - queue(task1.newTask()); - queue(task2.newTask()); - for (TaskFactory<?> task: tasks) queue(task.newTask()); - } - - /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ - public static <T> Task<T> queue(String name, Callable<T> job) { - return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build()); - } - - /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ - public static <T> Task<T> queue(String name, Runnable job) { - return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build()); - } - - /** queues the task if needed, i.e. if it is not yet submitted (so it will run), - * or if it is submitted but not queued and we are in a queueing context (so it is available for informational purposes) */ - public static <T extends TaskAdaptable<?>> T queueIfNeeded(T task) { - if (!Tasks.isQueued(task)) { - if (Tasks.isSubmitted(task) && getTaskQueuingContext()==null) { - // already submitted and not in a queueing context, don't try to queue - } else { - // needs submitting, put it in the queue - // (will throw an error if we are not a queueing context) - queue(task); - } - } - return task; - } - - /** submits/queues the given task if needed, and gets the result (unchecked) - * only permitted in a queueing context (ie a DST main job) if the task is not yet submitted */ - // things get really confusing if you try to queueInTaskHierarchy -- easy to cause deadlocks! - public static <T> T get(TaskAdaptable<T> t) { - return queueIfNeeded(t).asTask().getUnchecked(); - } - - /** As {@link #drain(Duration, boolean)} waiting forever and throwing the first error - * (excluding errors in inessential tasks), - * then returning the last task in the queue (which is guaranteed to have finished without error, - * if this method returns without throwing) */ - public static Task<?> waitForLast() { - drain(null, true); - // this call to last is safe, as the above guarantees everything will have run - // (on errors the above will throw so we won't come here) - List<Task<?>> q = DynamicTasks.getTaskQueuingContext().getQueue(); - return q.isEmpty() ? null : Iterables.getLast(q); - } - - /** Calls {@link TaskQueueingContext#drain(Duration, boolean, boolean)} on the current task context */ - public static TaskQueueingContext drain(Duration optionalTimeout, boolean throwFirstError) { - TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext(); - Preconditions.checkNotNull(qc, "Cannot drain when there is no queueing context"); - qc.drain(optionalTimeout, false, throwFirstError); - return qc; - } - - /** as {@link Tasks#swallowChildrenFailures()} but requiring a {@link TaskQueueingContext}. */ - @Beta - public static void swallowChildrenFailures() { - Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here"); - Tasks.swallowChildrenFailures(); - } - - /** same as {@link Tasks#markInessential()} - * (but included here for convenience as it is often used in conjunction with {@link DynamicTasks}) */ - public static void markInessential() { - Tasks.markInessential(); - } - - /** queues the task if possible, otherwise submits it asynchronously; returns the task for callers to - * {@link Task#getUnchecked()} or {@link Task#blockUntilEnded()} */ - public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) { - return queueIfPossible(task).orSubmitAsync(entity).asTask(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ExecutionListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ExecutionListener.java b/core/src/main/java/brooklyn/util/task/ExecutionListener.java deleted file mode 100644 index 7753588..0000000 --- a/core/src/main/java/brooklyn/util/task/ExecutionListener.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import org.apache.brooklyn.api.management.Task; - -public interface ExecutionListener { - - /** invoked when a task completes: - * {@link Task#getEndTimeUtc()} and {@link Task#isDone()} are guaranteed to be set, - * and {@link Task#get()} should return immediately for most Task implementations - * (care has been taken to avoid potential deadlocks here, waiting for a result!) */ - public void onTaskDone(Task<?> task); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ExecutionUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ExecutionUtils.java b/core/src/main/java/brooklyn/util/task/ExecutionUtils.java deleted file mode 100644 index 37c19d2..0000000 --- a/core/src/main/java/brooklyn/util/task/ExecutionUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import groovy.lang.Closure; - -import java.util.concurrent.Callable; - -import com.google.common.base.Function; -import com.google.common.base.Throwables; - -public class ExecutionUtils { - /** - * Attempts to run/call the given object, with the given arguments if possible, preserving the return value if there is one (null otherwise); - * throws exception if the callable is a non-null object which cannot be invoked (not a callable or runnable) - * @deprecated since 0.7.0 ; this super-loose typing should be avoided; if it is needed, let's move it to one of the Groovy compatibility classes - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - public static Object invoke(Object callable, Object ...args) { - if (callable instanceof Closure) return ((Closure<?>)callable).call(args); - if (callable instanceof Callable) { - try { - return ((Callable<?>)callable).call(); - } catch (Throwable t) { - throw Throwables.propagate(t); - } - } - if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; } - if (callable instanceof Function && args.length == 1) { return ((Function)callable).apply(args[0]); } - if (callable==null) return null; - throw new IllegalArgumentException("Cannot invoke unexpected object "+callable+" of type "+callable.getClass()+", with "+args.length+" args"); - } -}
