http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ForwardingTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ForwardingTask.java b/core/src/main/java/brooklyn/util/task/ForwardingTask.java deleted file mode 100644 index 3bc3427..0000000 --- a/core/src/main/java/brooklyn/util/task/ForwardingTask.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.brooklyn.api.management.Task; - -import brooklyn.util.time.Duration; - -import com.google.common.base.Function; -import com.google.common.collect.ForwardingObject; -import com.google.common.util.concurrent.ExecutionList; -import com.google.common.util.concurrent.ListenableFuture; - -public abstract class ForwardingTask<T> extends ForwardingObject implements TaskInternal<T> { - - /** Constructor for use by subclasses. */ - protected ForwardingTask() {} - - @Override - protected abstract TaskInternal<T> delegate(); - - @Override - public void addListener(Runnable listener, Executor executor) { - delegate().addListener(listener, executor); - } - - @Override - public boolean cancel(boolean arg0) { - return delegate().cancel(arg0); - } - - @Override - public T get() throws InterruptedException, ExecutionException { - return delegate().get(); - } - - @Override - public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException { - return delegate().get(arg0, arg1); - } - - @Override - public boolean isCancelled() { - return delegate().isCancelled(); - } - - @Override - public boolean isDone() { - return delegate().isDone(); - } - - @Override - public Task<T> asTask() { - return delegate().asTask(); - } - - @Override - public String getId() { - return delegate().getId(); - } - - @Override - public Set<Object> getTags() { - return delegate().getTags(); - } - - @Override - public long getSubmitTimeUtc() { - return delegate().getSubmitTimeUtc(); - } - - @Override - public long getStartTimeUtc() { - return delegate().getStartTimeUtc(); - } - - @Override - public long getEndTimeUtc() { - return delegate().getEndTimeUtc(); - } - - @Override - public String getDisplayName() { - return delegate().getDisplayName(); - } - - @Override - public String getDescription() { - return delegate().getDescription(); - } - - @Override - public Task<?> getSubmittedByTask() { - return delegate().getSubmittedByTask(); - } - - @Override - public Thread getThread() { - return delegate().getThread(); - } - - @Override - public boolean isSubmitted() { - return delegate().isSubmitted(); - } - - @Override - public boolean isBegun() { - return delegate().isBegun(); - } - - @Override - public boolean isError() { - return delegate().isError(); - } - - @Override - public void blockUntilStarted() { - delegate().blockUntilStarted(); - } - - @Override - public void blockUntilEnded() { - delegate().blockUntilEnded(); - } - - @Override - public boolean blockUntilEnded(Duration timeout) { - return delegate().blockUntilEnded(timeout); - } - - @Override - public String getStatusSummary() { - return delegate().getStatusSummary(); - } - - @Override - public String getStatusDetail(boolean multiline) { - return delegate().getStatusDetail(multiline); - } - - @Override - public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException { - return delegate().get(duration); - } - - @Override - public T getUnchecked() { - return delegate().getUnchecked(); - } - - @Override - public T getUnchecked(Duration duration) { - return delegate().getUnchecked(duration); - } - - @Override - public void initInternalFuture(ListenableFuture<T> result) { - delegate().initInternalFuture(result); - } - - @Override - public long getQueuedTimeUtc() { - return delegate().getQueuedTimeUtc(); - } - - @Override - public Future<T> getInternalFuture() { - return delegate().getInternalFuture(); - } - - @Override - public boolean isQueued() { - return delegate().isQueued(); - } - - @Override - public boolean isQueuedOrSubmitted() { - return delegate().isQueuedOrSubmitted(); - } - - @Override - public boolean isQueuedAndNotSubmitted() { - return delegate().isQueuedAndNotSubmitted(); - } - - @Override - public void markQueued() { - delegate().markQueued(); - } - - @Override - public boolean cancel() { - return delegate().cancel(); - } - - @Override - public boolean blockUntilStarted(Duration timeout) { - return delegate().blockUntilStarted(timeout); - } - - @Override - public String setBlockingDetails(String blockingDetails) { - return delegate().setBlockingDetails(blockingDetails); - } - - @Override - public Task<?> setBlockingTask(Task<?> blockingTask) { - return delegate().setBlockingTask(blockingTask); - } - - @Override - public void resetBlockingDetails() { - delegate().resetBlockingDetails(); - } - - @Override - public void resetBlockingTask() { - delegate().resetBlockingTask(); - } - - @Override - public String getBlockingDetails() { - return delegate().getBlockingDetails(); - } - - @Override - public Task<?> getBlockingTask() { - return delegate().getBlockingTask(); - } - - @Override - public void setExtraStatusText(Object extraStatus) { - delegate().setExtraStatusText(extraStatus); - } - - @Override - public Object getExtraStatusText() { - return delegate().getExtraStatusText(); - } - - @Override - public void runListeners() { - delegate().runListeners(); - } - - @Override - public void setEndTimeUtc(long val) { - delegate().setEndTimeUtc(val); - } - - @Override - public void setThread(Thread thread) { - delegate().setThread(thread); - } - - @Override - public Callable<T> getJob() { - return delegate().getJob(); - } - - @Override - public void setJob(Callable<T> job) { - delegate().setJob(job); - } - - @Override - public ExecutionList getListeners() { - return delegate().getListeners(); - } - - @Override - public void setSubmitTimeUtc(long currentTimeMillis) { - delegate().setSubmitTimeUtc(currentTimeMillis); - } - - @Override - public void setSubmittedByTask(Task<?> task) { - delegate().setSubmittedByTask(task); - } - - @Override - public Set<Object> getMutableTags() { - return delegate().getMutableTags(); - } - - @Override - public void setStartTimeUtc(long currentTimeMillis) { - delegate().setStartTimeUtc(currentTimeMillis); - } - - @Override - public void applyTagModifier(Function<Set<Object>, Void> modifier) { - delegate().applyTagModifier(modifier); - } - - @Override - public Task<?> getProxyTarget() { - return delegate().getProxyTarget(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java b/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java deleted file mode 100644 index 8111332..0000000 --- a/core/src/main/java/brooklyn/util/task/ListenableForwardingFuture.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.concurrent.Executor; -import java.util.concurrent.Future; - -import com.google.common.util.concurrent.ExecutionList; -import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture; -import com.google.common.util.concurrent.ListenableFuture; - -/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the resposibility to: - * <li> invoke the listeners on job completion (success or error) - * <li> invoke the listeners on cancel */ -public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> { - - final ExecutionList listeners; - - protected ListenableForwardingFuture(Future<T> delegate) { - super(delegate); - this.listeners = new ExecutionList(); - } - - protected ListenableForwardingFuture(Future<T> delegate, ExecutionList list) { - super(delegate); - this.listeners = list; - } - - @Override - public void addListener(Runnable listener, Executor executor) { - listeners.add(listener, executor); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ParallelTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ParallelTask.java b/core/src/main/java/brooklyn/util/task/ParallelTask.java deleted file mode 100644 index d6e65ab..0000000 --- a/core/src/main/java/brooklyn/util/task/ParallelTask.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import org.apache.brooklyn.api.management.Task; - -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.text.Strings; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -/** - * Runs {@link Task}s in parallel. - * - * No guarantees of order of starting the tasks, but the return value is a - * {@link List} of the return values of supplied tasks in the same - * order they were passed as arguments. - */ -public class ParallelTask<T> extends CompoundTask<T> { - public ParallelTask(Object... tasks) { super(tasks); } - - public ParallelTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); } - public ParallelTask(Collection<? extends Object> tasks) { super(tasks); } - - public ParallelTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); } - public ParallelTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); } - - @Override - protected List<T> runJobs() throws InterruptedException, ExecutionException { - setBlockingDetails("Executing "+ - (children.size()==1 ? "1 child task" : - children.size()+" children tasks in parallel") ); - for (Task<? extends T> task : children) { - submitIfNecessary(task); - } - - List<T> result = Lists.newArrayList(); - List<Exception> exceptions = Lists.newArrayList(); - for (Task<? extends T> task : children) { - T x; - try { - x = task.get(); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - if (TaskTags.isInessential(task)) { - // ignore exception as it's inessential - } else { - exceptions.add(e); - } - x = null; - } - result.add(x); - } - - if (exceptions.isEmpty()) { - return result; - } else { - if (result.size()==1 && exceptions.size()==1) - throw Exceptions.propagate( exceptions.get(0) ); - throw Exceptions.propagate(exceptions.size()+" of "+result.size()+" parallel child task"+Strings.s(result.size())+" failed", exceptions); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/ScheduledTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ScheduledTask.java b/core/src/main/java/brooklyn/util/task/ScheduledTask.java deleted file mode 100644 index eabff49..0000000 --- a/core/src/main/java/brooklyn/util/task/ScheduledTask.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import static brooklyn.util.GroovyJavaMethods.elvis; -import static brooklyn.util.GroovyJavaMethods.truth; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.management.Task; - -import brooklyn.util.collections.MutableMap; -import brooklyn.util.time.Duration; - -import com.google.common.annotations.Beta; -import com.google.common.base.Throwables; - -/** - * A task which runs with a fixed period. - * <p> - * Note that some termination logic, including {@link #addListener(Runnable, java.util.concurrent.Executor)}, - * is not precisely defined. - */ -// TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten, -// reduce external assumptions about internal structure, and clarify "done" semantics -public class ScheduledTask extends BasicTask { - - final Callable<Task<?>> taskFactory; - /** initial delay before running, set as flag in constructor; defaults to 0 */ - protected Duration delay; - /** time to wait between executions, or null if not to repeat (default), set as flag to constructor; - * this may be modified for subsequent submissions by a running task generated by the factory - * using getSubmittedByTask().setPeriod(Duration) */ - protected Duration period = null; - /** optional, set as flag in constructor; defaults to null meaning no limit */ - protected Integer maxIterations = null; - - protected int runCount=0; - protected Task<?> recentRun, nextRun; - - public int getRunCount() { return runCount; } - public ScheduledFuture<?> getNextScheduled() { return (ScheduledFuture<?>)internalFuture; } - - public ScheduledTask(Callable<Task<?>> taskFactory) { - this(MutableMap.of(), taskFactory); - } - - public ScheduledTask(final Task<?> task) { - this(MutableMap.of(), task); - } - - public ScheduledTask(Map flags, final Task<?> task){ - this(flags, new Callable<Task<?>>(){ - @Override - public Task<?> call() throws Exception { - return task; - }}); - } - - public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) { - super(flags); - this.taskFactory = taskFactory; - - delay = Duration.of(elvis(flags.remove("delay"), 0)); - period = Duration.of(elvis(flags.remove("period"), null)); - maxIterations = elvis(flags.remove("maxIterations"), null); - } - - public ScheduledTask delay(Duration d) { - this.delay = d; - return this; - } - public ScheduledTask delay(long val) { - return delay(Duration.millis(val)); - } - - public ScheduledTask period(Duration d) { - this.period = d; - return this; - } - public ScheduledTask period(long val) { - return period(Duration.millis(val)); - } - - public ScheduledTask maxIterations(int val) { - this.maxIterations = val; - return this; - } - - public Callable<Task<?>> getTaskFactory() { - return taskFactory; - } - - public Task<?> newTask() { - try { - return taskFactory.call(); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - protected String getActiveTaskStatusString(int verbosity) { - StringBuilder rv = new StringBuilder("Scheduler"); - if (runCount>0) rv.append(", iteration "+(runCount+1)); - if (recentRun!=null) rv.append(", last run "+ - Duration.sinceUtc(recentRun.getStartTimeUtc())+" ms ago"); - if (truth(getNextScheduled())) { - Duration untilNext = Duration.millis(getNextScheduled().getDelay(TimeUnit.MILLISECONDS)); - if (untilNext.isPositive()) - rv.append(", next in "+untilNext); - else - rv.append(", next imminent"); - } - return rv.toString(); - } - - @Override - public boolean isDone() { - return isCancelled() || (maxIterations!=null && maxIterations <= runCount) || (period==null && nextRun!=null && nextRun.isDone()); - } - - public synchronized void blockUntilFirstScheduleStarted() { - // TODO Assumes that maxIterations is not negative! - while (true) { - if (isCancelled()) throw new CancellationException(); - if (recentRun==null) - try { - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - Throwables.propagate(e); - } - if (recentRun!=null) return; - } - } - - public void blockUntilEnded() { - while (!isDone()) super.blockUntilEnded(); - } - - /** gets the value of the most recently run task */ - public Object get() throws InterruptedException, ExecutionException { - blockUntilStarted(); - blockUntilFirstScheduleStarted(); - return (truth(recentRun)) ? recentRun.get() : internalFuture.get(); - } - - @Override - public synchronized boolean cancel(boolean mayInterrupt) { - boolean result = super.cancel(mayInterrupt); - if (nextRun!=null) { - nextRun.cancel(mayInterrupt); - notifyAll(); - } - return result; - } - - /** internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation - * @param duration */ - @Beta - public boolean blockUntilNextRunFinished(Duration timeout) { - return Tasks.blockUntilInternalTasksEnded(nextRun, timeout); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/SequentialTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/SequentialTask.java b/core/src/main/java/brooklyn/util/task/SequentialTask.java deleted file mode 100644 index e739eb0..0000000 --- a/core/src/main/java/brooklyn/util/task/SequentialTask.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import org.apache.brooklyn.api.management.Task; - -import com.google.common.collect.ImmutableList; - - -/** runs tasks in order, waiting for one to finish before starting the next; return value here is TBD; - * (currently is all the return values of individual tasks, but we - * might want some pipeline support and eventually only to return final value...) */ -public class SequentialTask<T> extends CompoundTask<T> { - - public SequentialTask(Object... tasks) { super(tasks); } - - public SequentialTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); } - public SequentialTask(Collection<? extends Object> tasks) { super(tasks); } - - public SequentialTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); } - public SequentialTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); } - - protected List<T> runJobs() throws InterruptedException, ExecutionException { - setBlockingDetails("Executing "+ - (children.size()==1 ? "1 child task" : - children.size()+" children tasks sequentially") ); - - List<T> result = new ArrayList<T>(); - for (Task<? extends T> task : children) { - submitIfNecessary(task); - // throw exception (and cancel subsequent tasks) on error - result.add(task.get()); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java b/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java deleted file mode 100644 index a48bac8..0000000 --- a/core/src/main/java/brooklyn/util/task/SingleThreadedScheduler.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Queue; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Instances of this class ensures that {@link Task}s execute with in-order - * single-threaded semantics. - * - * Tasks can be presented through {@link #submit(Callable)}. The order of execution is the - * sumbission order. - * <p> - * This implementation does so by blocking on a {@link ConcurrentLinkedQueue}, <em>after</em> - * the task is started in a thread (and {@link Task#isBegun()} returns true), but (of course) - * <em>before</em> the {@link TaskInternal#getJob()} actually gets invoked. - */ -public class SingleThreadedScheduler implements TaskScheduler, CanSetName { - private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedScheduler.class); - - private final Queue<QueuedSubmission<?>> order = new ConcurrentLinkedQueue<QueuedSubmission<?>>(); - private int queueSize = 0; - private final AtomicBoolean running = new AtomicBoolean(false); - - private ExecutorService executor; - - private String name; - - @Override - public void setName(String name) { - this.name = name; - } - - @Override - public String toString() { - return name!=null ? "SingleThreadedScheduler["+name+"]" : super.toString(); - } - - @Override - public void injectExecutor(ExecutorService executor) { - this.executor = executor; - } - - @Override - public synchronized <T> Future<T> submit(Callable<T> c) { - if (running.compareAndSet(false, true)) { - return executeNow(c); - } else { - WrappingFuture<T> f = new WrappingFuture<T>(); - order.add(new QueuedSubmission<T>(c, f)); - queueSize++; - if (queueSize>0 && (queueSize == 50 || (queueSize<=500 && (queueSize%100)==0) || (queueSize%1000)==0) && queueSize!=lastSizeWarn) { - LOG.warn("{} is backing up, {} tasks queued", this, queueSize); - if (LOG.isDebugEnabled()) { - LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek()); - } - lastSizeWarn = queueSize; - } - return f; - } - } - int lastSizeWarn = 0; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private synchronized void onEnd() { - boolean done = false; - while (!done) { - if (order.isEmpty()) { - running.set(false); - done = true; - } else { - QueuedSubmission<?> qs = order.remove(); - queueSize--; - if (!qs.f.isCancelled()) { - Future future = executeNow(qs.c); - qs.f.setDelegate(future); - done = true; - } - } - } - } - - private synchronized <T> Future<T> executeNow(final Callable<T> c) { - return executor.submit(new Callable<T>() { - @Override public T call() throws Exception { - try { - return c.call(); - } finally { - onEnd(); - } - }}); - } - - - private static class QueuedSubmission<T> { - final Callable<T> c; - final WrappingFuture<T> f; - - QueuedSubmission(Callable<T> c, WrappingFuture<T> f) { - this.c = c; - this.f = f; - } - - @Override - public String toString() { - return "QueuedSubmission["+c+"]@"+Integer.toHexString(System.identityHashCode(this)); - } - } - - /** - * A future, where the task may not yet have been submitted to the real executor. - * It delegates to the real future if present, and otherwise waits for that to appear - */ - private static class WrappingFuture<T> implements Future<T> { - private volatile Future<T> delegate; - private boolean cancelled; - - void setDelegate(Future<T> delegate) { - synchronized (this) { - this.delegate = delegate; - notifyAll(); - } - } - - @Override public boolean cancel(boolean mayInterruptIfRunning) { - if (delegate != null) { - return delegate.cancel(mayInterruptIfRunning); - } else { - cancelled = true; - synchronized (this) { - notifyAll(); - } - return true; - } - } - - @Override public boolean isCancelled() { - if (delegate != null) { - return delegate.isCancelled(); - } else { - return cancelled; - } - } - - @Override public boolean isDone() { - return (delegate != null) ? delegate.isDone() : cancelled; - } - - @Override public T get() throws CancellationException, ExecutionException, InterruptedException { - if (cancelled) { - throw new CancellationException(); - } else if (delegate != null) { - return delegate.get(); - } else { - synchronized (this) { - while (delegate == null && !cancelled) { - wait(); - } - } - return get(); - } - } - - @Override public T get(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, InterruptedException, TimeoutException { - long endtime = System.currentTimeMillis()+unit.toMillis(timeout); - - if (cancelled) { - throw new CancellationException(); - } else if (delegate != null) { - return delegate.get(timeout, unit); - } else if (System.currentTimeMillis() >= endtime) { - throw new TimeoutException(); - } else { - synchronized (this) { - while (delegate == null && !cancelled && System.currentTimeMillis() < endtime) { - long remaining = endtime - System.currentTimeMillis(); - if (remaining > 0) { - wait(remaining); - } - } - } - long remaining = endtime - System.currentTimeMillis(); - return get(remaining, TimeUnit.MILLISECONDS); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/TaskBuilder.java b/core/src/main/java/brooklyn/util/task/TaskBuilder.java deleted file mode 100644 index ecd4d4f..0000000 --- a/core/src/main/java/brooklyn/util/task/TaskBuilder.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; -import org.apache.brooklyn.api.management.TaskFactory; -import org.apache.brooklyn.api.management.TaskQueueingContext; - -import brooklyn.util.JavaGroovyEquivalents; -import brooklyn.util.collections.MutableList; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.collections.MutableSet; - -import com.google.common.collect.Iterables; - -/** Convenience for creating tasks; note that DynamicSequentialTask is the default */ -public class TaskBuilder<T> { - - String name = null; - String description = null; - Callable<T> body = null; - Boolean swallowChildrenFailures = null; - List<TaskAdaptable<?>> children = MutableList.of(); - Set<Object> tags = MutableSet.of(); - Map<String,Object> flags = MutableMap.of(); - Boolean dynamic = null; - boolean parallel = false; - - public static <T> TaskBuilder<T> builder() { - return new TaskBuilder<T>(); - } - - public TaskBuilder<T> name(String name) { - this.name = name; - return this; - } - - public TaskBuilder<T> description(String description) { - this.description = description; - return this; - } - - /** whether task that is built has been explicitly specified to be a dynamic task - * (ie a Task which is also a {@link TaskQueueingContext} - * whereby new tasks can be added after creation */ - public TaskBuilder<T> dynamic(boolean dynamic) { - this.dynamic = dynamic; - return this; - } - - /** whether task that is built should be parallel; cannot (currently) also be dynamic */ - public TaskBuilder<T> parallel(boolean parallel) { - this.parallel = parallel; - return this; - } - - public TaskBuilder<T> body(Callable<T> body) { - this.body = body; - return this; - } - - /** sets up a dynamic task not to fail even if children fail */ - public TaskBuilder<T> swallowChildrenFailures(boolean swallowChildrenFailures) { - this.swallowChildrenFailures = swallowChildrenFailures; - return this; - } - - public TaskBuilder<T> body(Runnable body) { - this.body = JavaGroovyEquivalents.<T>toCallable(body); - return this; - } - - /** adds a child to the given task; the semantics of how the child is executed is set using - * {@link #dynamic(boolean)} and {@link #parallel(boolean)} */ - public TaskBuilder<T> add(TaskAdaptable<?> child) { - children.add(child); - return this; - } - - public TaskBuilder<T> addAll(Iterable<? extends TaskAdaptable<?>> additionalChildren) { - Iterables.addAll(children, additionalChildren); - return this; - } - - public TaskBuilder<T> add(TaskAdaptable<?>... additionalChildren) { - children.addAll(Arrays.asList(additionalChildren)); - return this; - } - - /** adds a tag to the given task */ - public TaskBuilder<T> tag(Object tag) { - tags.add(tag); - return this; - } - - /** adds a flag to the given task */ - public TaskBuilder<T> flag(String flag, Object value) { - flags.put(flag, value); - return this; - } - - /** adds the given flags to the given task */ - public TaskBuilder<T> flags(Map<String,Object> flags) { - this.flags.putAll(flags); - return this; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public Task<T> build() { - MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags); - if (name!=null) taskFlags.put("displayName", name); - if (description!=null) taskFlags.put("description", description); - if (!tags.isEmpty()) taskFlags.put("tags", tags); - - if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) { - if (swallowChildrenFailures!=null) - throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this); - return new BasicTask<T>(taskFlags, body); - } - - // prefer dynamic set unless (a) user has said not dynamic, or (b) it's parallel (since there is no dynamic parallel yet) - // dynamic has better cancel (will interrupt the thread) and callers can submit tasks flexibly; - // however dynamic uses an extra thread and task and is noisy for contexts which don't need it - if (Boolean.TRUE.equals(dynamic) || (dynamic==null && !parallel)) { - if (parallel) - throw new UnsupportedOperationException("No implementation of parallel dynamic aggregate task available"); - DynamicSequentialTask<T> result = new DynamicSequentialTask<T>(taskFlags, body); - if (swallowChildrenFailures!=null && swallowChildrenFailures.booleanValue()) result.swallowChildrenFailures(); - for (TaskAdaptable t: children) - result.queue(t.asTask()); - return result; - } - - // T must be of type List<V> for these to be valid - if (body != null) { - throw new UnsupportedOperationException("No implementation of non-dynamic task with both body and children"); - } - if (swallowChildrenFailures!=null) { - throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this); - } - - if (parallel) - return new ParallelTask(taskFlags, children); - else - return new SequentialTask(taskFlags, children); - } - - /** returns a a factory based on this builder */ - public TaskFactory<Task<T>> buildFactory() { - return new TaskFactory<Task<T>>() { - public Task<T> newTask() { - return build(); - } - }; - } - - @Override - public String toString() { - return super.toString()+"["+name+"]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskInternal.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/TaskInternal.java b/core/src/main/java/brooklyn/util/task/TaskInternal.java deleted file mode 100644 index 51dbddb..0000000 --- a/core/src/main/java/brooklyn/util/task/TaskInternal.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import org.apache.brooklyn.api.management.ExecutionManager; -import org.apache.brooklyn.api.management.Task; - -import brooklyn.util.time.Duration; - -import com.google.common.annotations.Beta; -import com.google.common.base.Function; -import com.google.common.util.concurrent.ExecutionList; -import com.google.common.util.concurrent.ListenableFuture; - -/** - * All tasks being passed to the {@link ExecutionManager} should implement this. - * Users are strongly encouraged to use (or extend) {@link BasicTask}, rather than - * implementing a task from scratch. - * - * The methods on this interface will change in subsequent releases. Because this is - * marked as beta, the normal deprecation policy for these methods does not apply. - * - * @author aled - */ -@Beta -public interface TaskInternal<T> extends Task<T> { - - /** sets the internal future object used to record the association to a job submitted to an {@link ExecutorService} */ - void initInternalFuture(ListenableFuture<T> result); - - /** returns the underlying future where this task's results will come in; see {@link #initInternalFuture(ListenableFuture)} */ - Future<T> getInternalFuture(); - - /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here; - * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */ - long getQueuedTimeUtc(); - - boolean isQueuedOrSubmitted(); - boolean isQueuedAndNotSubmitted(); - boolean isQueued(); - - /** marks the task as queued for execution */ - void markQueued(); - - boolean cancel(); - - boolean blockUntilStarted(Duration timeout); - - /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait, - * and typically cleared immediately afterwards; referenced by management api to inspect a task - * which is blocking - * <p> - * returns previous details, in case caller wishes to recall and restore it (e.g. if it is doing a sub-blocking) - */ - String setBlockingDetails(String blockingDetails); - - /** as {@link #setBlockingDetails(String)} but records a task which is blocking, - * for use e.g. in a gui to navigate to the current active subtask - * <p> - * returns previous blocking task, in case caller wishes to recall and restore it - */ - Task<?> setBlockingTask(Task<?> blockingTask); - - void resetBlockingDetails(); - - void resetBlockingTask(); - - /** returns a textual message giving details while the task is blocked */ - String getBlockingDetails(); - - /** returns a task that this task is blocked on */ - Task<?> getBlockingTask(); - - void setExtraStatusText(Object extraStatus); - - Object getExtraStatusText(); - - void runListeners(); - - void setEndTimeUtc(long val); - - void setThread(Thread thread); - - Callable<T> getJob(); - - void setJob(Callable<T> job); - - ExecutionList getListeners(); - - void setSubmitTimeUtc(long currentTimeMillis); - - void setSubmittedByTask(Task<?> task); - - Set<Object> getMutableTags(); - - void setStartTimeUtc(long currentTimeMillis); - - void applyTagModifier(Function<Set<Object>,Void> modifier); - - /** if a task is a proxy for another one (used mainly for internal tasks), - * this returns the "real" task represented by this one */ - Task<?> getProxyTarget(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/TaskScheduler.java b/core/src/main/java/brooklyn/util/task/TaskScheduler.java deleted file mode 100644 index a10e63a..0000000 --- a/core/src/main/java/brooklyn/util/task/TaskScheduler.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import org.apache.brooklyn.api.management.Task; - -/** - * The scheduler is an internal mechanism to decorate {@link Task}s. - * - * It can control how the tasks are scheduled for execution (e.g. single-threaded execution, - * prioritised, etc). - */ -public interface TaskScheduler { - - public void injectExecutor(ExecutorService executor); - - /** - * Called by {@link BasicExecutionManager} to schedule tasks. - */ - public <T> Future<T> submit(Callable<T> c); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/TaskTags.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/TaskTags.java b/core/src/main/java/brooklyn/util/task/TaskTags.java deleted file mode 100644 index a9da252..0000000 --- a/core/src/main/java/brooklyn/util/task/TaskTags.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Set; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; - -import com.google.common.base.Function; - -public class TaskTags { - - /** marks a task which is allowed to fail without failing his parent */ - public static final String INESSENTIAL_TASK = "inessential"; - - /** marks a task which is a subtask of another */ - public static final String SUB_TASK_TAG = "SUB-TASK"; - - public static void addTagDynamically(TaskAdaptable<?> task, final Object tag) { - ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() { - public Void apply(@Nullable Set<Object> input) { - input.add(tag); - return null; - } - }); - } - - public static void addTagsDynamically(TaskAdaptable<?> task, final Object tag1, final Object ...tags) { - ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() { - public Void apply(@Nullable Set<Object> input) { - input.add(tag1); - for (Object tag: tags) input.add(tag); - return null; - } - }); - } - - - public static boolean isInessential(Task<?> task) { - return hasTag(task, INESSENTIAL_TASK); - } - - public static boolean hasTag(Task<?> task, Object tag) { - return task.getTags().contains(tag); - } - - public static <U,V extends TaskAdaptable<U>> V markInessential(V task) { - addTagDynamically(task, INESSENTIAL_TASK); - return task; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/Tasks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/Tasks.java b/core/src/main/java/brooklyn/util/task/Tasks.java deleted file mode 100644 index c25dd19..0000000 --- a/core/src/main/java/brooklyn/util/task/Tasks.java +++ /dev/null @@ -1,488 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.management.ExecutionContext; -import org.apache.brooklyn.api.management.HasTaskChildren; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; -import org.apache.brooklyn.api.management.TaskFactory; -import org.apache.brooklyn.api.management.TaskQueueingContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.exceptions.ReferenceWithError; -import brooklyn.util.repeat.Repeater; -import brooklyn.util.time.CountdownTimer; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.annotations.Beta; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Supplier; -import com.google.common.collect.Iterables; - -public class Tasks { - - private static final Logger log = LoggerFactory.getLogger(Tasks.class); - - /** convenience for setting "blocking details" on any task where the current thread is running; - * typically invoked prior to a wait, for transparency to a user; - * then invoked with 'null' just after the wait */ - public static String setBlockingDetails(String description) { - Task<?> current = current(); - if (current instanceof TaskInternal) - return ((TaskInternal<?>)current).setBlockingDetails(description); - return null; - } - public static void resetBlockingDetails() { - Task<?> current = current(); - if (current instanceof TaskInternal) - ((TaskInternal<?>)current).resetBlockingDetails(); - } - public static Task<?> setBlockingTask(Task<?> blocker) { - Task<?> current = current(); - if (current instanceof TaskInternal) - return ((TaskInternal<?>)current).setBlockingTask(blocker); - return null; - } - public static void resetBlockingTask() { - Task<?> current = current(); - if (current instanceof TaskInternal) - ((TaskInternal<?>)current).resetBlockingTask(); - } - - /** convenience for setting "blocking details" on any task where the current thread is running, - * while the passed code is executed; often used from groovy as - * <pre>{@code withBlockingDetails("sleeping 5s") { Thread.sleep(5000); } }</pre> - * If code block is null, the description is set until further notice (not cleareed). */ - @SuppressWarnings("rawtypes") - public static <T> T withBlockingDetails(String description, Callable<T> code) throws Exception { - Task current = current(); - if (code==null) { - log.warn("legacy invocation of withBlockingDetails with null code block, ignoring"); - return null; - } - String prevBlockingDetails = null; - if (current instanceof TaskInternal) { - prevBlockingDetails = ((TaskInternal)current).setBlockingDetails(description); - } - try { - return code.call(); - } finally { - if (current instanceof TaskInternal) - ((TaskInternal)current).setBlockingDetails(prevBlockingDetails); - } - } - - /** the {@link Task} where the current thread is executing, if executing in a Task, otherwise null; - * if the current task is a proxy, this returns the target of that proxy */ - @SuppressWarnings("rawtypes") - public static Task current() { - return getFinalProxyTarget(BasicExecutionManager.getPerThreadCurrentTask().get()); - } - - public static Task<?> getFinalProxyTarget(Task<?> task) { - if (task==null) return null; - Task<?> proxy = ((TaskInternal<?>)task).getProxyTarget(); - if (proxy==null || proxy.equals(task)) return task; - return getFinalProxyTarget(proxy); - } - - /** creates a {@link ValueResolver} instance which allows significantly more customization than - * the various {@link #resolveValue(Object, Class, ExecutionContext)} methods here */ - public static <T> ValueResolver<T> resolving(Object v, Class<T> type) { - return new ValueResolver<T>(v, type); - } - - public static ValueResolver.ResolverBuilderPretype resolving(Object v) { - return new ValueResolver.ResolverBuilderPretype(v); - } - - /** @see #resolveValue(Object, Class, ExecutionContext, String) */ - public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec) throws ExecutionException, InterruptedException { - return new ValueResolver<T>(v, type).context(exec).get(); - } - - /** attempt to resolve the given value as the given type, waiting on futures, submitting if necessary, - * and coercing as allowed by TypeCoercions; - * contextMessage (optional) will be displayed in status reports while it waits (e.g. the name of the config key being looked up). - * if no execution context supplied (null) this method will throw an exception if the object is an unsubmitted task */ - public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException { - return new ValueResolver<T>(v, type).context(exec).description(contextMessage).get(); - } - - /** - * @see #resolveDeepValue(Object, Class, ExecutionContext, String) - */ - public static Object resolveDeepValue(Object v, Class<?> type, ExecutionContext exec) throws ExecutionException, InterruptedException { - return resolveDeepValue(v, type, exec, null); - } - - /** - * Resolves the given object, blocking on futures and coercing it to the given type. If the object is a - * map or iterable (or a list of map of maps, etc, etc) then walks these maps/iterables to convert all of - * their values to the given type. For example, the following will return a list containing a map with "1"="true": - * - * {@code Object result = resolveDeepValue(ImmutableList.of(ImmutableMap.of(1, true)), String.class, exec)} - * - * To perform a deep conversion of futures contained within Iterables or Maps without coercion of each element, - * the type should normally be Object, not the type of the collection. This differs from - * {@link #resolveValue(Object, Class, ExecutionContext, String)} which will accept Map and Iterable - * as the required type. - */ - public static <T> T resolveDeepValue(Object v, Class<T> type, ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException { - return new ValueResolver<T>(v, type).context(exec).deep(true).description(contextMessage).get(); - } - - /** sets extra status details on the current task, if possible (otherwise does nothing). - * the extra status is presented in Task.getStatusDetails(true) - */ - public static void setExtraStatusDetails(String notes) { - Task<?> current = current(); - if (current instanceof TaskInternal) - ((TaskInternal<?>)current).setExtraStatusText(notes); - } - - public static <T> TaskBuilder<T> builder() { - return TaskBuilder.<T>builder(); - } - - private static Task<?>[] asTasks(TaskAdaptable<?> ...tasks) { - Task<?>[] result = new Task<?>[tasks.length]; - for (int i=0; i<tasks.length; i++) - result[i] = tasks[i].asTask(); - return result; - } - - public static Task<List<?>> parallel(TaskAdaptable<?> ...tasks) { - return parallelInternal("parallelised tasks", asTasks(tasks)); - } - public static Task<List<?>> parallel(String name, TaskAdaptable<?> ...tasks) { - return parallelInternal(name, asTasks(tasks)); - } - public static Task<List<?>> parallel(Iterable<? extends TaskAdaptable<?>> tasks) { - return parallel(asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); - } - public static Task<List<?>> parallel(String name, Iterable<? extends TaskAdaptable<?>> tasks) { - return parallelInternal(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); - } - private static Task<List<?>> parallelInternal(String name, Task<?>[] tasks) { - return Tasks.<List<?>>builder().name(name).parallel(true).add(tasks).build(); - } - - public static Task<List<?>> sequential(TaskAdaptable<?> ...tasks) { - return sequentialInternal("sequential tasks", asTasks(tasks)); - } - public static Task<List<?>> sequential(String name, TaskAdaptable<?> ...tasks) { - return sequentialInternal(name, asTasks(tasks)); - } - public static TaskFactory<?> sequential(TaskFactory<?> ...taskFactories) { - return sequentialInternal("sequential tasks", taskFactories); - } - public static TaskFactory<?> sequential(String name, TaskFactory<?> ...taskFactories) { - return sequentialInternal(name, taskFactories); - } - public static Task<List<?>> sequential(List<? extends TaskAdaptable<?>> tasks) { - return sequential(asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); - } - public static Task<List<?>> sequential(String name, List<? extends TaskAdaptable<?>> tasks) { - return sequential(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); - } - private static Task<List<?>> sequentialInternal(String name, Task<?>[] tasks) { - return Tasks.<List<?>>builder().name(name).parallel(false).add(tasks).build(); - } - private static TaskFactory<?> sequentialInternal(final String name, final TaskFactory<?> ...taskFactories) { - return new TaskFactory<TaskAdaptable<?>>() { - @Override - public TaskAdaptable<?> newTask() { - TaskBuilder<List<?>> tb = Tasks.<List<?>>builder().name(name).parallel(false); - for (TaskFactory<?> tf: taskFactories) - tb.add(tf.newTask().asTask()); - return tb.build(); - } - }; - } - - /** returns the first tag found on the given task which matches the given type, looking up the submission hierarachy if necessary */ - @SuppressWarnings("unchecked") - public static <T> T tag(@Nullable Task<?> task, Class<T> type, boolean recurseHierarchy) { - // support null task to make it easier for callers to walk hierarchies - if (task==null) return null; - for (Object tag: task.getTags()) - if (type.isInstance(tag)) return (T)tag; - if (!recurseHierarchy) return null; - return tag(task.getSubmittedByTask(), type, true); - } - - public static boolean isAncestorCancelled(Task<?> t) { - if (t==null) return false; - if (t.isCancelled()) return true; - return isAncestorCancelled(t.getSubmittedByTask()); - } - - public static boolean isQueued(TaskAdaptable<?> task) { - return ((TaskInternal<?>)task.asTask()).isQueued(); - } - - public static boolean isSubmitted(TaskAdaptable<?> task) { - return ((TaskInternal<?>)task.asTask()).isSubmitted(); - } - - public static boolean isQueuedOrSubmitted(TaskAdaptable<?> task) { - return ((TaskInternal<?>)task.asTask()).isQueuedOrSubmitted(); - } - - /** - * Adds the given task to the given context. Does not throw an exception if the addition fails. - * @return true if the task was added, false otherwise. - */ - public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable<?> task) { - if (task==null || isQueued(task)) - return false; - try { - adder.queue(task.asTask()); - return true; - } catch (Exception e) { - if (log.isDebugEnabled()) - log.debug("Could not add task "+task+" at "+adder+": "+e); - return false; - } - } - - /** see also {@link #resolving(Object)} which gives much more control about submission, timeout, etc */ - public static <T> Supplier<T> supplier(final TaskAdaptable<T> task) { - return new Supplier<T>() { - @Override - public T get() { - return task.asTask().getUnchecked(); - } - }; - } - - /** return all children tasks of the given tasks, if it has children, else empty list */ - public static Iterable<Task<?>> children(Task<?> task) { - if (task instanceof HasTaskChildren) - return ((HasTaskChildren)task).getChildren(); - return Collections.emptyList(); - } - - /** returns failed tasks */ - public static Iterable<Task<?>> failed(Iterable<Task<?>> subtasks) { - return Iterables.filter(subtasks, new Predicate<Task<?>>() { - @Override - public boolean apply(Task<?> input) { - return input.isError(); - } - }); - } - - /** returns the task, its children, and all its children, and so on; - * @param root task whose descendants should be iterated - * @param parentFirst whether to put parents before children or after - */ - public static Iterable<Task<?>> descendants(Task<?> root, final boolean parentFirst) { - Iterable<Task<?>> descs = Iterables.concat(Iterables.transform(Tasks.children(root), new Function<Task<?>,Iterable<Task<?>>>() { - @Override - public Iterable<Task<?>> apply(Task<?> input) { - return descendants(input, parentFirst); - } - })); - if (parentFirst) return Iterables.concat(Collections.singleton(root), descs); - else return Iterables.concat(descs, Collections.singleton(root)); - } - - /** returns the error thrown by the task if {@link Task#isError()}, or null if no error or not done */ - public static Throwable getError(Task<?> t) { - if (t==null) return null; - if (!t.isDone()) return null; - if (t.isCancelled()) return new CancellationException(); - try { - t.get(); - return null; - } catch (Throwable error) { - // do not propagate as we are pretty much guaranteed above that it wasn't this - // thread which originally threw the error - return error; - } - } - public static Task<Void> fail(final String name, final Throwable optionalError) { - return Tasks.<Void>builder().dynamic(false).name(name).body(new Runnable() { public void run() { - if (optionalError!=null) throw Exceptions.propagate(optionalError); else throw new RuntimeException("Failed: "+name); - } }).build(); - } - public static Task<Void> warning(final String message, final Throwable optionalError) { - log.warn(message); - return TaskTags.markInessential(fail(message, optionalError)); - } - - /** marks the current task inessential; this mainly matters if the task is running in a parent - * {@link TaskQueueingContext} and we don't want the parent to fail if this task fails - * <p> - * no-op (silently ignored) if not in a task */ - public static void markInessential() { - Task<?> task = Tasks.current(); - if (task==null) { - TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext(); - if (qc!=null) task = qc.asTask(); - } - if (task!=null) { - TaskTags.markInessential(task); - } - } - - /** causes failures in subtasks of the current task not to fail the parent; - * no-op if not in a {@link TaskQueueingContext}. - * <p> - * essentially like a {@link #markInessential()} on all tasks in the current - * {@link TaskQueueingContext}, including tasks queued subsequently */ - @Beta - public static void swallowChildrenFailures() { - Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here"); - TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext(); - if (qc!=null) { - qc.swallowChildrenFailures(); - } - } - - /** as {@link TaskTags#addTagDynamically(TaskAdaptable, Object)} but for current task, skipping if no current task */ - public static void addTagDynamically(Object tag) { - Task<?> t = Tasks.current(); - if (t!=null) TaskTags.addTagDynamically(t, tag); - } - - /** - * Workaround for limitation described at {@link Task#cancel(boolean)}; - * internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation. - * <p> - * It is irritating that {@link FutureTask} sync's object clears the runner thread, - * so even if {@link BasicTask#getInternalFuture()} is used, there is no means of determining if the underlying object is done. - * The {@link Task#getEndTimeUtc()} seems the only way. - * - * @return true if tasks ended; false if timed out - **/ - @Beta - public static boolean blockUntilInternalTasksEnded(Task<?> t, Duration timeout) { - CountdownTimer timer = timeout.countdownTimer(); - - if (t==null) - return true; - - if (t instanceof ScheduledTask) { - boolean result = ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining()); - if (!result) return false; - } - - t.blockUntilEnded(timer.getDurationRemaining()); - - while (true) { - if (t.getEndTimeUtc()>=0) return true; - // above should be sufficient; but just in case, trying the below - Thread tt = t.getThread(); - if (t instanceof ScheduledTask) { - ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining()); - return true; - } else { - if (tt==null || !tt.isAlive()) { - if (!t.isCancelled()) { - // may happen for a cancelled task, interrupted after submit but before start - log.warn("Internal task thread is dead or null ("+tt+") but task not ended: "+t.getEndTimeUtc()+" ("+t+")"); - } - return true; - } - } - if (timer.isExpired()) - return false; - Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD); - } - } - - /** returns true if either the current thread or the current task is interrupted/cancelled */ - public static boolean isInterrupted() { - if (Thread.currentThread().isInterrupted()) return true; - Task<?> t = current(); - if (t==null) return false; - return t.isCancelled(); - } - - private static class WaitForRepeaterCallable implements Callable<Boolean> { - protected Repeater repeater; - protected boolean requireTrue; - - public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) { - this.repeater = repeater; - this.requireTrue = requireTrue; - } - - @Override - public Boolean call() { - ReferenceWithError<Boolean> result; - Tasks.setBlockingDetails(repeater.getDescription()); - try { - result = repeater.runKeepingError(); - } finally { - Tasks.resetBlockingDetails(); - } - - if (Boolean.TRUE.equals(result.getWithoutError())) - return true; - if (result.hasError()) - throw Exceptions.propagate(result.getError()); - if (requireTrue) - throw new IllegalStateException("timeout - "+repeater.getDescription()); - return false; - } - } - - /** @return a {@link TaskBuilder} which tests whether the repeater terminates with success in its configured timeframe, - * returning true or false depending on whether repeater succeed */ - public static TaskBuilder<Boolean> testing(Repeater repeater) { - return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, false)) - .name("waiting for condition") - .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription()); - } - - /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe, - * throwing if it does not */ - public static TaskBuilder<?> requiring(Repeater repeater) { - return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, true)) - .name("waiting for condition") - .description("Requiring " + getTimeoutString(repeater) + ": " + repeater.getDescription()); - } - - private static String getTimeoutString(Repeater repeater) { - Duration timeout = repeater.getTimeLimit(); - if (timeout==null || Duration.PRACTICALLY_FOREVER.equals(timeout)) - return "eventually"; - return "in "+timeout; - } - -}