http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ValueResolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ValueResolver.java b/core/src/main/java/brooklyn/util/task/ValueResolver.java deleted file mode 100644 index 37f4269..0000000 --- a/core/src/main/java/brooklyn/util/task/ValueResolver.java +++ /dev/null @@ -1,426 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.util.task; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.management.ExecutionContext; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.flags.TypeCoercions; -import brooklyn.util.guava.Maybe; -import brooklyn.util.javalang.JavaClassNames; -import brooklyn.util.repeat.Repeater; -import brooklyn.util.time.CountdownTimer; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Durations; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.reflect.TypeToken; - -/** - * Resolves a given object, as follows: - * <li> If it is a {@link Tasks} or a {@link DeferredSupplier} then get its contents - * <li> If it's a map and {@link #deep(boolean)} is requested, it applies resolution to contents - * <li> It applies coercion - * <p> - * Fluent-style API exposes a number of other options. - */ -public class ValueResolver<T> implements DeferredSupplier<T> { - - /** - * Period to wait if we're expected to return real quick - * but we want fast things to have time to finish. - * <p> - * Timings are always somewhat arbitrary but this at least - * allows some intention to be captured in code rather than arbitrary values. */ - public static Duration REAL_QUICK_WAIT = Duration.millis(50); - /** - * Period to wait if we're expected to return quickly - * but we want to be a bit more generous for things to finish, - * without letting a caller get annoyed. 
- * <p> - * See {@link #REAL_QUICK_WAIT}. */ - public static Duration PRETTY_QUICK_WAIT = Duration.millis(200); - - /** Period to wait when we have to poll but want to give the illusion of no wait. - * See {@link Repeater#DEFAULT_REAL_QUICK_PERIOD} */ - public static Duration REAL_QUICK_PERIOD = Repeater.DEFAULT_REAL_QUICK_PERIOD; - - private static final Logger log = LoggerFactory.getLogger(ValueResolver.class); - - final Object value; - final Class<T> type; - ExecutionContext exec; - String description; - boolean forceDeep; - /** null means do it if you can; true means always, false means never */ - Boolean embedResolutionInTask; - /** timeout on execution, if possible, or if embedResolutionInTask is true */ - Duration timeout; - boolean isTransientTask = true; - - T defaultValue = null; - boolean returnDefaultOnGet = false; - boolean swallowExceptions = false; - - // internal fields - final Object parentOriginalValue; - final CountdownTimer parentTimer; - AtomicBoolean started = new AtomicBoolean(false); - boolean expired; - - ValueResolver(Object v, Class<T> type) { - this.value = v; - this.type = type; - checkTypeNotNull(); - parentOriginalValue = null; - parentTimer = null; - } - - ValueResolver(Object v, Class<T> type, ValueResolver<?> parent) { - this.value = v; - this.type = type; - checkTypeNotNull(); - - exec = parent.exec; - description = parent.description; - forceDeep = parent.forceDeep; - embedResolutionInTask = parent.embedResolutionInTask; - - parentOriginalValue = parent.getOriginalValue(); - - timeout = parent.timeout; - parentTimer = parent.parentTimer; - if (parentTimer!=null && parentTimer.isExpired()) - expired = true; - - // default value and swallow exceptions do not need to be nested - } - - public static class ResolverBuilderPretype { - final Object v; - public ResolverBuilderPretype(Object v) { - this.v = v; - } - public <T> ValueResolver<T> as(Class<T> type) { - return new ValueResolver<T>(v, type); - } - } - - /** returns a copy of 
this resolver which can be queried, even if the original (single-use instance) has already been copied */ - public ValueResolver<T> clone() { - ValueResolver<T> result = new ValueResolver<T>(value, type) - .context(exec).description(description) - .embedResolutionInTask(embedResolutionInTask) - .deep(forceDeep) - .timeout(timeout); - if (returnDefaultOnGet) result.defaultValue(defaultValue); - if (swallowExceptions) result.swallowExceptions(); - return result; - } - - /** execution context to use when resolving; required if resolving unsubmitted tasks or running with a time limit */ - public ValueResolver<T> context(ExecutionContext exec) { - this.exec = exec; - return this; - } - /** as {@link #context(ExecutionContext)} for use from an entity */ - public ValueResolver<T> context(Entity entity) { - return context(entity!=null ? ((EntityInternal)entity).getExecutionContext() : null); - } - - /** sets a message which will be displayed in status reports while it waits (e.g. the name of the config key being looked up) */ - public ValueResolver<T> description(String description) { - this.description = description; - return this; - } - - /** sets a default value which will be returned on a call to {@link #get()} if the task does not complete - * or completes with an error - * <p> - * note that {@link #getMaybe()} returns an absent object even in the presence of - * a default, so that any error can still be accessed */ - public ValueResolver<T> defaultValue(T defaultValue) { - this.defaultValue = defaultValue; - this.returnDefaultOnGet = true; - return this; - } - - /** indicates that no default value should be returned on a call to {@link #get()}, and instead it should throw - * (this is the default; this method is provided to undo a call to {@link #defaultValue(Object)}) */ - public ValueResolver<T> noDefaultValue() { - this.returnDefaultOnGet = false; - this.defaultValue = null; - return this; - } - - /** indicates that exceptions in resolution should not be thrown on 
a call to {@link #getMaybe()}, - * but rather used as part of the {@link Maybe#get()} if it's absent, - * and swallowed altogether on a call to {@link #get()} in the presence of a {@link #defaultValue(Object)} */ - public ValueResolver<T> swallowExceptions() { - this.swallowExceptions = true; - return this; - } - - /** whether the task should be marked as transient; defaults true */ - public ValueResolver<T> transientTask(boolean isTransientTask) { - this.isTransientTask = isTransientTask; - return this; - } - - public Maybe<T> getDefault() { - if (returnDefaultOnGet) return Maybe.of(defaultValue); - else return Maybe.absent("No default value set"); - } - - /** causes nested structures (maps, lists) to be descended and nested unresolved values resolved */ - public ValueResolver<T> deep(boolean forceDeep) { - this.forceDeep = forceDeep; - return this; - } - - /** if true, forces execution of a deferred supplier to be run in a task; - * if false, it prevents it (meaning time limits may not be applied); - * if null, the default, it runs in a task if a time limit is applied. - * <p> - * running inside a task is required for some {@link DeferredSupplier} - * instances which look up a task {@link ExecutionContext}. */ - public ValueResolver<T> embedResolutionInTask(Boolean embedResolutionInTask) { - this.embedResolutionInTask = embedResolutionInTask; - return this; - } - - /** sets a time limit on executions - * <p> - * used for {@link Task} and {@link DeferredSupplier} instances. - * may require an execution context at runtime. */ - public ValueResolver<T> timeout(Duration timeout) { - this.timeout = timeout; - return this; - } - - protected void checkTypeNotNull() { - if (type==null) - throw new NullPointerException("type must be set to resolve, for '"+value+"'"+(description!=null ? 
", "+description : "")); - } - - public T get() { - Maybe<T> m = getMaybe(); - if (m.isPresent()) return m.get(); - if (returnDefaultOnGet) return defaultValue; - return m.get(); - } - - public Maybe<T> getMaybe() { - Maybe<T> result = getMaybeInternal(); - if (log.isTraceEnabled()) { - log.trace(this+" evaluated as "+result); - } - return result; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - protected Maybe<T> getMaybeInternal() { - if (started.getAndSet(true)) - throw new IllegalStateException("ValueResolver can only be used once"); - - if (expired) return Maybe.absent("Nested resolution of "+getOriginalValue()+" did not complete within "+timeout); - - ExecutionContext exec = this.exec; - if (exec==null) { - // if execution context not specified, take it from the current task if present - exec = BasicExecutionContext.getCurrentExecutionContext(); - } - - CountdownTimer timerU = parentTimer; - if (timerU==null && timeout!=null) - timerU = timeout.countdownTimer(); - final CountdownTimer timer = timerU; - if (timer!=null && !timer.isRunning()) - timer.start(); - - checkTypeNotNull(); - Object v = this.value; - - //if the expected type is a closure or map and that's what we have, we're done (or if it's null); - //but not allowed to return a future or DeferredSupplier as the resolved value - if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v))) - return Maybe.of((T) v); - - try { - //if it's a task or a future, we wait for the task to complete - if (v instanceof TaskAdaptable<?>) { - //if it's a task, we make sure it is submitted - if (!((TaskAdaptable<?>) v).asTask().isSubmitted() ) { - if (exec==null) - return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available"); - exec.submit(((TaskAdaptable<?>) v).asTask()); - } - } - - if (v instanceof Future) { - final Future<?> vfuture = (Future<?>) v; - - //including tasks, above - if 
(!vfuture.isDone()) { - Callable<Maybe> callable = new Callable<Maybe>() { - public Maybe call() throws Exception { - return Durations.get(vfuture, timer); - } }; - - String description = getDescription(); - Maybe vm = Tasks.withBlockingDetails("Waiting for "+description, callable); - if (vm.isAbsent()) return vm; - v = vm.get(); - - } else { - v = vfuture.get(); - - } - - } else if (v instanceof DeferredSupplier<?>) { - final Object vf = v; - - if ((!Boolean.FALSE.equals(embedResolutionInTask) && (exec!=null || timeout!=null)) || Boolean.TRUE.equals(embedResolutionInTask)) { - if (exec==null) - return Maybe.absent("Embedding in task needed for '"+getDescription()+"' but no execution context available"); - - Callable<Object> callable = new Callable<Object>() { - public Object call() throws Exception { - try { - Tasks.setBlockingDetails("Retrieving "+vf); - return ((DeferredSupplier<?>) vf).get(); - } finally { - Tasks.resetBlockingDetails(); - } - } }; - String description = getDescription(); - TaskBuilder<Object> vb = Tasks.<Object>builder().body(callable).name("Resolving dependent value").description(description); - if (isTransientTask) vb.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG); - Task<Object> vt = exec.submit(vb.build()); - // TODO to handle immediate resolution, it would be nice to be able to submit - // so it executes in the current thread, - // or put a marker in the target thread or task while it is running that the task - // should never wait on anything other than another value being resolved - // (though either could recurse infinitely) - Maybe<Object> vm = Durations.get(vt, timer); - vt.cancel(true); - if (vm.isAbsent()) return (Maybe<T>)vm; - v = vm.get(); - - } else { - try { - Tasks.setBlockingDetails("Retrieving (non-task) "+vf); - v = ((DeferredSupplier<?>) vf).get(); - } finally { - Tasks.resetBlockingDetails(); - } - } - - } else if (v instanceof Map) { - //and if a map or list we look inside - Map result = Maps.newLinkedHashMap(); - for 
(Map.Entry<?,?> entry : ((Map<?,?>)v).entrySet()) { - Maybe<?> kk = new ValueResolver(entry.getKey(), type, this) - .description( (description!=null ? description+", " : "") + "map key "+entry.getKey() ) - .getMaybe(); - if (kk.isAbsent()) return (Maybe<T>)kk; - Maybe<?> vv = new ValueResolver(entry.getValue(), type, this) - .description( (description!=null ? description+", " : "") + "map value for key "+kk.get() ) - .getMaybe(); - if (vv.isAbsent()) return (Maybe<T>)vv; - result.put(kk.get(), vv.get()); - } - return Maybe.of((T) result); - - } else if (v instanceof Set) { - Set result = Sets.newLinkedHashSet(); - int count = 0; - for (Object it : (Set)v) { - Maybe<?> vv = new ValueResolver(it, type, this) - .description( (description!=null ? description+", " : "") + "entry "+count ) - .getMaybe(); - if (vv.isAbsent()) return (Maybe<T>)vv; - result.add(vv.get()); - count++; - } - return Maybe.of((T) result); - - } else if (v instanceof Iterable) { - List result = Lists.newArrayList(); - int count = 0; - for (Object it : (Iterable)v) { - Maybe<?> vv = new ValueResolver(it, type, this) - .description( (description!=null ? description+", " : "") + "entry "+count ) - .getMaybe(); - if (vv.isAbsent()) return (Maybe<T>)vv; - result.add(vv.get()); - count++; - } - return Maybe.of((T) result); - - } else { - return TypeCoercions.tryCoerce(v, TypeToken.of(type)); - } - - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - - IllegalArgumentException problem = new IllegalArgumentException("Error resolving "+(description!=null ? 
description+", " : "")+v+", in "+exec+": "+e, e); - if (swallowExceptions) { - if (log.isDebugEnabled()) - log.debug("Resolution of "+this+" failed, swallowing and returning: "+e); - return Maybe.absent(problem); - } - if (log.isDebugEnabled()) - log.debug("Resolution of "+this+" failed, throwing: "+e); - throw problem; - } - - return new ValueResolver(v, type, this).getMaybe(); - } - - protected String getDescription() { - return description!=null ? description : ""+value; - } - protected Object getOriginalValue() { - if (parentOriginalValue!=null) return parentOriginalValue; - return value; - } - - @Override - public String toString() { - return JavaClassNames.cleanSimpleClassName(this)+"["+JavaClassNames.cleanSimpleClassName(type)+" "+value+"]"; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ssh/SshFetchTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ssh/SshFetchTaskFactory.java b/core/src/main/java/brooklyn/util/task/ssh/SshFetchTaskFactory.java deleted file mode 100644 index bd9e96e..0000000 --- a/core/src/main/java/brooklyn/util/task/ssh/SshFetchTaskFactory.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.util.task.ssh; - -import org.apache.brooklyn.api.management.TaskFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.config.ConfigBag; - -// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it -public class SshFetchTaskFactory implements TaskFactory<SshFetchTaskWrapper> { - - private static final Logger log = LoggerFactory.getLogger(SshFetchTaskFactory.class); - - private boolean dirty = false; - - protected SshMachineLocation machine; - protected String remoteFile; - protected final ConfigBag config = ConfigBag.newInstance(); - - /** constructor where machine will be added later */ - public SshFetchTaskFactory(String remoteFile) { - remoteFile(remoteFile); - } - - /** convenience constructor to supply machine immediately */ - public SshFetchTaskFactory(SshMachineLocation machine, String remoteFile) { - machine(machine); - remoteFile(remoteFile); - } - - protected SshFetchTaskFactory self() { return this; } - - protected void markDirty() { - dirty = true; - } - - public SshFetchTaskFactory machine(SshMachineLocation machine) { - markDirty(); - this.machine = machine; - return self(); - } - - public SshMachineLocation getMachine() { - return machine; - } - - public SshFetchTaskFactory remoteFile(String remoteFile) { - this.remoteFile = remoteFile; - return self(); - } - - public ConfigBag getConfig() { - return config; - } - - @Override - public SshFetchTaskWrapper newTask() { - dirty = false; - return new SshFetchTaskWrapper(this); - } - - @Override - protected void finalize() throws Throwable { - // help let people know of API usage error - if (dirty) - log.warn("Task "+this+" was modified but modification was never used"); - super.finalize(); - } - -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ssh/SshFetchTaskWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ssh/SshFetchTaskWrapper.java b/core/src/main/java/brooklyn/util/task/ssh/SshFetchTaskWrapper.java deleted file mode 100644 index 9553b4f..0000000 --- a/core/src/main/java/brooklyn/util/task/ssh/SshFetchTaskWrapper.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.util.task.ssh; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskWrapper; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.FilenameUtils; - -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.config.ConfigBag; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.os.Os; -import brooklyn.util.task.TaskBuilder; -import brooklyn.util.task.Tasks; -import brooklyn.util.task.system.ProcessTaskWrapper; - -import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; - -/** - * As {@link ProcessTaskWrapper}, but putting a file on the remote machine - * - * @since 0.6.0 - */ -@Beta -public class SshFetchTaskWrapper implements TaskWrapper<String> { - - private final Task<String> task; - - private final String remoteFile; - private final SshMachineLocation machine; - private File backingFile; - private final ConfigBag config; - - - // package private as only AbstractSshTaskFactory should invoke - SshFetchTaskWrapper(SshFetchTaskFactory factory) { - this.remoteFile = Preconditions.checkNotNull(factory.remoteFile, "remoteFile"); - this.machine = Preconditions.checkNotNull(factory.machine, "machine"); - TaskBuilder<String> tb = TaskBuilder.<String>builder().dynamic(false).name("ssh fetch "+factory.remoteFile); - task = tb.body(new SshFetchJob()).build(); - config = factory.getConfig(); - } - - @Override - public Task<String> asTask() { - return getTask(); - } - - @Override - public Task<String> getTask() { - return task; - } - - public String getRemoteFile() { - return remoteFile; - } - - public SshMachineLocation getMachine() { - return machine; - } - - private class SshFetchJob implements Callable<String> { - @Override - public String call() throws Exception { - int result = -1; - try { - 
Preconditions.checkNotNull(getMachine(), "machine"); - backingFile = Os.newTempFile("brooklyn-ssh-fetch-", FilenameUtils.getName(remoteFile)); - backingFile.deleteOnExit(); - - result = getMachine().copyFrom(config.getAllConfig(), remoteFile, backingFile.getPath()); - } catch (Exception e) { - throw new IllegalStateException("SSH fetch "+getRemoteFile()+" from "+getMachine()+" returned threw exception, in "+Tasks.current()+": "+e, e); - } - if (result!=0) { - throw new IllegalStateException("SSH fetch "+getRemoteFile()+" from "+getMachine()+" returned non-zero exit code "+result+", in "+Tasks.current()); - } - return FileUtils.readFileToString(backingFile); - } - } - - @Override - public String toString() { - return super.toString()+"["+task+"]"; - } - - /** blocks, returns the fetched file as a string, throwing if there was an exception */ - public String get() { - return getTask().getUnchecked(); - } - - /** blocks, returns the fetched file as bytes, throwing if there was an exception */ - public byte[] getBytes() { - block(); - try { - return FileUtils.readFileToByteArray(backingFile); - } catch (IOException e) { - throw Exceptions.propagate(e); - } - } - - /** blocks until the task completes; does not throw */ - public SshFetchTaskWrapper block() { - getTask().blockUntilEnded(); - return this; - } - - /** true iff the ssh job has completed (with or without failure) */ - public boolean isDone() { - return getTask().isDone(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskFactory.java b/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskFactory.java deleted file mode 100644 index e2c5502..0000000 --- a/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskFactory.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - 
* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task.ssh; - -import java.io.InputStream; -import java.io.Reader; - -import org.apache.brooklyn.api.management.TaskFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.stream.KnownSizeInputStream; -import brooklyn.util.stream.ReaderInputStream; - -import com.google.common.base.Suppliers; - -// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it -public class SshPutTaskFactory extends SshPutTaskStub implements TaskFactory<SshPutTaskWrapper> { - - private static final Logger log = LoggerFactory.getLogger(SshPutTaskFactory.class); - - private boolean dirty = false; - - /** constructor where machine will be added later */ - public SshPutTaskFactory(String remoteFile) { - remoteFile(remoteFile); - } - - /** convenience constructor to supply machine immediately */ - public SshPutTaskFactory(SshMachineLocation machine, String remoteFile) { - machine(machine); - remoteFile(remoteFile); - } - - protected SshPutTaskFactory self() { return this; } - - protected void markDirty() { - dirty = 
true; - } - - public SshPutTaskFactory machine(SshMachineLocation machine) { - markDirty(); - this.machine = machine; - return self(); - } - - public SshPutTaskFactory remoteFile(String remoteFile) { - this.remoteFile = remoteFile; - return self(); - } - - public SshPutTaskFactory summary(String summary) { - markDirty(); - this.summary = summary; - return self(); - } - - public SshPutTaskFactory contents(String contents) { - markDirty(); - this.contents = Suppliers.ofInstance(KnownSizeInputStream.of(contents)); - return self(); - } - - public SshPutTaskFactory contents(byte[] contents) { - markDirty(); - this.contents = Suppliers.ofInstance(KnownSizeInputStream.of(contents)); - return self(); - } - - public SshPutTaskFactory contents(InputStream stream) { - markDirty(); - this.contents = Suppliers.ofInstance(stream); - return self(); - } - - public SshPutTaskFactory contents(Reader reader) { - markDirty(); - this.contents = Suppliers.ofInstance(new ReaderInputStream(reader)); - return self(); - } - - public SshPutTaskFactory allowFailure() { - markDirty(); - allowFailure = true; - return self(); - } - - public SshPutTaskFactory createDirectory() { - markDirty(); - createDirectory = true; - return self(); - } - - public SshPutTaskWrapper newTask() { - dirty = false; - return new SshPutTaskWrapper(this); - } - - @Override - protected void finalize() throws Throwable { - // help let people know of API usage error - if (dirty) - log.warn("Task "+this+" was modified but modification was never used"); - super.finalize(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskStub.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskStub.java b/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskStub.java deleted file mode 100644 index 185e819..0000000 --- 
a/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskStub.java +++ /dev/null @@ -1,69 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package brooklyn.util.task.ssh;

import java.io.InputStream;

import org.apache.brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.config.ConfigBag;

import com.google.common.base.Supplier;

/**
 * Holds the settings describing an SSH "put" (copy to remote machine) operation:
 * target file and machine, content supplier, summary, permissions and failure/directory flags.
 */
public class SshPutTaskStub {

    protected String remoteFile;
    protected SshMachineLocation machine;
    protected Supplier<? extends InputStream> contents;
    protected String summary;
    protected String permissions;
    protected boolean allowFailure = false;
    protected boolean createDirectory = false;
    protected final ConfigBag config = ConfigBag.newInstance();

    /** creates an empty stub, for settings to be filled in afterwards */
    protected SshPutTaskStub() {
    }

    /** creates a stub taking every setting from the given one; the config bag is copied into this instance's own bag */
    protected SshPutTaskStub(SshPutTaskStub source) {
        remoteFile = source.remoteFile;
        machine = source.machine;
        contents = source.contents;
        summary = source.summary;
        permissions = source.permissions;
        allowFailure = source.allowFailure;
        createDirectory = source.createDirectory;
        config.copy(source.config);
    }

    public String getRemoteFile() {
        return this.remoteFile;
    }

    /** the explicit summary if one was set, otherwise a default naming the remote file */
    public String getSummary() {
        return (summary != null) ? summary : "scp put: "+remoteFile;
    }

    public SshMachineLocation getMachine() {
        return this.machine;
    }

    protected ConfigBag getConfig() {
        return this.config;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskWrapper.java b/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskWrapper.java deleted file mode 100644 index 4f0cd76..0000000 --- a/core/src/main/java/brooklyn/util/task/ssh/SshPutTaskWrapper.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package brooklyn.util.task.ssh;

import java.util.Arrays;
import java.util.concurrent.Callable;

import org.apache.brooklyn.api.management.Task;
import org.apache.brooklyn.api.management.TaskWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.util.config.ConfigBag;
import brooklyn.util.internal.ssh.SshTool;
import brooklyn.util.task.TaskBuilder;
import brooklyn.util.task.Tasks;
import brooklyn.util.task.system.ProcessTaskWrapper;

import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;

/**
 * As {@link ProcessTaskWrapper}, but putting a file on the remote machine.
 * <p>
 * Wraps a single {@link Task} whose body ({@link SshPutJob}) optionally creates the
 * parent directory of the remote file and then copies {@code contents} to it.
 * The outcome is recorded in {@link #getExitCode()}, {@link #getException()} and
 * {@link #isSuccessful()}; when {@code allowFailure} is false a failure is also
 * thrown from the task as an {@link IllegalStateException}.
 */
@Beta
public class SshPutTaskWrapper extends SshPutTaskStub implements TaskWrapper<Void> {

    private static final Logger log = LoggerFactory.getLogger(SshPutTaskWrapper.class);

    /** the task running {@link SshPutJob}; built once in the constructor */
    private final Task<Void> task;

    /** exit code returned by the copy; stays null if the copy was never reached or threw */
    protected Integer exitCodeOfCopy = null;
    /** last exception caught (or created for a non-zero directory-creation exit code) by the job; null if none */
    protected Exception exception = null;
    /** set to true only at the very end of a job which had no exception and a zero copy exit code */
    protected boolean successful = false;

    // package private as only AbstractSshTaskFactory should invoke
    /**
     * Copies the settings of the given factory into this wrapper (via the stub copy constructor)
     * and builds a non-dynamic task, named after {@link #getSummary()}, whose body is a new {@link SshPutJob}.
     */
    SshPutTaskWrapper(SshPutTaskFactory constructor) {
        super(constructor);
        TaskBuilder<Void> tb = TaskBuilder.<Void>builder().dynamic(false).name(getSummary());
        task = tb.body(new SshPutJob()).build();
    }

    /** same as {@link #getTask()} */
    @Override
    public Task<Void> asTask() {
        return getTask();
    }

    /** returns the underlying task built in the constructor */
    @Override
    public Task<Void> getTask() {
        return task;
    }

    // TODO:
    //   verify
    //   copyAsRoot
    //   owner
    //   lastModificationDate - see {@link #PROP_LAST_MODIFICATION_DATE}; not supported by all SshTool implementations
    //   lastAccessDate - see {@link #PROP_LAST_ACCESS_DATE}; not supported by all SshTool implementations

    /**
     * The body of the task: optionally creates the remote directory, copies the contents,
     * and records the result in the fields of the enclosing wrapper. Always returns null.
     */
    private class SshPutJob implements Callable<Void> {
        /**
         * @return always null
         * @throws IllegalStateException if {@code allowFailure} is false and either an exception
         *         was recorded or the copy returned a non-zero exit code
         */
        @Override
        public Void call() throws Exception {
            // Anything thrown inside this try -- including the IllegalStateExceptions raised by the
            // create-directory branch below -- is caught by the outer catch and stored in `exception`;
            // the block after the try then decides whether to throw (wrapping it again) or tolerate it.
            try {
                Preconditions.checkNotNull(getMachine(), "machine");

                String remoteFile = getRemoteFile();

                if (createDirectory) {
                    String remoteDir = remoteFile;
                    // -1 marks "mkdir not attempted / did not return"; overwritten on every non-throwing path below
                    int exitCodeOfCreate = -1;
                    try {
                        int li = remoteDir.lastIndexOf("/");
                        if (li>=0) {
                            // keep everything up to and including the last slash
                            remoteDir = remoteDir.substring(0, li+1);
                            // NOTE(review): remoteDir is concatenated into the command unquoted -- confirm callers never pass paths needing shell quoting
                            exitCodeOfCreate = getMachine().execCommands("creating directory for "+getSummary(),
                                    Arrays.asList("mkdir -p "+remoteDir));
                        } else {
                            // nothing to create
                            exitCodeOfCreate = 0;
                        }
                    } catch (Exception e) {
                        if (log.isDebugEnabled())
                            log.debug("SSH put "+getRemoteFile()+" (create dir, in task "+getSummary()+") to "+getMachine()+" threw exception: "+e);
                        exception = e;
                    }
                    if (exception!=null || !((Integer)0).equals(exitCodeOfCreate)) {
                        if (!allowFailure) {
                            if (exception != null) {
                                throw new IllegalStateException(getSummary()+" (creating dir "+remoteDir+" for SSH put task) ended with exception, in "+Tasks.current()+": "+exception, exception);
                            }
                            if (exitCodeOfCreate!=0) {
                                exception = new IllegalStateException(getSummary()+" (creating dir "+remoteDir+" SSH put task) ended with exit code "+exitCodeOfCreate+", in "+Tasks.current());
                                throw exception;
                            }
                        }
                        // not successful, but allowed
                        // (the copy is skipped; exitCodeOfCopy stays null and successful stays false)
                        return null;
                    }
                }

                // work on a copy so the permissions flag does not leak into the wrapper's own config
                ConfigBag config = ConfigBag.newInstanceCopying(getConfig());
                if (permissions!=null) config.put(SshTool.PROP_PERMISSIONS, permissions);

                exitCodeOfCopy = getMachine().copyTo(config.getAllConfig(), contents.get(), remoteFile);

                if (log.isDebugEnabled())
                    log.debug("SSH put "+getRemoteFile()+" (task "+getSummary()+") to "+getMachine()+" completed with exit code "+exitCodeOfCopy);
            } catch (Exception e) {
                if (log.isDebugEnabled())
                    log.debug("SSH put "+getRemoteFile()+" (task "+getSummary()+") to "+getMachine()+" threw exception: "+e);
                exception = e;
            }

            // failure handling for the operation as a whole: an exception was recorded, or the copy exit code is non-zero/unset
            if (exception!=null || !((Integer)0).equals(exitCodeOfCopy)) {
                if (!allowFailure) {
                    if (exception != null) {
                        throw new IllegalStateException(getSummary()+" (SSH put task) ended with exception, in "+Tasks.current()+": "+exception, exception);
                    }
                    if (exitCodeOfCopy!=0) {
                        exception = new IllegalStateException(getSummary()+" (SSH put task) ended with exit code "+exitCodeOfCopy+", in "+Tasks.current());
                        throw exception;
                    }
                }
                // not successful, but allowed
                return null;
            }

            // TODO verify

            successful = (exception==null && ((Integer)0).equals(exitCodeOfCopy));
            return null;
        }
    }

    /** the default string form followed by the underlying task in square brackets */
    @Override
    public String toString() {
        return super.toString()+"["+task+"]";
    }

    /** blocks, throwing if there was an exception */
    public Void get() {
        return getTask().getUnchecked();
    }

    /** returns the exit code from the copy, 0 on success;
     * null if it has not completed or threw exception
     * (not sure if this is ever a non-zero integer or if it is meaningful)
     * <p>
     * most callers will want the simpler {@link #isSuccessful()} */
    public Integer getExitCode() {
        return exitCodeOfCopy;
    }

    /** returns any exception encountered in the operation */
    public Exception getException() {
        return exception;
    }

    /** blocks until the task completes; does not throw */
    public SshPutTaskWrapper block() {
        getTask().blockUntilEnded();
        return this;
    }

    /** true iff the ssh job has completed (with or without failure) */
    public boolean isDone() {
        return getTask().isDone();
    }

    /** true iff the scp has completed successfully; guaranteed to be set before {@link #isDone()} or {@link #block()} are satisfied */
    public boolean isSuccessful() {
        return successful;
    }


}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ssh/SshTasks.java
---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ssh/SshTasks.java b/core/src/main/java/brooklyn/util/task/ssh/SshTasks.java deleted file mode 100644 index 9f6fbb9..0000000 --- a/core/src/main/java/brooklyn/util/task/ssh/SshTasks.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package brooklyn.util.task.ssh;

import java.util.Map;

import javax.annotation.Nullable;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.management.ManagementContext;
import org.apache.brooklyn.api.management.Task;
import org.apache.brooklyn.api.management.TaskAdaptable;
import org.apache.brooklyn.api.management.TaskFactory;
import org.apache.brooklyn.api.management.TaskQueueingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.config.ConfigKey;
import brooklyn.config.ConfigUtils;
import brooklyn.entity.basic.BrooklynTaskTags;
import brooklyn.entity.basic.ConfigKeys;

import org.apache.brooklyn.location.basic.AbstractLocation;
import org.apache.brooklyn.location.basic.LocationInternal;
import org.apache.brooklyn.location.basic.SshMachineLocation;

import brooklyn.util.ResourceUtils;
import brooklyn.util.config.ConfigBag;
import brooklyn.util.internal.ssh.SshTool;
import brooklyn.util.net.Urls;
import brooklyn.util.ssh.BashCommands;
import brooklyn.util.stream.Streams;
import brooklyn.util.task.DynamicTasks;
import brooklyn.util.task.Tasks;
import brooklyn.util.task.ssh.internal.PlainSshExecTaskFactory;
import brooklyn.util.task.system.ProcessTaskFactory;
import brooklyn.util.task.system.ProcessTaskWrapper;
import brooklyn.util.text.Identifiers;
import brooklyn.util.text.Strings;

import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

/**
 * Conveniences for generating {@link Task} instances to perform SSH activities on an {@link SshMachineLocation}.
 * <p>
 * To infer the {@link SshMachineLocation} and take properties from entities and global management context the
 * {@link SshEffectorTasks} should be preferred over this class.
 *
 * @see SshEffectorTasks
 * @since 0.6.0
 */
@Beta
public class SshTasks {

    private static final Logger log = LoggerFactory.getLogger(SshTasks.class);

    /** As {@link #newSshExecTaskFactory(SshMachineLocation, boolean, String...)} with {@code useMachineConfig} true. */
    public static ProcessTaskFactory<Integer> newSshExecTaskFactory(SshMachineLocation machine, String ...commands) {
        return newSshExecTaskFactory(machine, true, commands);
    }

    /**
     * Creates a factory for tasks running the given commands on the given machine.
     *
     * @param useMachineConfig if true, the SSH flags found by {@link #getSshFlags(Location)} are added
     *        to the factory's config wherever a value is not already present
     */
    public static ProcessTaskFactory<Integer> newSshExecTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String ...commands) {
        return new PlainSshExecTaskFactory<Integer>(machine, commands) {
            {
                // NOTE(review): `machine` here presumably resolves to the field inherited from the factory
                // (set by the constructor just invoked) rather than the non-final method parameter -- confirm
                if (useMachineConfig)
                    config.putIfAbsent(getSshFlags(machine));
            }
        };
    }

    /** As {@link #newSshPutTaskFactory(SshMachineLocation, boolean, String)} with {@code useMachineConfig} true. */
    public static SshPutTaskFactory newSshPutTaskFactory(SshMachineLocation machine, String remoteFile) {
        return newSshPutTaskFactory(machine, true, remoteFile);
    }

    /**
     * Creates a factory for tasks putting a file at {@code remoteFile} on the given machine.
     *
     * @param useMachineConfig if true, the SSH flags found by {@link #getSshFlags(Location)} are added
     *        to the factory's config wherever a value is not already present
     */
    public static SshPutTaskFactory newSshPutTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String remoteFile) {
        return new SshPutTaskFactory(machine, remoteFile) {
            {
                if (useMachineConfig)
                    config.putIfAbsent(getSshFlags(machine));
            }
        };
    }

    /** As {@link #newSshFetchTaskFactory(SshMachineLocation, boolean, String)} with {@code useMachineConfig} true. */
    public static SshFetchTaskFactory newSshFetchTaskFactory(SshMachineLocation machine, String remoteFile) {
        return newSshFetchTaskFactory(machine, true, remoteFile);
    }

    /**
     * Creates a factory for tasks fetching {@code remoteFile} from the given machine.
     *
     * @param useMachineConfig if true, the SSH flags found by {@link #getSshFlags(Location)} are added
     *        to the factory's config wherever a value is not already present
     */
    public static SshFetchTaskFactory newSshFetchTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String remoteFile) {
        return new SshFetchTaskFactory(machine, remoteFile) {
            {
                if (useMachineConfig)
                    config.putIfAbsent(getSshFlags(machine));
            }
        };
    }

    /**
     * Collects the SSH-tool settings which apply to a location.
     * <p>
     * Management-context config (when the location is an {@link AbstractLocation} with a management context)
     * is overlaid with the location's own config; every key whose name starts with
     * {@link SshTool#BROOKLYN_CONFIG_KEY_PREFIX} is then returned under its un-prefixed name.
     */
    private static Map<String, Object> getSshFlags(Location location) {
        ConfigBag allConfig = ConfigBag.newInstance();

        if (location instanceof AbstractLocation) {
            ManagementContext mgmt = ((AbstractLocation)location).getManagementContext();
            if (mgmt!=null)
                allConfig.putAll(mgmt.getConfig().getAllConfig());
        }

        // added second, so location-level values are put on top of management-level ones
        allConfig.putAll(((LocationInternal)location).config().getBag());

        Map<String, Object> result = Maps.newLinkedHashMap();
        for (String keyS : allConfig.getAllConfig().keySet()) {
            ConfigKey<?> key = ConfigKeys.newConfigKey(Object.class, keyS);
            if (key.getName().startsWith(SshTool.BROOKLYN_CONFIG_KEY_PREFIX)) {
                result.put(ConfigUtils.unprefixedKey(SshTool.BROOKLYN_CONFIG_KEY_PREFIX, key).getName(), allConfig.get(key));
            }
        }
        return result;
    }

    @Beta
    public static enum OnFailingTask {
        FAIL,
        /** issues a warning, sometimes implemented as marking the task inessential and failing it if it appears
         * we are in a dynamic {@link TaskQueueingContext};
         * useful because this way the warning appears to the user;
         * but note that the check is done against the calling thread so use with some care
         * (and thus this enum is currently here rather than elsewhere) */
        WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL,
        /** issues a warning in the log if the task fails, otherwise swallows it */
        WARN_IN_LOG_ONLY,
        /** not even a warning if the task fails (the caller is expected to handle it as appropriate) */
        IGNORE }

    /** As {@link #dontRequireTtyForSudo(SshMachineLocation, OnFailingTask)},
     * using {@link OnFailingTask#FAIL} if {@code failIfCantSudo} and {@link OnFailingTask#WARN_IN_LOG_ONLY} otherwise. */
    public static ProcessTaskFactory<Boolean> dontRequireTtyForSudo(SshMachineLocation machine, final boolean failIfCantSudo) {
        return dontRequireTtyForSudo(machine, failIfCantSudo ? OnFailingTask.FAIL : OnFailingTask.WARN_IN_LOG_ONLY);
    }
    /** creates a task which modifies sudoers to ensure non-tty access is permitted;
     * also gives nice warnings if sudo is not permitted.
     * <p>
     * The task returns true if the commands exit with code zero and the sudo probe marker appears on stdout;
     * otherwise it returns false or throws {@link IllegalStateException}, depending on the failure mode.
     *
     * @param onFailingTaskRequested how to react if sudo cannot be set up;
     *        {@link OnFailingTask#WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL} is downgraded to
     *        {@link OnFailingTask#WARN_IN_LOG_ONLY} when the calling thread has no task queueing context */
    public static ProcessTaskFactory<Boolean> dontRequireTtyForSudo(SshMachineLocation machine, OnFailingTask onFailingTaskRequested) {
        final OnFailingTask onFailingTask;
        if (onFailingTaskRequested==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
            if (DynamicTasks.getTaskQueuingContext()!=null)
                onFailingTask = onFailingTaskRequested;
            else
                onFailingTask = OnFailingTask.WARN_IN_LOG_ONLY;
        } else {
            onFailingTask = onFailingTaskRequested;
        }

        // random marker so that the probe output of this run can be recognised on stdout
        final String id = Identifiers.makeRandomId(6);
        return newSshExecTaskFactory(machine,
                BashCommands.dontRequireTtyForSudo(),
                // strange quotes are to ensure we don't match against echoed stdin
                BashCommands.sudo("echo \"sudo\"-is-working-"+id))
            .summary("setting up sudo")
            .configure(SshTool.PROP_ALLOCATE_PTY, true)
            .allowingNonZeroExitCode()
            .returning(new Function<ProcessTaskWrapper<?>,Boolean>() {
                @Override
                public Boolean apply(ProcessTaskWrapper<?> task) {
                    // null-safe: getExitCode() returns an Integer which is null until the process has ended,
                    // so comparing with ==0 would auto-unbox and could throw a NullPointerException
                    if (Integer.valueOf(0).equals(task.getExitCode()) && task.getStdout().contains("sudo-is-working-"+id)) return true;
                    Entity entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());


                    if (onFailingTask!=OnFailingTask.IGNORE) {
                        // TODO if in a queueing context can we mark this task inessential and throw?
                        // that way user sees the message...
                        String message = "Error setting up sudo for "+task.getMachine().getUser()+"@"+task.getMachine().getAddress().getHostName()+" "+
                            " (exit code "+task.getExitCode()+(entity!=null ? ", entity "+entity : "")+")";
                        DynamicTasks.queueIfPossible(Tasks.warning(message, null));
                    }
                    Streams.logStreamTail(log, "STDERR of sudo setup problem", Streams.byteArrayOfString(task.getStderr()), 1024);

                    if (onFailingTask==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
                        Tasks.markInessential();
                    }
                    if (onFailingTask==OnFailingTask.FAIL || onFailingTask==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
                        throw new IllegalStateException("Passwordless sudo is required for "+task.getMachine().getUser()+"@"+task.getMachine().getAddress().getHostName()+
                                (entity!=null ? " ("+entity+")" : ""));
                    }
                    return false;
                }
            });
    }

    /** Function for use in {@link ProcessTaskFactory#returning(Function)} which logs all information, optionally requires zero exit code,
     * and then returns stdout.
     *
     * @param logger where commands, stdout and stderr are logged at info level; nothing is logged if null
     * @param requireZero if true the function throws {@link IllegalStateException} unless the exit code is zero
     *        (an exit code which is not yet available counts as non-zero) */
    public static Function<ProcessTaskWrapper<?>, String> returningStdoutLoggingInfo(final Logger logger, final boolean requireZero) {
        return new Function<ProcessTaskWrapper<?>, String>() {
            @Override
            public String apply(@Nullable ProcessTaskWrapper<?> input) {
                if (logger!=null) logger.info(input+" COMMANDS:\n"+Strings.join(input.getCommands(),"\n"));
                if (logger!=null) logger.info(input+" STDOUT:\n"+input.getStdout());
                if (logger!=null) logger.info(input+" STDERR:\n"+input.getStderr());
                // null-safe comparison: avoids auto-unboxing an exit code which has not been set
                if (requireZero && !Integer.valueOf(0).equals(input.getExitCode()))
                    throw new IllegalStateException("non-zero exit code in "+input.getSummary()+": see log for more details!");
                return input.getStdout();
            }
        };
    }

    /** task to install a file given a url, where the url is resolved remotely first then locally */
    public static TaskFactory<?> installFromUrl(final SshMachineLocation location, final String url, final String destPath) {
        return installFromUrl(ResourceUtils.create(SshTasks.class), ImmutableMap.<String,Object>of(), location, url, destPath);
    }
    /** task to install a file given a url, where the url is resolved remotely first then locally;
     * the task throws {@link IllegalStateException} if the install returns a non-zero exit code */
    public static TaskFactory<?> installFromUrl(final ResourceUtils utils, final Map<String, ?> props, final SshMachineLocation location, final String url, final String destPath) {
        return new TaskFactory<TaskAdaptable<?>>() {
            @Override
            public TaskAdaptable<?> newTask() {
                return Tasks.<Void>builder().name("installing "+Urls.getBasename(url)).description("installing "+url+" to "+destPath).body(new Runnable() {
                    @Override
                    public void run() {
                        int result = location.installTo(utils, props, url, destPath);
                        if (result!=0)
                            throw new IllegalStateException("Failed to install '"+url+"' to '"+destPath+"' at "+location+": exit code "+result);
                    }
                }).build();
            }
        };
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ssh/internal/AbstractSshExecTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/ssh/internal/AbstractSshExecTaskFactory.java b/core/src/main/java/brooklyn/util/task/ssh/internal/AbstractSshExecTaskFactory.java deleted file mode 100644 index 86764f3..0000000 --- a/core/src/main/java/brooklyn/util/task/ssh/internal/AbstractSshExecTaskFactory.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied.
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task.ssh.internal; - -import com.google.common.base.Preconditions; - -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.config.ConfigBag; -import brooklyn.util.task.system.ProcessTaskFactory; -import brooklyn.util.task.system.ProcessTaskWrapper; -import brooklyn.util.task.system.internal.AbstractProcessTaskFactory; - -// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it -public abstract class AbstractSshExecTaskFactory<T extends AbstractProcessTaskFactory<T,RET>,RET> extends AbstractProcessTaskFactory<T,RET> implements ProcessTaskFactory<RET> { - - /** constructor where machine will be added later */ - public AbstractSshExecTaskFactory(String ...commands) { - super(commands); - } - - /** convenience constructor to supply machine immediately */ - public AbstractSshExecTaskFactory(SshMachineLocation machine, String ...commands) { - this(commands); - machine(machine); - } - - @Override - public ProcessTaskWrapper<RET> newTask() { - dirty = false; - return new ProcessTaskWrapper<RET>(this) { - protected void run(ConfigBag config) { - Preconditions.checkNotNull(getMachine(), "machine"); - if (Boolean.FALSE.equals(this.runAsScript)) { - this.exitCode = getMachine().execCommands(config.getAllConfig(), getSummary(), commands, shellEnvironment); - } else { // runScript = null or TRUE - this.exitCode = getMachine().execScript(config.getAllConfig(), getSummary(), commands, shellEnvironment); - } - } - protected String taskTypeShortName() { return "SSH"; } - }; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/ssh/internal/PlainSshExecTaskFactory.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/brooklyn/util/task/ssh/internal/PlainSshExecTaskFactory.java b/core/src/main/java/brooklyn/util/task/ssh/internal/PlainSshExecTaskFactory.java deleted file mode 100644 index efc14db..0000000 --- a/core/src/main/java/brooklyn/util/task/ssh/internal/PlainSshExecTaskFactory.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.util.task.ssh.internal; - -import java.util.List; - -import org.apache.brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.task.system.ProcessTaskWrapper; - -import com.google.common.base.Function; - -/** the "Plain" class exists purely so we can massage return types for callers' convenience */ -public class PlainSshExecTaskFactory<RET> extends AbstractSshExecTaskFactory<PlainSshExecTaskFactory<RET>,RET> { - /** constructor where machine will be added later */ - public PlainSshExecTaskFactory(String ...commands) { - super(commands); - } - - /** convenience constructor to supply machine immediately */ - public PlainSshExecTaskFactory(SshMachineLocation machine, String ...commands) { - this(commands); - machine(machine); - } - - /** Constructor where machine will be added later */ - public PlainSshExecTaskFactory(List<String> commands) { - this(commands.toArray(new String[commands.size()])); - } - - /** Convenience constructor to supply machine immediately */ - public PlainSshExecTaskFactory(SshMachineLocation machine, List<String> commands) { - this(machine, commands.toArray(new String[commands.size()])); - } - - @Override - public <T2> PlainSshExecTaskFactory<T2> returning(ScriptReturnType type) { - return (PlainSshExecTaskFactory<T2>) super.<T2>returning(type); - } - - @Override - public <RET2> PlainSshExecTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation) { - return (PlainSshExecTaskFactory<RET2>) super.returning(resultTransformation); - } - - @Override - public PlainSshExecTaskFactory<Boolean> returningIsExitCodeZero() { - return (PlainSshExecTaskFactory<Boolean>) super.returningIsExitCodeZero(); - } - - @Override - public PlainSshExecTaskFactory<String> requiringZeroAndReturningStdout() { - return (PlainSshExecTaskFactory<String>) super.requiringZeroAndReturningStdout(); - } - -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/system/ProcessTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/system/ProcessTaskFactory.java b/core/src/main/java/brooklyn/util/task/system/ProcessTaskFactory.java deleted file mode 100644 index 407111c..0000000 --- a/core/src/main/java/brooklyn/util/task/system/ProcessTaskFactory.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package brooklyn.util.task.system;

import java.util.Map;

import org.apache.brooklyn.api.management.TaskFactory;

import brooklyn.config.ConfigKey;
import org.apache.brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.internal.ssh.SshTool;
import brooklyn.util.task.system.ProcessTaskStub.ScriptReturnType;

import com.google.common.annotations.Beta;
import com.google.common.base.Function;

/**
 * Fluent factory for {@link ProcessTaskWrapper} instances whose task result has type {@code T}.
 * Methods return a factory so that calls can be chained; methods which change what the task
 * returns are declared with the corresponding new result type.
 * <p>
 * NOTE(review): the per-method notes below are read off the method names and signatures;
 * confirm the details against the implementing classes.
 */
public interface ProcessTaskFactory<T> extends TaskFactory<ProcessTaskWrapper<T>> {

    // --- target machine and commands

    ProcessTaskFactory<T> machine(SshMachineLocation machine);

    ProcessTaskFactory<T> add(String... commandsToAdd);

    ProcessTaskFactory<T> add(Iterable<String> commandsToAdd);

    // --- treatment of the exit code

    ProcessTaskFactory<T> requiringExitCodeZero();

    ProcessTaskFactory<T> requiringExitCodeZero(String extraErrorMessage);

    ProcessTaskFactory<T> allowingNonZeroExitCode();

    // --- what the task returns (the result type of the factory changes accordingly)

    ProcessTaskFactory<String> requiringZeroAndReturningStdout();

    ProcessTaskFactory<Boolean> returningIsExitCodeZero();

    <RET2> ProcessTaskFactory<RET2> returning(ScriptReturnType type);

    <RET2> ProcessTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation);

    // --- how the commands are run

    ProcessTaskFactory<T> runAsCommand();

    ProcessTaskFactory<T> runAsScript();

    ProcessTaskFactory<T> runAsRoot();

    ProcessTaskFactory<T> environmentVariable(String key, String val);

    ProcessTaskFactory<T> environmentVariables(Map<String,String> vars);

    ProcessTaskFactory<T> summary(String summary);

    // --- tool-specific configuration

    /** allows setting config-key based properties for specific underlying tools */
    @Beta
    <V> ProcessTaskFactory<T> configure(ConfigKey<V> key, V value);

    /** allows setting config-key/flag based properties for specific underlying tools;
     * but note that if any are prefixed with {@link SshTool#BROOKLYN_CONFIG_KEY_PREFIX}
     * these should normally be filtered out */
    @Beta
    ProcessTaskFactory<T> configure(Map<?,?> flags);

    /** adds a listener which will be notified of (otherwise) successful completion,
     * typically used to invalidate the result (ie throw exception, to promote a string in the output to an exception);
     * invoked even if return code is zero, so a better error can be thrown */
    ProcessTaskFactory<T> addCompletionListener(Function<ProcessTaskWrapper<?>, Void> function);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/system/ProcessTaskStub.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/system/ProcessTaskStub.java b/core/src/main/java/brooklyn/util/task/system/ProcessTaskStub.java deleted file mode 100644 index df37691..0000000 --- a/core/src/main/java/brooklyn/util/task/system/ProcessTaskStub.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */
package brooklyn.util.task.system;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.config.ConfigBag;
import brooklyn.util.text.Strings;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/**
 * Holds the description of a process to run: the commands, the target machine, and the options
 * governing how it is run and what it returns. Provides a copy constructor so that one stub
 * can be initialised from another.
 */
public class ProcessTaskStub {

    /** the kinds of result a process task can be asked to return */
    public enum ScriptReturnType { CUSTOM, EXIT_CODE, STDOUT_STRING, STDOUT_BYTES, STDERR_STRING, STDERR_BYTES }

    protected final List<String> commands = new ArrayList<String>();
    /** null for localhost */
    protected SshMachineLocation machine;

    // config data
    /** explicit summary; when null, {@link #getSummary()} derives one from the commands */
    protected String summary;
    protected final ConfigBag config = ConfigBag.newInstance();

    /** transformation producing the result when the return type is {@link ScriptReturnType#CUSTOM} */
    protected Function<ProcessTaskWrapper<?>, ?> returnResultTransformation = null;
    protected ScriptReturnType returnType = ScriptReturnType.EXIT_CODE;

    /** tri-state: null means no explicit choice has been made */
    protected Boolean runAsScript = null;
    protected boolean runAsRoot = false;
    /** tri-state: null means no explicit choice has been made */
    protected Boolean requireExitCodeZero = null;
    protected String extraErrorMessage = null;
    protected Map<String,String> shellEnvironment = new MutableMap<String, String>();
    protected final List<Function<ProcessTaskWrapper<?>, Void>> completionListeners = new ArrayList<Function<ProcessTaskWrapper<?>,Void>>();

    /** Creates an empty stub with default settings. */
    public ProcessTaskStub() {
    }

    /** Creates a stub carrying over the commands, machine, summary, config and options of the given one. */
    protected ProcessTaskStub(ProcessTaskStub source) {
        this.commands.addAll(source.getCommands());
        this.machine = source.getMachine();
        // goes through the getter, so a summary derived from the commands is stored explicitly in the copy
        this.summary = source.getSummary();
        this.config.copy(source.getConfig());
        this.returnResultTransformation = source.returnResultTransformation;
        this.returnType = source.returnType;
        this.runAsScript = source.runAsScript;
        this.runAsRoot = source.runAsRoot;
        this.requireExitCodeZero = source.requireExitCodeZero;
        this.extraErrorMessage = source.extraErrorMessage;
        this.shellEnvironment.putAll(source.getShellEnvironment());
        this.completionListeners.addAll(source.getCompletionListeners());
    }

    /** Returns the explicit summary if one is set, otherwise the commands joined with " ; " and limited via {@code Strings.maxlen} to 160. */
    public String getSummary() {
        return summary != null
                ? summary
                : Strings.maxlen(Strings.join(commands, " ; "), 160);
    }

    /** null for localhost */
    public SshMachineLocation getMachine() {
        return this.machine;
    }

    /** Returns an immutable copy of the shell environment. */
    public Map<String, String> getShellEnvironment() {
        return ImmutableMap.copyOf(this.shellEnvironment);
    }

    /** The default string form followed by the summary in square brackets. */
    @Override
    public String toString() {
        String description = getSummary();
        return super.toString() + "[" + description + "]";
    }

    /** Returns an immutable copy of the commands. */
    public List<String> getCommands() {
        return ImmutableList.copyOf(this.commands);
    }

    /** Returns the list of completion listeners held by this stub (the list itself, not a copy). */
    public List<Function<ProcessTaskWrapper<?>, Void>> getCompletionListeners() {
        return this.completionListeners;
    }

    /** Returns the config bag held by this stub (the bag itself, not a copy). */
    protected ConfigBag getConfig() {
        return this.config;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/system/ProcessTaskWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/system/ProcessTaskWrapper.java b/core/src/main/java/brooklyn/util/task/system/ProcessTaskWrapper.java deleted file mode 100644 index 5c18fdd..0000000 --- a/core/src/main/java/brooklyn/util/task/system/ProcessTaskWrapper.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied.
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task.system; - -import java.io.ByteArrayOutputStream; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskWrapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.util.config.ConfigBag; -import brooklyn.util.internal.ssh.ShellTool; -import brooklyn.util.stream.Streams; -import brooklyn.util.task.TaskBuilder; -import brooklyn.util.task.Tasks; -import brooklyn.util.task.system.internal.AbstractProcessTaskFactory; -import brooklyn.util.text.Strings; - -import com.google.common.base.Function; - -/** Wraps a fully constructed process task, and allows callers to inspect status. - * Note that methods in here such as {@link #getStdout()} will return partially completed streams while the task is ongoing - * (and exit code will be null). You can {@link #block()} or {@link #get()} as conveniences on the underlying {@link #getTask()}. 
*/ -public abstract class ProcessTaskWrapper<RET> extends ProcessTaskStub implements TaskWrapper<RET> { - - private static final Logger log = LoggerFactory.getLogger(ProcessTaskWrapper.class); - - private final Task<RET> task; - - // execution details - protected ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - protected ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - protected Integer exitCode = null; - - @SuppressWarnings("unchecked") - protected ProcessTaskWrapper(AbstractProcessTaskFactory<?,RET> constructor) { - super(constructor); - TaskBuilder<Object> tb = constructor.constructCustomizedTaskBuilder(); - if (stdout!=null) tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDOUT, stdout)); - if (stderr!=null) tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDERR, stderr)); - task = (Task<RET>) tb.body(new ProcessTaskInternalJob()).build(); - } - - @Override - public Task<RET> asTask() { - return getTask(); - } - - @Override - public Task<RET> getTask() { - return task; - } - - public Integer getExitCode() { - return exitCode; - } - - public byte[] getStdoutBytes() { - if (stdout==null) return null; - return stdout.toByteArray(); - } - - public byte[] getStderrBytes() { - if (stderr==null) return null; - return stderr.toByteArray(); - } - - public String getStdout() { - if (stdout==null) return null; - return stdout.toString(); - } - - public String getStderr() { - if (stderr==null) return null; - return stderr.toString(); - } - - protected class ProcessTaskInternalJob implements Callable<Object> { - @Override - public Object call() throws Exception { - run( getConfigForRunning() ); - - for (Function<ProcessTaskWrapper<?>, Void> listener: completionListeners) { - try { - listener.apply(ProcessTaskWrapper.this); - } catch (Exception e) { - logWithDetailsAndThrow("Error in "+taskTypeShortName()+" task "+getSummary()+": "+e, e); - } - } - - if (exitCode!=0 && !Boolean.FALSE.equals(requireExitCodeZero)) { 
- if (Boolean.TRUE.equals(requireExitCodeZero)) { - logWithDetailsAndThrow(taskTypeShortName()+" task ended with exit code "+exitCode+" when 0 was required, in "+Tasks.current()+": "+getSummary(), null); - } else { - // warn, but allow, on non-zero not explicitly allowed - log.warn(taskTypeShortName()+" task ended with exit code "+exitCode+" when non-zero was not explicitly allowed (error may be thrown in future), in " - +Tasks.current()+": "+getSummary()); - } - } - switch (returnType) { - case CUSTOM: return returnResultTransformation.apply(ProcessTaskWrapper.this); - case STDOUT_STRING: return stdout.toString(); - case STDOUT_BYTES: return stdout.toByteArray(); - case STDERR_STRING: return stderr.toString(); - case STDERR_BYTES: return stderr.toByteArray(); - case EXIT_CODE: return exitCode; - } - - throw new IllegalStateException("Unknown return type for "+taskTypeShortName()+" job "+getSummary()+": "+returnType); - } - - protected void logWithDetailsAndThrow(String message, Throwable optionalCause) { - message = (extraErrorMessage!=null ? 
extraErrorMessage+": " : "") + message; - log.warn(message+" (throwing)"); - logProblemDetails("STDERR", stderr, 1024); - logProblemDetails("STDOUT", stdout, 1024); - logProblemDetails("STDIN", Streams.byteArrayOfString(Strings.join(commands,"\n")), 4096); - if (optionalCause!=null) throw new IllegalStateException(message, optionalCause); - throw new IllegalStateException(message); - } - - protected void logProblemDetails(String streamName, ByteArrayOutputStream stream, int max) { - Streams.logStreamTail(log, streamName+" for problem in "+Tasks.current(), stream, max); - } - - } - - @Override - public String toString() { - return super.toString()+"["+task+"]"; - } - - /** blocks and gets the result, throwing if there was an exception */ - public RET get() { - return getTask().getUnchecked(); - } - - /** blocks until the task completes; does not throw */ - public ProcessTaskWrapper<RET> block() { - getTask().blockUntilEnded(); - return this; - } - - /** true iff the process has completed (with or without failure) */ - public boolean isDone() { - return getTask().isDone(); - } - - /** for overriding */ - protected ConfigBag getConfigForRunning() { - ConfigBag config = ConfigBag.newInstanceCopying(ProcessTaskWrapper.this.config); - if (stdout!=null) config.put(ShellTool.PROP_OUT_STREAM, stdout); - if (stderr!=null) config.put(ShellTool.PROP_ERR_STREAM, stderr); - - if (!config.containsKey(ShellTool.PROP_NO_EXTRA_OUTPUT)) - // by default no extra output (so things like cat, etc work as expected) - config.put(ShellTool.PROP_NO_EXTRA_OUTPUT, true); - - if (runAsRoot) - config.put(ShellTool.PROP_RUN_AS_ROOT, true); - return config; - } - - protected abstract void run(ConfigBag config); - - protected abstract String taskTypeShortName(); - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/system/SystemTasks.java ---------------------------------------------------------------------- 
diff --git a/core/src/main/java/brooklyn/util/task/system/SystemTasks.java b/core/src/main/java/brooklyn/util/task/system/SystemTasks.java deleted file mode 100644 index 8553935..0000000 --- a/core/src/main/java/brooklyn/util/task/system/SystemTasks.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package brooklyn.util.task.system;

import brooklyn.util.task.system.internal.SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory;

/** Static entry point for creating process task factories. */
public class SystemTasks {

    /**
     * Creates a task factory for the given commands.
     *
     * @param commands command strings handed unchanged to the {@link ConcreteSystemProcessTaskFactory} constructor
     * @return a newly created {@link ConcreteSystemProcessTaskFactory}
     */
    public static ProcessTaskFactory<Integer> exec(String... commands) {
        final ProcessTaskFactory<Integer> factory = new ConcreteSystemProcessTaskFactory<Integer>(commands);
        return factory;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/brooklyn/util/task/system/internal/AbstractProcessTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/task/system/internal/AbstractProcessTaskFactory.java b/core/src/main/java/brooklyn/util/task/system/internal/AbstractProcessTaskFactory.java deleted file mode 100644 index e41a9a9..0000000 --- a/core/src/main/java/brooklyn/util/task/system/internal/AbstractProcessTaskFactory.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */
package brooklyn.util.task.system.internal;

import java.util.Arrays;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.BrooklynTaskTags;
import org.apache.brooklyn.location.basic.SshMachineLocation;
import brooklyn.util.stream.Streams;
import brooklyn.util.task.TaskBuilder;
import brooklyn.util.task.system.ProcessTaskFactory;
import brooklyn.util.task.system.ProcessTaskStub;
import brooklyn.util.task.system.ProcessTaskWrapper;
import brooklyn.util.text.Strings;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;

/**
 * Fluent base class for process task factories: each mutator updates the settings held in the
 * {@link ProcessTaskStub} superclass and returns this factory (typed as {@code T}) for chaining.
 * <p>
 * Every mutator sets the {@link #dirty} flag; {@link #finalize()} logs a warning if the factory
 * is collected while the flag is still set. Nothing in this class clears the flag
 * (presumably subclasses do so when a task is created -- confirm in subclasses).
 *
 * @param <T> the concrete factory type, returned by the chaining methods
 * @param <RET> the result type of the tasks produced
 */
public abstract class AbstractProcessTaskFactory<T extends AbstractProcessTaskFactory<T,RET>,RET> extends ProcessTaskStub implements ProcessTaskFactory<RET> {

    private static final Logger log = LoggerFactory.getLogger(AbstractProcessTaskFactory.class);

    /** True once a mutator has been called; read by {@link #finalize()}. */
    protected boolean dirty = false;

    /** Creates a factory whose command list starts with the given commands; does not mark it dirty. */
    public AbstractProcessTaskFactory(String ...commands) {
        this.commands.addAll(Arrays.asList(commands));
    }

    /** @return this instance cast to the concrete factory type {@code T} */
    @SuppressWarnings("unchecked")
    protected T self() { return (T)this; }

    /** Records that this factory has been modified. */
    protected void markDirty() {
        dirty = true;
    }

    /** Appends the given commands, in order, to the command list. */
    @Override
    public T add(String ...commandsToAdd) {
        markDirty();
        this.commands.addAll(Arrays.asList(commandsToAdd));
        return self();
    }

    /** Appends the given commands, in iteration order, to the command list. */
    @Override
    public T add(Iterable<String> commandsToAdd) {
        // mark dirty like the varargs overload, so the unused-modification warning covers this path too
        markDirty();
        Iterables.addAll(this.commands, commandsToAdd);
        return self();
    }

    /** Sets the machine field to the given location. */
    @Override
    public T machine(SshMachineLocation machine) {
        markDirty();
        this.machine = machine;
        return self();
    }

    /** Sets {@code requireExitCodeZero} to true. */
    @Override
    public T requiringExitCodeZero() {
        markDirty();
        requireExitCodeZero = true;
        return self();
    }

    /** Sets {@code requireExitCodeZero} to true and stores the extra error message. */
    @Override
    public T requiringExitCodeZero(String extraErrorMessage) {
        markDirty();
        requireExitCodeZero = true;
        this.extraErrorMessage = extraErrorMessage;
        return self();
    }

    /** Sets {@code requireExitCodeZero} to false. */
    @Override
    public T allowingNonZeroExitCode() {
        markDirty();
        requireExitCodeZero = false;
        return self();
    }

    /**
     * Configures the result to be a Boolean that is true iff the exit code is zero.
     * If {@code requireExitCodeZero} has not been set either way, non-zero exit codes are allowed.
     */
    @Override
    public ProcessTaskFactory<Boolean> returningIsExitCodeZero() {
        if (requireExitCodeZero==null) allowingNonZeroExitCode();
        return returning(new Function<ProcessTaskWrapper<?>,Boolean>() {
            @Override
            public Boolean apply(ProcessTaskWrapper<?> input) {
                // getExitCode() returns a boxed Integer which may be null;
                // report false in that case rather than failing on unboxing
                Integer exitCode = input.getExitCode();
                return exitCode!=null && exitCode.intValue()==0;
            }
        });
    }

    /** Requires exit code zero and sets the return type to {@link ScriptReturnType#STDOUT_STRING}. */
    @Override
    public ProcessTaskFactory<String> requiringZeroAndReturningStdout() {
        requiringExitCodeZero();
        return this.<String>returning(ScriptReturnType.STDOUT_STRING);
    }

    /**
     * Sets the return type.
     *
     * @param type the return type; must not be null
     * @return this factory, cast unchecked to the new result type
     * @throws NullPointerException if {@code type} is null
     */
    @Override
    @SuppressWarnings("unchecked")
    public <RET2> ProcessTaskFactory<RET2> returning(ScriptReturnType type) {
        markDirty();
        returnType = Preconditions.checkNotNull(type);
        return (ProcessTaskFactory<RET2>) self();
    }

    /**
     * Sets the return type to {@link ScriptReturnType#CUSTOM} and stores the given transformation.
     *
     * @return this factory, cast unchecked to the new result type
     */
    @Override
    @SuppressWarnings("unchecked")
    public <RET2> ProcessTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation) {
        markDirty();
        returnType = ScriptReturnType.CUSTOM;
        this.returnResultTransformation = resultTransformation;
        return (ProcessTaskFactory<RET2>) self();
    }

    /** Sets {@code runAsScript} to false. */
    @Override
    public T runAsCommand() {
        markDirty();
        runAsScript = false;
        return self();
    }

    /** Sets {@code runAsScript} to true. */
    @Override
    public T runAsScript() {
        markDirty();
        runAsScript = true;
        return self();
    }

    /** Sets {@code runAsRoot} to true. */
    @Override
    public T runAsRoot() {
        markDirty();
        runAsRoot = true;
        return self();
    }

    /** Puts a single entry into the shell environment map. */
    @Override
    public T environmentVariable(String key, String val) {
        markDirty();
        shellEnvironment.put(key, val);
        return self();
    }

    /** Puts all given entries into the shell environment map; a null map is ignored. */
    @Override
    public T environmentVariables(Map<String,String> vars) {
        if (vars!=null) {
            markDirty();
            shellEnvironment.putAll(vars);
        }
        return self();
    }

    /** creates the TaskBuilder which can be further customized; typically invoked by the initial {@link #newTask()} */
    public TaskBuilder<Object> constructCustomizedTaskBuilder() {
        TaskBuilder<Object> tb = TaskBuilder.builder().dynamic(false).name("ssh: "+getSummary());

        // tag the task with the newline-joined commands as its stdin stream, and with the environment
        tb.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDIN,
                Streams.byteArrayOfString(Strings.join(commands, "\n"))));
        tb.tag(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV, shellEnvironment));

        return tb;
    }

    /** Sets the summary field. */
    @Override
    public T summary(String summary) {
        markDirty();
        this.summary = summary;
        return self();
    }

    /** Sets a single config key on this factory's config. */
    @Override
    public <V> T configure(ConfigKey<V> key, V value) {
        // a config change is a modification like any other mutator
        markDirty();
        config.configure(key, value);
        return self();
    }

    /** Puts all given flags into this factory's config; a null map is ignored. */
    @Override
    public T configure(Map<?, ?> flags) {
        if (flags!=null) {
            markDirty();
            config.putAll(flags);
        }
        return self();
    }

    /** Registers a listener in the completion listener collection. */
    @Override
    public T addCompletionListener(Function<ProcessTaskWrapper<?>, Void> listener) {
        markDirty();
        completionListeners.add(listener);
        return self();
    }

    @Override
    protected void finalize() throws Throwable {
        // help let people know of API usage error
        if (dirty)
            log.warn("Task "+this+" was modified but modification was never used");
        super.finalize();
    }
}
